1use crate::common::truncate_text;
34use crate::config::Verbosity;
35use crate::logger::{Colors, CHECK, CROSS};
36use std::cell::RefCell;
37use std::fmt::Write as _;
38use std::io::{self, BufRead, Write};
39use std::rc::Rc;
40
41use super::delta_display;
42use super::delta_display::{DeltaRenderer, TextDeltaRenderer};
43use super::health::HealthMonitor;
44#[cfg(feature = "test-utils")]
45use super::health::StreamingQualityMetrics;
46use super::printer::SharedPrinter;
47use super::streaming_state::StreamingSession;
48use super::terminal::TerminalMode;
49use super::types::{format_tool_input, format_unknown_json_event, ContentType, GeminiEvent};
50
51pub struct GeminiParser {
53 colors: Colors,
54 verbosity: Verbosity,
55 log_path: Option<std::path::PathBuf>,
57 display_name: String,
58 streaming_session: Rc<RefCell<StreamingSession>>,
60 terminal_mode: RefCell<TerminalMode>,
62 show_streaming_metrics: bool,
64 printer: SharedPrinter,
66}
67
68impl GeminiParser {
69 pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
70 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
71 }
72
73 pub(crate) fn with_printer(
85 colors: Colors,
86 verbosity: Verbosity,
87 printer: SharedPrinter,
88 ) -> Self {
89 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
90 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
91
92 let _printer_is_terminal = printer.borrow().is_terminal();
94
95 Self {
96 colors,
97 verbosity,
98 log_path: None,
99 display_name: "Gemini".to_string(),
100 streaming_session: Rc::new(RefCell::new(streaming_session)),
101 terminal_mode: RefCell::new(TerminalMode::detect()),
102 show_streaming_metrics: false,
103 printer,
104 }
105 }
106
107 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
108 self.show_streaming_metrics = show;
109 self
110 }
111
112 pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
113 self.display_name = display_name.to_string();
114 self
115 }
116
117 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
118 self.log_path = Some(std::path::PathBuf::from(path));
119 self
120 }
121
/// Test-only builder-style setter that overrides the stored terminal mode.
#[cfg(test)]
pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
    {
        // Scope the mutable borrow so it is released before `self` is moved out.
        let mut slot = self.terminal_mode.borrow_mut();
        *slot = mode;
    }
    self
}
127
/// Test helper exposing `with_printer` publicly so tests can inject their own printer.
#[cfg(any(test, feature = "test-utils"))]
pub fn with_printer_for_test(
    colors: Colors,
    verbosity: Verbosity,
    printer: SharedPrinter,
) -> Self {
    // Thin public wrapper; all construction logic lives in `with_printer`.
    Self::with_printer(colors, verbosity, printer)
}
140
/// Test helper exposing `with_log_file` publicly: enables raw event logging to `path`.
#[cfg(any(test, feature = "test-utils"))]
pub fn with_log_file_for_test(self, path: &str) -> Self {
    self.with_log_file(path)
}
149
/// Test helper exposing `parse_stream` publicly.
///
/// # Errors
///
/// Returns whatever I/O error `parse_stream` reports.
#[cfg(any(test, feature = "test-utils"))]
pub fn parse_stream_for_test<R: std::io::BufRead>(
    &self,
    reader: R,
    workspace: &dyn crate::workspace::Workspace,
) -> std::io::Result<()> {
    // Thin public wrapper; all parsing logic lives in `parse_stream`.
    self.parse_stream(reader, workspace)
}
161
/// Returns another handle to the printer this parser writes to.
#[cfg(feature = "test-utils")]
pub fn printer(&self) -> SharedPrinter {
    let shared = &self.printer;
    Rc::clone(shared)
}
175
/// Returns the streaming quality metrics reported by the current streaming session.
#[cfg(feature = "test-utils")]
pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
    let session = self.streaming_session.borrow();
    session.get_streaming_quality_metrics()
}
190
191 pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
198 let event: GeminiEvent = if let Ok(e) = serde_json::from_str(line) {
199 e
200 } else {
201 let trimmed = line.trim();
203 if !trimmed.is_empty() && !trimmed.starts_with('{') {
204 return Some(format!("{trimmed}\n"));
205 }
206 return None;
207 };
208 let c = &self.colors;
209 let prefix = &self.display_name;
210
211 let output = match event {
212 GeminiEvent::Init {
213 session_id, model, ..
214 } => self.format_init_event(session_id, model),
215 GeminiEvent::Message {
216 role,
217 content,
218 delta,
219 } => self.format_message_event(role, content, delta),
220 GeminiEvent::ToolUse {
221 tool_name,
222 parameters,
223 ..
224 } => self.format_tool_use_event(tool_name, parameters.as_ref()),
225 GeminiEvent::ToolResult { status, output, .. } => {
226 self.format_tool_result_event(status, output.as_ref())
227 }
228 GeminiEvent::Error { message, code, .. } => self.format_error_event(message, code),
229 GeminiEvent::Result { status, stats, .. } => self.format_result_event(status, stats),
230 GeminiEvent::Unknown => {
231 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
233 }
234 };
235
236 if output.is_empty() {
237 None
238 } else {
239 Some(output)
240 }
241 }
242
243 fn format_init_event(&self, session_id: Option<String>, model: Option<String>) -> String {
245 let c = &self.colors;
246 let prefix = &self.display_name;
247
248 self.streaming_session.borrow_mut().on_message_start();
250 let sid = session_id.unwrap_or_else(|| "unknown".to_string());
251 self.streaming_session
253 .borrow_mut()
254 .set_current_message_id(Some(sid.clone()));
255 let model_str = model.unwrap_or_else(|| "unknown".to_string());
256 format!(
257 "{}[{}]{} {}Session started{} {}({:.8}..., {}){}\n",
258 c.dim(),
259 prefix,
260 c.reset(),
261 c.cyan(),
262 c.reset(),
263 c.dim(),
264 sid,
265 model_str,
266 c.reset()
267 )
268 }
269
270 fn format_message_event(
272 &self,
273 role: Option<String>,
274 content: Option<String>,
275 delta: Option<bool>,
276 ) -> String {
277 let c = &self.colors;
278 let prefix = &self.display_name;
279
280 let role_str = role.unwrap_or_else(|| "unknown".to_string());
281 let is_delta = delta.unwrap_or(false);
282
283 if let Some(text) = content {
284 if is_delta && role_str == "assistant" {
285 let (show_prefix, accumulated_text, has_prefix) = {
293 let mut session = self.streaming_session.borrow_mut();
294 let show_prefix = session.on_text_delta(0, &text);
295
296 let accumulated_text = session
297 .get_accumulated(ContentType::Text, "0")
298 .unwrap_or("")
299 .to_string();
300
301 let sanitized_text = delta_display::sanitize_for_display(&accumulated_text);
302 if sanitized_text.is_empty() {
303 return String::new();
304 }
305
306 if session.is_content_hash_rendered(ContentType::Text, "0", &sanitized_text) {
307 return String::new();
308 }
309
310 let has_prefix = session.has_rendered_prefix(ContentType::Text, "0");
311
312 session.mark_rendered(ContentType::Text, "0");
313 session.mark_content_hash_rendered(ContentType::Text, "0", &sanitized_text);
314
315 (show_prefix, accumulated_text, has_prefix)
316 };
317
318 let terminal_mode = *self.terminal_mode.borrow();
319 if show_prefix && !has_prefix {
320 return TextDeltaRenderer::render_first_delta(
321 &accumulated_text,
322 prefix,
323 *c,
324 terminal_mode,
325 );
326 }
327
328 return TextDeltaRenderer::render_subsequent_delta(
329 &accumulated_text,
330 prefix,
331 *c,
332 terminal_mode,
333 );
334 } else if !is_delta && role_str == "assistant" {
335 let session = self.streaming_session.borrow();
337 let is_duplicate = session.get_current_message_id().map_or_else(
338 || session.has_any_streamed_content(),
339 |message_id| session.is_duplicate_final_message(message_id),
340 );
341 let was_streaming = session.has_any_streamed_content();
342 let metrics = session.get_streaming_quality_metrics();
343 drop(session);
344
345 let _was_in_block = self.streaming_session.borrow_mut().on_message_stop();
347
348 if is_duplicate || was_streaming {
350 let terminal_mode = *self.terminal_mode.borrow();
351 let completion = TextDeltaRenderer::render_completion(terminal_mode);
352 let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
353 && metrics.total_deltas > 0;
354 if show_metrics {
355 return format!("{}\n{}", completion, metrics.format(*c));
356 }
357 return completion;
358 }
359
360 let limit = self.verbosity.truncate_limit("text");
362 let preview = truncate_text(&text, limit);
363
364 return format!(
365 "{}[{}]{} {}{}{}\n",
366 c.dim(),
367 prefix,
368 c.reset(),
369 c.white(),
370 preview,
371 c.reset()
372 );
373 }
374 let limit = self.verbosity.truncate_limit("text");
376 let preview = truncate_text(&text, limit);
377 return format!(
378 "{}[{}]{} {}{}:{} {}{}{}\n",
379 c.dim(),
380 prefix,
381 c.reset(),
382 c.blue(),
383 role_str,
384 c.reset(),
385 c.dim(),
386 preview,
387 c.reset()
388 );
389 }
390 String::new()
391 }
392
393 fn format_tool_use_event(
395 &self,
396 tool_name: Option<String>,
397 parameters: Option<&serde_json::Value>,
398 ) -> String {
399 let c = &self.colors;
400 let prefix = &self.display_name;
401
402 let tool_name = tool_name.unwrap_or_else(|| "unknown".to_string());
403 let mut out = format!(
404 "{}[{}]{} {}Tool{}: {}{}{}\n",
405 c.dim(),
406 prefix,
407 c.reset(),
408 c.magenta(),
409 c.reset(),
410 c.bold(),
411 tool_name,
412 c.reset()
413 );
414 if self.verbosity.show_tool_input() {
415 if let Some(params) = parameters {
416 let params_str = format_tool_input(params);
417 let limit = self.verbosity.truncate_limit("tool_input");
418 let preview = truncate_text(¶ms_str, limit);
419 if !preview.is_empty() {
420 let _ = writeln!(
421 out,
422 "{}[{}]{} {} └─ {}{}",
423 c.dim(),
424 prefix,
425 c.reset(),
426 c.dim(),
427 preview,
428 c.reset()
429 );
430 }
431 }
432 }
433 out
434 }
435
436 fn format_tool_result_event(&self, status: Option<String>, output: Option<&String>) -> String {
438 let c = &self.colors;
439 let prefix = &self.display_name;
440
441 let status_str = status.unwrap_or_else(|| "unknown".to_string());
442 let is_success = status_str == "success";
443 let icon = if is_success { CHECK } else { CROSS };
444 let color = if is_success { c.green() } else { c.red() };
445
446 let mut out = format!(
447 "{}[{}]{} {}{} Tool result{}\n",
448 c.dim(),
449 prefix,
450 c.reset(),
451 color,
452 icon,
453 c.reset()
454 );
455
456 if self.verbosity.is_verbose() {
457 if let Some(output_text) = output {
458 let limit = self.verbosity.truncate_limit("tool_result");
459 let preview = truncate_text(output_text, limit);
460 let _ = writeln!(
461 out,
462 "{}[{}]{} {} └─ {}{}",
463 c.dim(),
464 prefix,
465 c.reset(),
466 c.dim(),
467 preview,
468 c.reset()
469 );
470 }
471 }
472 out
473 }
474
475 fn format_error_event(&self, message: Option<String>, code: Option<String>) -> String {
477 let c = &self.colors;
478 let prefix = &self.display_name;
479
480 let msg = message.unwrap_or_else(|| "unknown error".to_string());
481 let code_str = code.map_or_else(String::new, |c| format!(" ({c})"));
482 format!(
483 "{}[{}]{} {}{} Error{}:{} {}\n",
484 c.dim(),
485 prefix,
486 c.reset(),
487 c.red(),
488 CROSS,
489 code_str,
490 c.reset(),
491 msg
492 )
493 }
494
495 fn format_result_event(
497 &self,
498 status: Option<String>,
499 event_stats: Option<crate::json_parser::types::GeminiStats>,
500 ) -> String {
501 let c = &self.colors;
502 let prefix = &self.display_name;
503
504 let status_result = status.unwrap_or_else(|| "unknown".to_string());
505 let is_success = status_result == "success";
506 let icon = if is_success { CHECK } else { CROSS };
507 let color = if is_success { c.green() } else { c.red() };
508
509 let stats_display = event_stats.map_or_else(String::new, |s| {
510 let duration_s = s.duration_ms.unwrap_or(0) / 1000;
511 let duration_m = duration_s / 60;
512 let duration_s_rem = duration_s % 60;
513 let input = s.input_tokens.unwrap_or(0);
514 let output = s.output_tokens.unwrap_or(0);
515 let tools = s.tool_calls.unwrap_or(0);
516 format!("({duration_m}m {duration_s_rem}s, in:{input} out:{output}, {tools} tools)")
517 });
518
519 format!(
520 "{}[{}]{} {}{} {}{} {}{}{}\n",
521 c.dim(),
522 prefix,
523 c.reset(),
524 color,
525 icon,
526 status_result,
527 c.reset(),
528 c.dim(),
529 stats_display,
530 c.reset()
531 )
532 }
533
534 const fn is_control_event(event: &GeminiEvent) -> bool {
540 match event {
541 GeminiEvent::Init { .. } | GeminiEvent::Result { .. } => true,
543 _ => false,
544 }
545 }
546
547 pub(crate) fn parse_stream<R: BufRead>(
549 &self,
550 mut reader: R,
551 workspace: &dyn crate::workspace::Workspace,
552 ) -> io::Result<()> {
553 use super::incremental_parser::IncrementalNdjsonParser;
554
555 let c = &self.colors;
556 let monitor = HealthMonitor::new("Gemini");
557 let logging_enabled = self.log_path.is_some();
559 let mut log_buffer: Vec<u8> = Vec::new();
560
561 let mut incremental_parser = IncrementalNdjsonParser::new();
564 let mut byte_buffer = Vec::new();
565
566 loop {
567 byte_buffer.clear();
569 let chunk = reader.fill_buf()?;
570 if chunk.is_empty() {
571 break;
572 }
573
574 byte_buffer.extend_from_slice(chunk);
576 let consumed = chunk.len();
577 reader.consume(consumed);
578
579 let json_events = incremental_parser.feed(&byte_buffer);
581
582 for line in json_events {
584 let trimmed = line.trim();
585 if trimmed.is_empty() {
586 continue;
587 }
588
589 if self.verbosity.is_debug() {
591 let mut printer = self.printer.borrow_mut();
592 writeln!(
593 printer,
594 "{}[DEBUG]{} {}{}{}",
595 c.dim(),
596 c.reset(),
597 c.dim(),
598 &line,
599 c.reset()
600 )?;
601 printer.flush()?;
602 }
603
604 match self.parse_event(&line) {
606 Some(output) => {
607 monitor.record_parsed();
608 let mut printer = self.printer.borrow_mut();
610 write!(printer, "{output}")?;
611 printer.flush()?;
612 }
613 None => {
614 if trimmed.starts_with('{') {
616 if let Ok(event) = serde_json::from_str::<GeminiEvent>(&line) {
617 if Self::is_control_event(&event) {
618 monitor.record_control_event();
619 } else {
620 monitor.record_unknown_event();
622 }
623 } else {
624 monitor.record_parse_error();
626 }
627 } else {
628 monitor.record_ignored();
629 }
630 }
631 }
632
633 if logging_enabled {
635 writeln!(log_buffer, "{line}")?;
636 }
637 }
638 }
639
640 if let Some(log_path) = &self.log_path {
642 workspace.append_bytes(log_path, &log_buffer)?;
643 }
644 if let Some(warning) = monitor.check_and_warn(*c) {
645 let mut printer = self.printer.borrow_mut();
646 writeln!(printer, "{warning}\n")?;
647 }
648 Ok(())
649 }
650}