1use crate::common::truncate_text;
34use crate::config::Verbosity;
35use crate::logger::{Colors, CHECK, CROSS};
36use std::cell::RefCell;
37use std::fmt::Write as _;
38use std::io::{self, BufRead, Write};
39use std::rc::Rc;
40
41use super::delta_display::{DeltaRenderer, TextDeltaRenderer};
42use super::health::HealthMonitor;
43use super::health::StreamingQualityMetrics;
44use super::printer::SharedPrinter;
45use super::streaming_state::StreamingSession;
46use super::terminal::TerminalMode;
47use super::types::{format_tool_input, format_unknown_json_event, ContentType, GeminiEvent};
48
49pub struct GeminiParser {
51 colors: Colors,
52 verbosity: Verbosity,
53 log_file: Option<String>,
54 display_name: String,
55 streaming_session: Rc<RefCell<StreamingSession>>,
57 terminal_mode: RefCell<TerminalMode>,
59 show_streaming_metrics: bool,
61 printer: SharedPrinter,
63}
64
65impl GeminiParser {
66 pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
67 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
68 }
69
70 pub(crate) fn with_printer(
82 colors: Colors,
83 verbosity: Verbosity,
84 printer: SharedPrinter,
85 ) -> Self {
86 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
87 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
88
89 let _printer_is_terminal = printer.borrow().is_terminal();
91
92 Self {
93 colors,
94 verbosity,
95 log_file: None,
96 display_name: "Gemini".to_string(),
97 streaming_session: Rc::new(RefCell::new(streaming_session)),
98 terminal_mode: RefCell::new(TerminalMode::detect()),
99 show_streaming_metrics: false,
100 printer,
101 }
102 }
103
104 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
105 self.show_streaming_metrics = show;
106 self
107 }
108
109 pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
110 self.display_name = display_name.to_string();
111 self
112 }
113
114 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
115 self.log_file = Some(path.to_string());
116 self
117 }
118
/// Test-only builder: overrides the detected terminal mode with `mode`.
#[cfg(test)]
pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
    {
        // Scope the mutable borrow so it is released before `self` is returned.
        let mut current_mode = self.terminal_mode.borrow_mut();
        *current_mode = mode;
    }
    self
}
124
125 #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
134 pub fn printer(&self) -> SharedPrinter {
135 Rc::clone(&self.printer)
136 }
137
138 #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
147 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
148 self.streaming_session
149 .borrow()
150 .get_streaming_quality_metrics()
151 }
152
153 pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
160 let event: GeminiEvent = if let Ok(e) = serde_json::from_str(line) {
161 e
162 } else {
163 let trimmed = line.trim();
165 if !trimmed.is_empty() && !trimmed.starts_with('{') {
166 return Some(format!("{trimmed}\n"));
167 }
168 return None;
169 };
170 let c = &self.colors;
171 let prefix = &self.display_name;
172
173 let output = match event {
174 GeminiEvent::Init {
175 session_id, model, ..
176 } => self.format_init_event(session_id, model),
177 GeminiEvent::Message {
178 role,
179 content,
180 delta,
181 } => self.format_message_event(role, content, delta),
182 GeminiEvent::ToolUse {
183 tool_name,
184 parameters,
185 ..
186 } => self.format_tool_use_event(tool_name, parameters.as_ref()),
187 GeminiEvent::ToolResult { status, output, .. } => {
188 self.format_tool_result_event(status, output.as_ref())
189 }
190 GeminiEvent::Error { message, code, .. } => self.format_error_event(message, code),
191 GeminiEvent::Result { status, stats, .. } => self.format_result_event(status, stats),
192 GeminiEvent::Unknown => {
193 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
195 }
196 };
197
198 if output.is_empty() {
199 None
200 } else {
201 Some(output)
202 }
203 }
204
205 fn format_init_event(&self, session_id: Option<String>, model: Option<String>) -> String {
207 let c = &self.colors;
208 let prefix = &self.display_name;
209
210 self.streaming_session.borrow_mut().on_message_start();
212 let sid = session_id.unwrap_or_else(|| "unknown".to_string());
213 self.streaming_session
215 .borrow_mut()
216 .set_current_message_id(Some(sid.clone()));
217 let model_str = model.unwrap_or_else(|| "unknown".to_string());
218 format!(
219 "{}[{}]{} {}Session started{} {}({:.8}..., {}){}\n",
220 c.dim(),
221 prefix,
222 c.reset(),
223 c.cyan(),
224 c.reset(),
225 c.dim(),
226 sid,
227 model_str,
228 c.reset()
229 )
230 }
231
232 fn format_message_event(
234 &self,
235 role: Option<String>,
236 content: Option<String>,
237 delta: Option<bool>,
238 ) -> String {
239 let c = &self.colors;
240 let prefix = &self.display_name;
241
242 let role_str = role.unwrap_or_else(|| "unknown".to_string());
243 let is_delta = delta.unwrap_or(false);
244
245 if let Some(text) = content {
246 if is_delta && role_str == "assistant" {
247 let (show_prefix, accumulated_text) = {
249 let mut session = self.streaming_session.borrow_mut();
250 let show_prefix = session.on_text_delta_key("main", &text);
251 let accumulated_text = session
253 .get_accumulated(ContentType::Text, "main")
254 .unwrap_or("")
255 .to_string();
256 (show_prefix, accumulated_text)
257 };
258
259 let terminal_mode = *self.terminal_mode.borrow();
261 if show_prefix {
262 return TextDeltaRenderer::render_first_delta(
264 &accumulated_text,
265 prefix,
266 *c,
267 terminal_mode,
268 );
269 }
270 return TextDeltaRenderer::render_subsequent_delta(
272 &accumulated_text,
273 prefix,
274 *c,
275 terminal_mode,
276 );
277 } else if !is_delta && role_str == "assistant" {
278 let session = self.streaming_session.borrow();
280 let is_duplicate = session.get_current_message_id().map_or_else(
281 || session.has_any_streamed_content(),
282 |message_id| session.is_duplicate_final_message(message_id),
283 );
284 let was_streaming = session.has_any_streamed_content();
285 let metrics = session.get_streaming_quality_metrics();
286 drop(session);
287
288 let _was_in_block = self.streaming_session.borrow_mut().on_message_stop();
290
291 if is_duplicate || was_streaming {
293 let terminal_mode = *self.terminal_mode.borrow();
294 let completion = TextDeltaRenderer::render_completion(terminal_mode);
295 let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
296 && metrics.total_deltas > 0;
297 if show_metrics {
298 return format!("{}\n{}", completion, metrics.format(*c));
299 }
300 return completion;
301 }
302
303 let limit = self.verbosity.truncate_limit("text");
305 let preview = truncate_text(&text, limit);
306
307 return format!(
308 "{}[{}]{} {}{}{}\n",
309 c.dim(),
310 prefix,
311 c.reset(),
312 c.white(),
313 preview,
314 c.reset()
315 );
316 }
317 let limit = self.verbosity.truncate_limit("text");
319 let preview = truncate_text(&text, limit);
320 return format!(
321 "{}[{}]{} {}{}:{} {}{}{}\n",
322 c.dim(),
323 prefix,
324 c.reset(),
325 c.blue(),
326 role_str,
327 c.reset(),
328 c.dim(),
329 preview,
330 c.reset()
331 );
332 }
333 String::new()
334 }
335
336 fn format_tool_use_event(
338 &self,
339 tool_name: Option<String>,
340 parameters: Option<&serde_json::Value>,
341 ) -> String {
342 let c = &self.colors;
343 let prefix = &self.display_name;
344
345 let tool_name = tool_name.unwrap_or_else(|| "unknown".to_string());
346 let mut out = format!(
347 "{}[{}]{} {}Tool{}: {}{}{}\n",
348 c.dim(),
349 prefix,
350 c.reset(),
351 c.magenta(),
352 c.reset(),
353 c.bold(),
354 tool_name,
355 c.reset()
356 );
357 if self.verbosity.show_tool_input() {
358 if let Some(params) = parameters {
359 let params_str = format_tool_input(params);
360 let limit = self.verbosity.truncate_limit("tool_input");
361 let preview = truncate_text(¶ms_str, limit);
362 if !preview.is_empty() {
363 let _ = writeln!(
364 out,
365 "{}[{}]{} {} └─ {}{}",
366 c.dim(),
367 prefix,
368 c.reset(),
369 c.dim(),
370 preview,
371 c.reset()
372 );
373 }
374 }
375 }
376 out
377 }
378
379 fn format_tool_result_event(&self, status: Option<String>, output: Option<&String>) -> String {
381 let c = &self.colors;
382 let prefix = &self.display_name;
383
384 let status_str = status.unwrap_or_else(|| "unknown".to_string());
385 let is_success = status_str == "success";
386 let icon = if is_success { CHECK } else { CROSS };
387 let color = if is_success { c.green() } else { c.red() };
388
389 let mut out = format!(
390 "{}[{}]{} {}{} Tool result{}\n",
391 c.dim(),
392 prefix,
393 c.reset(),
394 color,
395 icon,
396 c.reset()
397 );
398
399 if self.verbosity.is_verbose() {
400 if let Some(output_text) = output {
401 let limit = self.verbosity.truncate_limit("tool_result");
402 let preview = truncate_text(output_text, limit);
403 let _ = writeln!(
404 out,
405 "{}[{}]{} {} └─ {}{}",
406 c.dim(),
407 prefix,
408 c.reset(),
409 c.dim(),
410 preview,
411 c.reset()
412 );
413 }
414 }
415 out
416 }
417
418 fn format_error_event(&self, message: Option<String>, code: Option<String>) -> String {
420 let c = &self.colors;
421 let prefix = &self.display_name;
422
423 let msg = message.unwrap_or_else(|| "unknown error".to_string());
424 let code_str = code.map_or_else(String::new, |c| format!(" ({c})"));
425 format!(
426 "{}[{}]{} {}{} Error{}:{} {}\n",
427 c.dim(),
428 prefix,
429 c.reset(),
430 c.red(),
431 CROSS,
432 code_str,
433 c.reset(),
434 msg
435 )
436 }
437
438 fn format_result_event(
440 &self,
441 status: Option<String>,
442 event_stats: Option<crate::json_parser::types::GeminiStats>,
443 ) -> String {
444 let c = &self.colors;
445 let prefix = &self.display_name;
446
447 let status_result = status.unwrap_or_else(|| "unknown".to_string());
448 let is_success = status_result == "success";
449 let icon = if is_success { CHECK } else { CROSS };
450 let color = if is_success { c.green() } else { c.red() };
451
452 let stats_display = event_stats.map_or_else(String::new, |s| {
453 let duration_s = s.duration_ms.unwrap_or(0) / 1000;
454 let duration_m = duration_s / 60;
455 let duration_s_rem = duration_s % 60;
456 let input = s.input_tokens.unwrap_or(0);
457 let output = s.output_tokens.unwrap_or(0);
458 let tools = s.tool_calls.unwrap_or(0);
459 format!("({duration_m}m {duration_s_rem}s, in:{input} out:{output}, {tools} tools)")
460 });
461
462 format!(
463 "{}[{}]{} {}{} {}{} {}{}{}\n",
464 c.dim(),
465 prefix,
466 c.reset(),
467 color,
468 icon,
469 status_result,
470 c.reset(),
471 c.dim(),
472 stats_display,
473 c.reset()
474 )
475 }
476
477 const fn is_control_event(event: &GeminiEvent) -> bool {
483 match event {
484 GeminiEvent::Init { .. } | GeminiEvent::Result { .. } => true,
486 _ => false,
487 }
488 }
489
490 pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
492 use super::incremental_parser::IncrementalNdjsonParser;
493
494 let c = &self.colors;
495 let monitor = HealthMonitor::new("Gemini");
496 let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
497 std::fs::OpenOptions::new()
498 .create(true)
499 .append(true)
500 .open(log_path)
501 .ok()
502 .map(std::io::BufWriter::new)
503 });
504
505 let mut incremental_parser = IncrementalNdjsonParser::new();
508 let mut byte_buffer = Vec::new();
509
510 loop {
511 byte_buffer.clear();
513 let chunk = reader.fill_buf()?;
514 if chunk.is_empty() {
515 break;
516 }
517
518 byte_buffer.extend_from_slice(chunk);
520 let consumed = chunk.len();
521 reader.consume(consumed);
522
523 let json_events = incremental_parser.feed(&byte_buffer);
525
526 for line in json_events {
528 let trimmed = line.trim();
529 if trimmed.is_empty() {
530 continue;
531 }
532
533 if self.verbosity.is_debug() {
535 let mut printer = self.printer.borrow_mut();
536 writeln!(
537 printer,
538 "{}[DEBUG]{} {}{}{}",
539 c.dim(),
540 c.reset(),
541 c.dim(),
542 &line,
543 c.reset()
544 )?;
545 printer.flush()?;
546 }
547
548 match self.parse_event(&line) {
550 Some(output) => {
551 monitor.record_parsed();
552 let mut printer = self.printer.borrow_mut();
554 write!(printer, "{output}")?;
555 printer.flush()?;
556 }
557 None => {
558 if trimmed.starts_with('{') {
560 if let Ok(event) = serde_json::from_str::<GeminiEvent>(&line) {
561 if Self::is_control_event(&event) {
562 monitor.record_control_event();
563 } else {
564 monitor.record_unknown_event();
566 }
567 } else {
568 monitor.record_parse_error();
570 }
571 } else {
572 monitor.record_ignored();
573 }
574 }
575 }
576
577 if let Some(ref mut file) = log_writer {
579 writeln!(file, "{line}")?;
580 }
581 }
582 }
583
584 if let Some(ref mut file) = log_writer {
585 file.flush()?;
586 }
587 if let Some(warning) = monitor.check_and_warn(*c) {
588 let mut printer = self.printer.borrow_mut();
589 writeln!(printer, "{warning}\n")?;
590 }
591 Ok(())
592 }
593}