1use crate::common::truncate_text;
34use crate::config::Verbosity;
35use crate::logger::{Colors, CHECK, CROSS};
36use std::cell::RefCell;
37use std::fmt::Write as _;
38use std::io::{self, BufRead, Write};
39use std::rc::Rc;
40
41use super::delta_display;
42use super::delta_display::{DeltaRenderer, TextDeltaRenderer};
43use super::health::HealthMonitor;
44#[cfg(feature = "test-utils")]
45use super::health::StreamingQualityMetrics;
46use super::printer::SharedPrinter;
47use super::streaming_state::StreamingSession;
48use super::terminal::TerminalMode;
49use super::types::{format_tool_input, format_unknown_json_event, ContentType, GeminiEvent};
50
51pub struct GeminiParser {
53 colors: Colors,
54 verbosity: Verbosity,
55 log_file: Option<String>,
56 display_name: String,
57 streaming_session: Rc<RefCell<StreamingSession>>,
59 terminal_mode: RefCell<TerminalMode>,
61 show_streaming_metrics: bool,
63 printer: SharedPrinter,
65}
66
67impl GeminiParser {
68 pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
69 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
70 }
71
72 pub(crate) fn with_printer(
84 colors: Colors,
85 verbosity: Verbosity,
86 printer: SharedPrinter,
87 ) -> Self {
88 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
89 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
90
91 let _printer_is_terminal = printer.borrow().is_terminal();
93
94 Self {
95 colors,
96 verbosity,
97 log_file: None,
98 display_name: "Gemini".to_string(),
99 streaming_session: Rc::new(RefCell::new(streaming_session)),
100 terminal_mode: RefCell::new(TerminalMode::detect()),
101 show_streaming_metrics: false,
102 printer,
103 }
104 }
105
106 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
107 self.show_streaming_metrics = show;
108 self
109 }
110
111 pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
112 self.display_name = display_name.to_string();
113 self
114 }
115
116 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
117 self.log_file = Some(path.to_string());
118 self
119 }
120
121 #[cfg(test)]
122 pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
123 *self.terminal_mode.borrow_mut() = mode;
124 self
125 }
126
127 #[cfg(any(test, feature = "test-utils"))]
132 pub fn with_printer_for_test(
133 colors: Colors,
134 verbosity: Verbosity,
135 printer: SharedPrinter,
136 ) -> Self {
137 Self::with_printer(colors, verbosity, printer)
138 }
139
140 #[cfg(any(test, feature = "test-utils"))]
144 pub fn with_log_file_for_test(mut self, path: &str) -> Self {
145 self.log_file = Some(path.to_string());
146 self
147 }
148
149 #[cfg(any(test, feature = "test-utils"))]
153 pub fn parse_stream_for_test<R: std::io::BufRead>(&self, reader: R) -> std::io::Result<()> {
154 self.parse_stream(reader)
155 }
156
157 #[cfg(feature = "test-utils")]
167 pub fn printer(&self) -> SharedPrinter {
168 Rc::clone(&self.printer)
169 }
170
171 #[cfg(feature = "test-utils")]
180 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
181 self.streaming_session
182 .borrow()
183 .get_streaming_quality_metrics()
184 }
185
186 pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
193 let event: GeminiEvent = if let Ok(e) = serde_json::from_str(line) {
194 e
195 } else {
196 let trimmed = line.trim();
198 if !trimmed.is_empty() && !trimmed.starts_with('{') {
199 return Some(format!("{trimmed}\n"));
200 }
201 return None;
202 };
203 let c = &self.colors;
204 let prefix = &self.display_name;
205
206 let output = match event {
207 GeminiEvent::Init {
208 session_id, model, ..
209 } => self.format_init_event(session_id, model),
210 GeminiEvent::Message {
211 role,
212 content,
213 delta,
214 } => self.format_message_event(role, content, delta),
215 GeminiEvent::ToolUse {
216 tool_name,
217 parameters,
218 ..
219 } => self.format_tool_use_event(tool_name, parameters.as_ref()),
220 GeminiEvent::ToolResult { status, output, .. } => {
221 self.format_tool_result_event(status, output.as_ref())
222 }
223 GeminiEvent::Error { message, code, .. } => self.format_error_event(message, code),
224 GeminiEvent::Result { status, stats, .. } => self.format_result_event(status, stats),
225 GeminiEvent::Unknown => {
226 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
228 }
229 };
230
231 if output.is_empty() {
232 None
233 } else {
234 Some(output)
235 }
236 }
237
238 fn format_init_event(&self, session_id: Option<String>, model: Option<String>) -> String {
240 let c = &self.colors;
241 let prefix = &self.display_name;
242
243 self.streaming_session.borrow_mut().on_message_start();
245 let sid = session_id.unwrap_or_else(|| "unknown".to_string());
246 self.streaming_session
248 .borrow_mut()
249 .set_current_message_id(Some(sid.clone()));
250 let model_str = model.unwrap_or_else(|| "unknown".to_string());
251 format!(
252 "{}[{}]{} {}Session started{} {}({:.8}..., {}){}\n",
253 c.dim(),
254 prefix,
255 c.reset(),
256 c.cyan(),
257 c.reset(),
258 c.dim(),
259 sid,
260 model_str,
261 c.reset()
262 )
263 }
264
265 fn format_message_event(
267 &self,
268 role: Option<String>,
269 content: Option<String>,
270 delta: Option<bool>,
271 ) -> String {
272 let c = &self.colors;
273 let prefix = &self.display_name;
274
275 let role_str = role.unwrap_or_else(|| "unknown".to_string());
276 let is_delta = delta.unwrap_or(false);
277
278 if let Some(text) = content {
279 if is_delta && role_str == "assistant" {
280 let (show_prefix, accumulated_text, has_prefix) = {
288 let mut session = self.streaming_session.borrow_mut();
289 let show_prefix = session.on_text_delta(0, &text);
290
291 let accumulated_text = session
292 .get_accumulated(ContentType::Text, "0")
293 .unwrap_or("")
294 .to_string();
295
296 let sanitized_text = delta_display::sanitize_for_display(&accumulated_text);
297 if sanitized_text.is_empty() {
298 return String::new();
299 }
300
301 if session.is_content_hash_rendered(ContentType::Text, "0", &sanitized_text) {
302 return String::new();
303 }
304
305 let has_prefix = session.has_rendered_prefix(ContentType::Text, "0");
306
307 session.mark_rendered(ContentType::Text, "0");
308 session.mark_content_hash_rendered(ContentType::Text, "0", &sanitized_text);
309
310 (show_prefix, accumulated_text, has_prefix)
311 };
312
313 let terminal_mode = *self.terminal_mode.borrow();
314 if show_prefix && !has_prefix {
315 return TextDeltaRenderer::render_first_delta(
316 &accumulated_text,
317 prefix,
318 *c,
319 terminal_mode,
320 );
321 }
322
323 return TextDeltaRenderer::render_subsequent_delta(
324 &accumulated_text,
325 prefix,
326 *c,
327 terminal_mode,
328 );
329 } else if !is_delta && role_str == "assistant" {
330 let session = self.streaming_session.borrow();
332 let is_duplicate = session.get_current_message_id().map_or_else(
333 || session.has_any_streamed_content(),
334 |message_id| session.is_duplicate_final_message(message_id),
335 );
336 let was_streaming = session.has_any_streamed_content();
337 let metrics = session.get_streaming_quality_metrics();
338 drop(session);
339
340 let _was_in_block = self.streaming_session.borrow_mut().on_message_stop();
342
343 if is_duplicate || was_streaming {
345 let terminal_mode = *self.terminal_mode.borrow();
346 let completion = TextDeltaRenderer::render_completion(terminal_mode);
347 let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
348 && metrics.total_deltas > 0;
349 if show_metrics {
350 return format!("{}\n{}", completion, metrics.format(*c));
351 }
352 return completion;
353 }
354
355 let limit = self.verbosity.truncate_limit("text");
357 let preview = truncate_text(&text, limit);
358
359 return format!(
360 "{}[{}]{} {}{}{}\n",
361 c.dim(),
362 prefix,
363 c.reset(),
364 c.white(),
365 preview,
366 c.reset()
367 );
368 }
369 let limit = self.verbosity.truncate_limit("text");
371 let preview = truncate_text(&text, limit);
372 return format!(
373 "{}[{}]{} {}{}:{} {}{}{}\n",
374 c.dim(),
375 prefix,
376 c.reset(),
377 c.blue(),
378 role_str,
379 c.reset(),
380 c.dim(),
381 preview,
382 c.reset()
383 );
384 }
385 String::new()
386 }
387
388 fn format_tool_use_event(
390 &self,
391 tool_name: Option<String>,
392 parameters: Option<&serde_json::Value>,
393 ) -> String {
394 let c = &self.colors;
395 let prefix = &self.display_name;
396
397 let tool_name = tool_name.unwrap_or_else(|| "unknown".to_string());
398 let mut out = format!(
399 "{}[{}]{} {}Tool{}: {}{}{}\n",
400 c.dim(),
401 prefix,
402 c.reset(),
403 c.magenta(),
404 c.reset(),
405 c.bold(),
406 tool_name,
407 c.reset()
408 );
409 if self.verbosity.show_tool_input() {
410 if let Some(params) = parameters {
411 let params_str = format_tool_input(params);
412 let limit = self.verbosity.truncate_limit("tool_input");
413 let preview = truncate_text(¶ms_str, limit);
414 if !preview.is_empty() {
415 let _ = writeln!(
416 out,
417 "{}[{}]{} {} └─ {}{}",
418 c.dim(),
419 prefix,
420 c.reset(),
421 c.dim(),
422 preview,
423 c.reset()
424 );
425 }
426 }
427 }
428 out
429 }
430
431 fn format_tool_result_event(&self, status: Option<String>, output: Option<&String>) -> String {
433 let c = &self.colors;
434 let prefix = &self.display_name;
435
436 let status_str = status.unwrap_or_else(|| "unknown".to_string());
437 let is_success = status_str == "success";
438 let icon = if is_success { CHECK } else { CROSS };
439 let color = if is_success { c.green() } else { c.red() };
440
441 let mut out = format!(
442 "{}[{}]{} {}{} Tool result{}\n",
443 c.dim(),
444 prefix,
445 c.reset(),
446 color,
447 icon,
448 c.reset()
449 );
450
451 if self.verbosity.is_verbose() {
452 if let Some(output_text) = output {
453 let limit = self.verbosity.truncate_limit("tool_result");
454 let preview = truncate_text(output_text, limit);
455 let _ = writeln!(
456 out,
457 "{}[{}]{} {} └─ {}{}",
458 c.dim(),
459 prefix,
460 c.reset(),
461 c.dim(),
462 preview,
463 c.reset()
464 );
465 }
466 }
467 out
468 }
469
470 fn format_error_event(&self, message: Option<String>, code: Option<String>) -> String {
472 let c = &self.colors;
473 let prefix = &self.display_name;
474
475 let msg = message.unwrap_or_else(|| "unknown error".to_string());
476 let code_str = code.map_or_else(String::new, |c| format!(" ({c})"));
477 format!(
478 "{}[{}]{} {}{} Error{}:{} {}\n",
479 c.dim(),
480 prefix,
481 c.reset(),
482 c.red(),
483 CROSS,
484 code_str,
485 c.reset(),
486 msg
487 )
488 }
489
490 fn format_result_event(
492 &self,
493 status: Option<String>,
494 event_stats: Option<crate::json_parser::types::GeminiStats>,
495 ) -> String {
496 let c = &self.colors;
497 let prefix = &self.display_name;
498
499 let status_result = status.unwrap_or_else(|| "unknown".to_string());
500 let is_success = status_result == "success";
501 let icon = if is_success { CHECK } else { CROSS };
502 let color = if is_success { c.green() } else { c.red() };
503
504 let stats_display = event_stats.map_or_else(String::new, |s| {
505 let duration_s = s.duration_ms.unwrap_or(0) / 1000;
506 let duration_m = duration_s / 60;
507 let duration_s_rem = duration_s % 60;
508 let input = s.input_tokens.unwrap_or(0);
509 let output = s.output_tokens.unwrap_or(0);
510 let tools = s.tool_calls.unwrap_or(0);
511 format!("({duration_m}m {duration_s_rem}s, in:{input} out:{output}, {tools} tools)")
512 });
513
514 format!(
515 "{}[{}]{} {}{} {}{} {}{}{}\n",
516 c.dim(),
517 prefix,
518 c.reset(),
519 color,
520 icon,
521 status_result,
522 c.reset(),
523 c.dim(),
524 stats_display,
525 c.reset()
526 )
527 }
528
529 const fn is_control_event(event: &GeminiEvent) -> bool {
535 match event {
536 GeminiEvent::Init { .. } | GeminiEvent::Result { .. } => true,
538 _ => false,
539 }
540 }
541
542 pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
544 use super::incremental_parser::IncrementalNdjsonParser;
545
546 let c = &self.colors;
547 let monitor = HealthMonitor::new("Gemini");
548 let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
549 std::fs::OpenOptions::new()
550 .create(true)
551 .append(true)
552 .open(log_path)
553 .ok()
554 .map(std::io::BufWriter::new)
555 });
556
557 let mut incremental_parser = IncrementalNdjsonParser::new();
560 let mut byte_buffer = Vec::new();
561
562 loop {
563 byte_buffer.clear();
565 let chunk = reader.fill_buf()?;
566 if chunk.is_empty() {
567 break;
568 }
569
570 byte_buffer.extend_from_slice(chunk);
572 let consumed = chunk.len();
573 reader.consume(consumed);
574
575 let json_events = incremental_parser.feed(&byte_buffer);
577
578 for line in json_events {
580 let trimmed = line.trim();
581 if trimmed.is_empty() {
582 continue;
583 }
584
585 if self.verbosity.is_debug() {
587 let mut printer = self.printer.borrow_mut();
588 writeln!(
589 printer,
590 "{}[DEBUG]{} {}{}{}",
591 c.dim(),
592 c.reset(),
593 c.dim(),
594 &line,
595 c.reset()
596 )?;
597 printer.flush()?;
598 }
599
600 match self.parse_event(&line) {
602 Some(output) => {
603 monitor.record_parsed();
604 let mut printer = self.printer.borrow_mut();
606 write!(printer, "{output}")?;
607 printer.flush()?;
608 }
609 None => {
610 if trimmed.starts_with('{') {
612 if let Ok(event) = serde_json::from_str::<GeminiEvent>(&line) {
613 if Self::is_control_event(&event) {
614 monitor.record_control_event();
615 } else {
616 monitor.record_unknown_event();
618 }
619 } else {
620 monitor.record_parse_error();
622 }
623 } else {
624 monitor.record_ignored();
625 }
626 }
627 }
628
629 if let Some(ref mut file) = log_writer {
631 writeln!(file, "{line}")?;
632 }
633 }
634 }
635
636 if let Some(ref mut file) = log_writer {
637 file.flush()?;
638 let _ = file.get_mut().sync_all();
641 }
642 if let Some(warning) = monitor.check_and_warn(*c) {
643 let mut printer = self.printer.borrow_mut();
644 writeln!(printer, "{warning}\n")?;
645 }
646 Ok(())
647 }
648}