1use crate::common::truncate_text;
34use crate::config::Verbosity;
35use crate::logger::{Colors, CHECK, CROSS};
36use std::cell::RefCell;
37use std::fmt::Write as _;
38use std::io::{self, BufRead, Write};
39use std::rc::Rc;
40
41use super::delta_display::{DeltaRenderer, TextDeltaRenderer};
42use super::health::HealthMonitor;
43#[cfg(feature = "test-utils")]
44use super::health::StreamingQualityMetrics;
45use super::printer::SharedPrinter;
46use super::streaming_state::StreamingSession;
47use super::terminal::TerminalMode;
48use super::types::{format_tool_input, format_unknown_json_event, ContentType, GeminiEvent};
49
50pub struct GeminiParser {
52 colors: Colors,
53 verbosity: Verbosity,
54 log_file: Option<String>,
55 display_name: String,
56 streaming_session: Rc<RefCell<StreamingSession>>,
58 terminal_mode: RefCell<TerminalMode>,
60 show_streaming_metrics: bool,
62 printer: SharedPrinter,
64}
65
66impl GeminiParser {
67 pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
68 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
69 }
70
71 pub(crate) fn with_printer(
83 colors: Colors,
84 verbosity: Verbosity,
85 printer: SharedPrinter,
86 ) -> Self {
87 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
88 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
89
90 let _printer_is_terminal = printer.borrow().is_terminal();
92
93 Self {
94 colors,
95 verbosity,
96 log_file: None,
97 display_name: "Gemini".to_string(),
98 streaming_session: Rc::new(RefCell::new(streaming_session)),
99 terminal_mode: RefCell::new(TerminalMode::detect()),
100 show_streaming_metrics: false,
101 printer,
102 }
103 }
104
105 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
106 self.show_streaming_metrics = show;
107 self
108 }
109
110 pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
111 self.display_name = display_name.to_string();
112 self
113 }
114
115 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
116 self.log_file = Some(path.to_string());
117 self
118 }
119
120 #[cfg(test)]
121 pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
122 *self.terminal_mode.borrow_mut() = mode;
123 self
124 }
125
126 #[cfg(any(test, feature = "test-utils"))]
131 pub fn with_printer_for_test(
132 colors: Colors,
133 verbosity: Verbosity,
134 printer: SharedPrinter,
135 ) -> Self {
136 Self::with_printer(colors, verbosity, printer)
137 }
138
139 #[cfg(any(test, feature = "test-utils"))]
143 pub fn with_log_file_for_test(mut self, path: &str) -> Self {
144 self.log_file = Some(path.to_string());
145 self
146 }
147
148 #[cfg(any(test, feature = "test-utils"))]
152 pub fn parse_stream_for_test<R: std::io::BufRead>(&self, reader: R) -> std::io::Result<()> {
153 self.parse_stream(reader)
154 }
155
156 #[cfg(feature = "test-utils")]
166 pub fn printer(&self) -> SharedPrinter {
167 Rc::clone(&self.printer)
168 }
169
170 #[cfg(feature = "test-utils")]
179 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
180 self.streaming_session
181 .borrow()
182 .get_streaming_quality_metrics()
183 }
184
185 pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
192 let event: GeminiEvent = if let Ok(e) = serde_json::from_str(line) {
193 e
194 } else {
195 let trimmed = line.trim();
197 if !trimmed.is_empty() && !trimmed.starts_with('{') {
198 return Some(format!("{trimmed}\n"));
199 }
200 return None;
201 };
202 let c = &self.colors;
203 let prefix = &self.display_name;
204
205 let output = match event {
206 GeminiEvent::Init {
207 session_id, model, ..
208 } => self.format_init_event(session_id, model),
209 GeminiEvent::Message {
210 role,
211 content,
212 delta,
213 } => self.format_message_event(role, content, delta),
214 GeminiEvent::ToolUse {
215 tool_name,
216 parameters,
217 ..
218 } => self.format_tool_use_event(tool_name, parameters.as_ref()),
219 GeminiEvent::ToolResult { status, output, .. } => {
220 self.format_tool_result_event(status, output.as_ref())
221 }
222 GeminiEvent::Error { message, code, .. } => self.format_error_event(message, code),
223 GeminiEvent::Result { status, stats, .. } => self.format_result_event(status, stats),
224 GeminiEvent::Unknown => {
225 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
227 }
228 };
229
230 if output.is_empty() {
231 None
232 } else {
233 Some(output)
234 }
235 }
236
237 fn format_init_event(&self, session_id: Option<String>, model: Option<String>) -> String {
239 let c = &self.colors;
240 let prefix = &self.display_name;
241
242 self.streaming_session.borrow_mut().on_message_start();
244 let sid = session_id.unwrap_or_else(|| "unknown".to_string());
245 self.streaming_session
247 .borrow_mut()
248 .set_current_message_id(Some(sid.clone()));
249 let model_str = model.unwrap_or_else(|| "unknown".to_string());
250 format!(
251 "{}[{}]{} {}Session started{} {}({:.8}..., {}){}\n",
252 c.dim(),
253 prefix,
254 c.reset(),
255 c.cyan(),
256 c.reset(),
257 c.dim(),
258 sid,
259 model_str,
260 c.reset()
261 )
262 }
263
264 fn format_message_event(
266 &self,
267 role: Option<String>,
268 content: Option<String>,
269 delta: Option<bool>,
270 ) -> String {
271 let c = &self.colors;
272 let prefix = &self.display_name;
273
274 let role_str = role.unwrap_or_else(|| "unknown".to_string());
275 let is_delta = delta.unwrap_or(false);
276
277 if let Some(text) = content {
278 if is_delta && role_str == "assistant" {
279 let (show_prefix, accumulated_text) = {
281 let mut session = self.streaming_session.borrow_mut();
282 let show_prefix = session.on_text_delta_key("main", &text);
283 let accumulated_text = session
285 .get_accumulated(ContentType::Text, "main")
286 .unwrap_or("")
287 .to_string();
288 (show_prefix, accumulated_text)
289 };
290
291 let terminal_mode = *self.terminal_mode.borrow();
293 if show_prefix {
294 return TextDeltaRenderer::render_first_delta(
296 &accumulated_text,
297 prefix,
298 *c,
299 terminal_mode,
300 );
301 }
302 return TextDeltaRenderer::render_subsequent_delta(
304 &accumulated_text,
305 prefix,
306 *c,
307 terminal_mode,
308 );
309 } else if !is_delta && role_str == "assistant" {
310 let session = self.streaming_session.borrow();
312 let is_duplicate = session.get_current_message_id().map_or_else(
313 || session.has_any_streamed_content(),
314 |message_id| session.is_duplicate_final_message(message_id),
315 );
316 let was_streaming = session.has_any_streamed_content();
317 let metrics = session.get_streaming_quality_metrics();
318 drop(session);
319
320 let _was_in_block = self.streaming_session.borrow_mut().on_message_stop();
322
323 if is_duplicate || was_streaming {
325 let terminal_mode = *self.terminal_mode.borrow();
326 let completion = TextDeltaRenderer::render_completion(terminal_mode);
327 let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
328 && metrics.total_deltas > 0;
329 if show_metrics {
330 return format!("{}\n{}", completion, metrics.format(*c));
331 }
332 return completion;
333 }
334
335 let limit = self.verbosity.truncate_limit("text");
337 let preview = truncate_text(&text, limit);
338
339 return format!(
340 "{}[{}]{} {}{}{}\n",
341 c.dim(),
342 prefix,
343 c.reset(),
344 c.white(),
345 preview,
346 c.reset()
347 );
348 }
349 let limit = self.verbosity.truncate_limit("text");
351 let preview = truncate_text(&text, limit);
352 return format!(
353 "{}[{}]{} {}{}:{} {}{}{}\n",
354 c.dim(),
355 prefix,
356 c.reset(),
357 c.blue(),
358 role_str,
359 c.reset(),
360 c.dim(),
361 preview,
362 c.reset()
363 );
364 }
365 String::new()
366 }
367
368 fn format_tool_use_event(
370 &self,
371 tool_name: Option<String>,
372 parameters: Option<&serde_json::Value>,
373 ) -> String {
374 let c = &self.colors;
375 let prefix = &self.display_name;
376
377 let tool_name = tool_name.unwrap_or_else(|| "unknown".to_string());
378 let mut out = format!(
379 "{}[{}]{} {}Tool{}: {}{}{}\n",
380 c.dim(),
381 prefix,
382 c.reset(),
383 c.magenta(),
384 c.reset(),
385 c.bold(),
386 tool_name,
387 c.reset()
388 );
389 if self.verbosity.show_tool_input() {
390 if let Some(params) = parameters {
391 let params_str = format_tool_input(params);
392 let limit = self.verbosity.truncate_limit("tool_input");
393 let preview = truncate_text(¶ms_str, limit);
394 if !preview.is_empty() {
395 let _ = writeln!(
396 out,
397 "{}[{}]{} {} └─ {}{}",
398 c.dim(),
399 prefix,
400 c.reset(),
401 c.dim(),
402 preview,
403 c.reset()
404 );
405 }
406 }
407 }
408 out
409 }
410
411 fn format_tool_result_event(&self, status: Option<String>, output: Option<&String>) -> String {
413 let c = &self.colors;
414 let prefix = &self.display_name;
415
416 let status_str = status.unwrap_or_else(|| "unknown".to_string());
417 let is_success = status_str == "success";
418 let icon = if is_success { CHECK } else { CROSS };
419 let color = if is_success { c.green() } else { c.red() };
420
421 let mut out = format!(
422 "{}[{}]{} {}{} Tool result{}\n",
423 c.dim(),
424 prefix,
425 c.reset(),
426 color,
427 icon,
428 c.reset()
429 );
430
431 if self.verbosity.is_verbose() {
432 if let Some(output_text) = output {
433 let limit = self.verbosity.truncate_limit("tool_result");
434 let preview = truncate_text(output_text, limit);
435 let _ = writeln!(
436 out,
437 "{}[{}]{} {} └─ {}{}",
438 c.dim(),
439 prefix,
440 c.reset(),
441 c.dim(),
442 preview,
443 c.reset()
444 );
445 }
446 }
447 out
448 }
449
450 fn format_error_event(&self, message: Option<String>, code: Option<String>) -> String {
452 let c = &self.colors;
453 let prefix = &self.display_name;
454
455 let msg = message.unwrap_or_else(|| "unknown error".to_string());
456 let code_str = code.map_or_else(String::new, |c| format!(" ({c})"));
457 format!(
458 "{}[{}]{} {}{} Error{}:{} {}\n",
459 c.dim(),
460 prefix,
461 c.reset(),
462 c.red(),
463 CROSS,
464 code_str,
465 c.reset(),
466 msg
467 )
468 }
469
470 fn format_result_event(
472 &self,
473 status: Option<String>,
474 event_stats: Option<crate::json_parser::types::GeminiStats>,
475 ) -> String {
476 let c = &self.colors;
477 let prefix = &self.display_name;
478
479 let status_result = status.unwrap_or_else(|| "unknown".to_string());
480 let is_success = status_result == "success";
481 let icon = if is_success { CHECK } else { CROSS };
482 let color = if is_success { c.green() } else { c.red() };
483
484 let stats_display = event_stats.map_or_else(String::new, |s| {
485 let duration_s = s.duration_ms.unwrap_or(0) / 1000;
486 let duration_m = duration_s / 60;
487 let duration_s_rem = duration_s % 60;
488 let input = s.input_tokens.unwrap_or(0);
489 let output = s.output_tokens.unwrap_or(0);
490 let tools = s.tool_calls.unwrap_or(0);
491 format!("({duration_m}m {duration_s_rem}s, in:{input} out:{output}, {tools} tools)")
492 });
493
494 format!(
495 "{}[{}]{} {}{} {}{} {}{}{}\n",
496 c.dim(),
497 prefix,
498 c.reset(),
499 color,
500 icon,
501 status_result,
502 c.reset(),
503 c.dim(),
504 stats_display,
505 c.reset()
506 )
507 }
508
509 const fn is_control_event(event: &GeminiEvent) -> bool {
515 match event {
516 GeminiEvent::Init { .. } | GeminiEvent::Result { .. } => true,
518 _ => false,
519 }
520 }
521
522 pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
524 use super::incremental_parser::IncrementalNdjsonParser;
525
526 let c = &self.colors;
527 let monitor = HealthMonitor::new("Gemini");
528 let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
529 std::fs::OpenOptions::new()
530 .create(true)
531 .append(true)
532 .open(log_path)
533 .ok()
534 .map(std::io::BufWriter::new)
535 });
536
537 let mut incremental_parser = IncrementalNdjsonParser::new();
540 let mut byte_buffer = Vec::new();
541
542 loop {
543 byte_buffer.clear();
545 let chunk = reader.fill_buf()?;
546 if chunk.is_empty() {
547 break;
548 }
549
550 byte_buffer.extend_from_slice(chunk);
552 let consumed = chunk.len();
553 reader.consume(consumed);
554
555 let json_events = incremental_parser.feed(&byte_buffer);
557
558 for line in json_events {
560 let trimmed = line.trim();
561 if trimmed.is_empty() {
562 continue;
563 }
564
565 if self.verbosity.is_debug() {
567 let mut printer = self.printer.borrow_mut();
568 writeln!(
569 printer,
570 "{}[DEBUG]{} {}{}{}",
571 c.dim(),
572 c.reset(),
573 c.dim(),
574 &line,
575 c.reset()
576 )?;
577 printer.flush()?;
578 }
579
580 match self.parse_event(&line) {
582 Some(output) => {
583 monitor.record_parsed();
584 let mut printer = self.printer.borrow_mut();
586 write!(printer, "{output}")?;
587 printer.flush()?;
588 }
589 None => {
590 if trimmed.starts_with('{') {
592 if let Ok(event) = serde_json::from_str::<GeminiEvent>(&line) {
593 if Self::is_control_event(&event) {
594 monitor.record_control_event();
595 } else {
596 monitor.record_unknown_event();
598 }
599 } else {
600 monitor.record_parse_error();
602 }
603 } else {
604 monitor.record_ignored();
605 }
606 }
607 }
608
609 if let Some(ref mut file) = log_writer {
611 writeln!(file, "{line}")?;
612 }
613 }
614 }
615
616 if let Some(ref mut file) = log_writer {
617 file.flush()?;
618 let _ = file.get_mut().sync_all();
621 }
622 if let Some(warning) = monitor.check_and_warn(*c) {
623 let mut printer = self.printer.borrow_mut();
624 writeln!(printer, "{warning}\n")?;
625 }
626 Ok(())
627 }
628}