ralph_workflow/json_parser/
gemini.rs

//! Gemini CLI JSON parser.
//!
//! Parses NDJSON output from the Gemini CLI and formats it for display.
//!
//! # Streaming Output Behavior
//!
//! This parser implements real-time streaming output for text deltas. When assistant
//! content arrives in multiple chunks (via `message` events with `role: "assistant"` and
//! `delta: true`), the parser:
//!
//! 1. **Accumulates** the text delta from each chunk into a buffer
//! 2. **Displays** the accumulated text after each chunk
//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
//!    creating an updating effect that shows the content building up in real time
//!    (the exact sequences are produced by `TextDeltaRenderer` for the detected `TerminalMode`)
//! 4. **Shows the prefix on every delta**, rewriting the entire line each time (industry standard)
//!
//! Example output sequence for streaming "Hello World" in two chunks:
//! ```text
//! [Gemini] Hello\r                   (first delta with prefix, no newline)
//! \x1b[2K\r[Gemini] Hello World\r    (second delta clears the line, rewrites the accumulated text)
//! <completion>                       (final non-delta message: the streamed text is NOT
//!                                     re-printed; only `TextDeltaRenderer::render_completion`
//!                                     output is emitted)
//! ```
//!
//! # Single-Line Pattern
//!
//! The renderer uses a single-line pattern with carriage return for in-place updates.
//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
//!
//! Each delta rewrites the entire line, including the prefix, ensuring that:
//! - The user always sees the prefix
//! - Content updates in place without visual artifacts
//! - Terminal state is clean and predictable
32
33use crate::common::truncate_text;
34use crate::config::Verbosity;
35use crate::logger::{Colors, CHECK, CROSS};
36use std::cell::RefCell;
37use std::fmt::Write as _;
38use std::io::{self, BufRead, Write};
39use std::rc::Rc;
40
41use super::delta_display::{DeltaRenderer, TextDeltaRenderer};
42use super::health::HealthMonitor;
43#[cfg(feature = "test-utils")]
44use super::health::StreamingQualityMetrics;
45use super::printer::SharedPrinter;
46use super::streaming_state::StreamingSession;
47use super::terminal::TerminalMode;
48use super::types::{format_tool_input, format_unknown_json_event, ContentType, GeminiEvent};
49
50/// Gemini event parser
51pub struct GeminiParser {
52    colors: Colors,
53    verbosity: Verbosity,
54    log_file: Option<String>,
55    display_name: String,
56    /// Unified streaming session for state tracking
57    streaming_session: Rc<RefCell<StreamingSession>>,
58    /// Terminal mode for output formatting
59    terminal_mode: RefCell<TerminalMode>,
60    /// Whether to show streaming quality metrics
61    show_streaming_metrics: bool,
62    /// Output printer for capturing or displaying output
63    printer: SharedPrinter,
64}
65
66impl GeminiParser {
67    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
68        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
69    }
70
71    /// Create a new `GeminiParser` with a custom printer.
72    ///
73    /// # Arguments
74    ///
75    /// * `colors` - Colors for terminal output
76    /// * `verbosity` - Verbosity level for output
77    /// * `printer` - Shared printer for output
78    ///
79    /// # Returns
80    ///
81    /// A new `GeminiParser` instance
82    pub(crate) fn with_printer(
83        colors: Colors,
84        verbosity: Verbosity,
85        printer: SharedPrinter,
86    ) -> Self {
87        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
88        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
89
90        // Use the printer's is_terminal method to validate it's connected correctly
91        let _printer_is_terminal = printer.borrow().is_terminal();
92
93        Self {
94            colors,
95            verbosity,
96            log_file: None,
97            display_name: "Gemini".to_string(),
98            streaming_session: Rc::new(RefCell::new(streaming_session)),
99            terminal_mode: RefCell::new(TerminalMode::detect()),
100            show_streaming_metrics: false,
101            printer,
102        }
103    }
104
105    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
106        self.show_streaming_metrics = show;
107        self
108    }
109
110    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
111        self.display_name = display_name.to_string();
112        self
113    }
114
115    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
116        self.log_file = Some(path.to_string());
117        self
118    }
119
120    #[cfg(test)]
121    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
122        *self.terminal_mode.borrow_mut() = mode;
123        self
124    }
125
126    /// Create a new parser with a test printer.
127    ///
128    /// This is the primary entry point for integration tests that need
129    /// to capture parser output for verification.
130    #[cfg(any(test, feature = "test-utils"))]
131    pub fn with_printer_for_test(
132        colors: Colors,
133        verbosity: Verbosity,
134        printer: SharedPrinter,
135    ) -> Self {
136        Self::with_printer(colors, verbosity, printer)
137    }
138
139    /// Set the log file path for testing.
140    ///
141    /// This allows tests to verify log file content after parsing.
142    #[cfg(any(test, feature = "test-utils"))]
143    pub fn with_log_file_for_test(mut self, path: &str) -> Self {
144        self.log_file = Some(path.to_string());
145        self
146    }
147
148    /// Parse a stream for testing purposes.
149    ///
150    /// This exposes the internal `parse_stream` method for integration tests.
151    #[cfg(any(test, feature = "test-utils"))]
152    pub fn parse_stream_for_test<R: std::io::BufRead>(&self, reader: R) -> std::io::Result<()> {
153        self.parse_stream(reader)
154    }
155
156    /// Get a shared reference to the printer.
157    ///
158    /// This allows tests, monitoring, and other code to access the printer after parsing
159    /// to verify output content, check for duplicates, or capture output for analysis.
160    /// Only available with the `test-utils` feature.
161    ///
162    /// # Returns
163    ///
164    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
165    #[cfg(feature = "test-utils")]
166    pub fn printer(&self) -> SharedPrinter {
167        Rc::clone(&self.printer)
168    }
169
170    /// Get streaming quality metrics from the current session.
171    ///
172    /// This provides insight into the deduplication and streaming quality of the
173    /// parsing session. Only available with the `test-utils` feature.
174    ///
175    /// # Returns
176    ///
177    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
178    #[cfg(feature = "test-utils")]
179    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
180        self.streaming_session
181            .borrow()
182            .get_streaming_quality_metrics()
183    }
184
185    /// Parse and display a single Gemini JSON event
186    ///
187    /// Returns `Some(formatted_output)` for valid events, or None for:
188    /// - Malformed JSON (non-JSON text passed through if meaningful)
189    /// - Unknown event types
190    /// - Empty or whitespace-only output
191    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
192        let event: GeminiEvent = if let Ok(e) = serde_json::from_str(line) {
193            e
194        } else {
195            // Non-JSON line - pass through as-is if meaningful
196            let trimmed = line.trim();
197            if !trimmed.is_empty() && !trimmed.starts_with('{') {
198                return Some(format!("{trimmed}\n"));
199            }
200            return None;
201        };
202        let c = &self.colors;
203        let prefix = &self.display_name;
204
205        let output = match event {
206            GeminiEvent::Init {
207                session_id, model, ..
208            } => self.format_init_event(session_id, model),
209            GeminiEvent::Message {
210                role,
211                content,
212                delta,
213            } => self.format_message_event(role, content, delta),
214            GeminiEvent::ToolUse {
215                tool_name,
216                parameters,
217                ..
218            } => self.format_tool_use_event(tool_name, parameters.as_ref()),
219            GeminiEvent::ToolResult { status, output, .. } => {
220                self.format_tool_result_event(status, output.as_ref())
221            }
222            GeminiEvent::Error { message, code, .. } => self.format_error_event(message, code),
223            GeminiEvent::Result { status, stats, .. } => self.format_result_event(status, stats),
224            GeminiEvent::Unknown => {
225                // Use the generic unknown event formatter for consistent handling
226                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
227            }
228        };
229
230        if output.is_empty() {
231            None
232        } else {
233            Some(output)
234        }
235    }
236
237    /// Format an Init event
238    fn format_init_event(&self, session_id: Option<String>, model: Option<String>) -> String {
239        let c = &self.colors;
240        let prefix = &self.display_name;
241
242        // Reset streaming state on new session
243        self.streaming_session.borrow_mut().on_message_start();
244        let sid = session_id.unwrap_or_else(|| "unknown".to_string());
245        // Set the current message ID for duplicate detection
246        self.streaming_session
247            .borrow_mut()
248            .set_current_message_id(Some(sid.clone()));
249        let model_str = model.unwrap_or_else(|| "unknown".to_string());
250        format!(
251            "{}[{}]{} {}Session started{} {}({:.8}..., {}){}\n",
252            c.dim(),
253            prefix,
254            c.reset(),
255            c.cyan(),
256            c.reset(),
257            c.dim(),
258            sid,
259            model_str,
260            c.reset()
261        )
262    }
263
264    /// Format a Message event
265    fn format_message_event(
266        &self,
267        role: Option<String>,
268        content: Option<String>,
269        delta: Option<bool>,
270    ) -> String {
271        let c = &self.colors;
272        let prefix = &self.display_name;
273
274        let role_str = role.unwrap_or_else(|| "unknown".to_string());
275        let is_delta = delta.unwrap_or(false);
276
277        if let Some(text) = content {
278            if is_delta && role_str == "assistant" {
279                // Accumulate delta content using StreamingSession
280                let (show_prefix, accumulated_text) = {
281                    let mut session = self.streaming_session.borrow_mut();
282                    let show_prefix = session.on_text_delta_key("main", &text);
283                    // Get accumulated text for streaming display
284                    let accumulated_text = session
285                        .get_accumulated(ContentType::Text, "main")
286                        .unwrap_or("")
287                        .to_string();
288                    (show_prefix, accumulated_text)
289                };
290
291                // Use TextDeltaRenderer for consistent rendering across all parsers
292                let terminal_mode = *self.terminal_mode.borrow();
293                if show_prefix {
294                    // First delta: use renderer with prefix
295                    return TextDeltaRenderer::render_first_delta(
296                        &accumulated_text,
297                        prefix,
298                        *c,
299                        terminal_mode,
300                    );
301                }
302                // Subsequent deltas: use renderer for in-place update
303                return TextDeltaRenderer::render_subsequent_delta(
304                    &accumulated_text,
305                    prefix,
306                    *c,
307                    terminal_mode,
308                );
309            } else if !is_delta && role_str == "assistant" {
310                // Non-delta message - check for duplicate using message ID or fallback to streaming content check
311                let session = self.streaming_session.borrow();
312                let is_duplicate = session.get_current_message_id().map_or_else(
313                    || session.has_any_streamed_content(),
314                    |message_id| session.is_duplicate_final_message(message_id),
315                );
316                let was_streaming = session.has_any_streamed_content();
317                let metrics = session.get_streaming_quality_metrics();
318                drop(session);
319
320                // Finalize the message (this marks it as displayed)
321                let _was_in_block = self.streaming_session.borrow_mut().on_message_stop();
322
323                // If this is a duplicate or content was streamed, use TextDeltaRenderer for completion
324                if is_duplicate || was_streaming {
325                    let terminal_mode = *self.terminal_mode.borrow();
326                    let completion = TextDeltaRenderer::render_completion(terminal_mode);
327                    let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
328                        && metrics.total_deltas > 0;
329                    if show_metrics {
330                        return format!("{}\n{}", completion, metrics.format(*c));
331                    }
332                    return completion;
333                }
334
335                // Otherwise, show the full content (non-streaming path)
336                let limit = self.verbosity.truncate_limit("text");
337                let preview = truncate_text(&text, limit);
338
339                return format!(
340                    "{}[{}]{} {}{}{}\n",
341                    c.dim(),
342                    prefix,
343                    c.reset(),
344                    c.white(),
345                    preview,
346                    c.reset()
347                );
348            }
349            // User or other role messages
350            let limit = self.verbosity.truncate_limit("text");
351            let preview = truncate_text(&text, limit);
352            return format!(
353                "{}[{}]{} {}{}:{} {}{}{}\n",
354                c.dim(),
355                prefix,
356                c.reset(),
357                c.blue(),
358                role_str,
359                c.reset(),
360                c.dim(),
361                preview,
362                c.reset()
363            );
364        }
365        String::new()
366    }
367
368    /// Format a `ToolUse` event
369    fn format_tool_use_event(
370        &self,
371        tool_name: Option<String>,
372        parameters: Option<&serde_json::Value>,
373    ) -> String {
374        let c = &self.colors;
375        let prefix = &self.display_name;
376
377        let tool_name = tool_name.unwrap_or_else(|| "unknown".to_string());
378        let mut out = format!(
379            "{}[{}]{} {}Tool{}: {}{}{}\n",
380            c.dim(),
381            prefix,
382            c.reset(),
383            c.magenta(),
384            c.reset(),
385            c.bold(),
386            tool_name,
387            c.reset()
388        );
389        if self.verbosity.show_tool_input() {
390            if let Some(params) = parameters {
391                let params_str = format_tool_input(params);
392                let limit = self.verbosity.truncate_limit("tool_input");
393                let preview = truncate_text(&params_str, limit);
394                if !preview.is_empty() {
395                    let _ = writeln!(
396                        out,
397                        "{}[{}]{} {}  └─ {}{}",
398                        c.dim(),
399                        prefix,
400                        c.reset(),
401                        c.dim(),
402                        preview,
403                        c.reset()
404                    );
405                }
406            }
407        }
408        out
409    }
410
411    /// Format a `ToolResult` event
412    fn format_tool_result_event(&self, status: Option<String>, output: Option<&String>) -> String {
413        let c = &self.colors;
414        let prefix = &self.display_name;
415
416        let status_str = status.unwrap_or_else(|| "unknown".to_string());
417        let is_success = status_str == "success";
418        let icon = if is_success { CHECK } else { CROSS };
419        let color = if is_success { c.green() } else { c.red() };
420
421        let mut out = format!(
422            "{}[{}]{} {}{} Tool result{}\n",
423            c.dim(),
424            prefix,
425            c.reset(),
426            color,
427            icon,
428            c.reset()
429        );
430
431        if self.verbosity.is_verbose() {
432            if let Some(output_text) = output {
433                let limit = self.verbosity.truncate_limit("tool_result");
434                let preview = truncate_text(output_text, limit);
435                let _ = writeln!(
436                    out,
437                    "{}[{}]{} {}  └─ {}{}",
438                    c.dim(),
439                    prefix,
440                    c.reset(),
441                    c.dim(),
442                    preview,
443                    c.reset()
444                );
445            }
446        }
447        out
448    }
449
450    /// Format an `Error` event
451    fn format_error_event(&self, message: Option<String>, code: Option<String>) -> String {
452        let c = &self.colors;
453        let prefix = &self.display_name;
454
455        let msg = message.unwrap_or_else(|| "unknown error".to_string());
456        let code_str = code.map_or_else(String::new, |c| format!(" ({c})"));
457        format!(
458            "{}[{}]{} {}{} Error{}:{} {}\n",
459            c.dim(),
460            prefix,
461            c.reset(),
462            c.red(),
463            CROSS,
464            code_str,
465            c.reset(),
466            msg
467        )
468    }
469
470    /// Format a `Result` event
471    fn format_result_event(
472        &self,
473        status: Option<String>,
474        event_stats: Option<crate::json_parser::types::GeminiStats>,
475    ) -> String {
476        let c = &self.colors;
477        let prefix = &self.display_name;
478
479        let status_result = status.unwrap_or_else(|| "unknown".to_string());
480        let is_success = status_result == "success";
481        let icon = if is_success { CHECK } else { CROSS };
482        let color = if is_success { c.green() } else { c.red() };
483
484        let stats_display = event_stats.map_or_else(String::new, |s| {
485            let duration_s = s.duration_ms.unwrap_or(0) / 1000;
486            let duration_m = duration_s / 60;
487            let duration_s_rem = duration_s % 60;
488            let input = s.input_tokens.unwrap_or(0);
489            let output = s.output_tokens.unwrap_or(0);
490            let tools = s.tool_calls.unwrap_or(0);
491            format!("({duration_m}m {duration_s_rem}s, in:{input} out:{output}, {tools} tools)")
492        });
493
494        format!(
495            "{}[{}]{} {}{} {}{} {}{}{}\n",
496            c.dim(),
497            prefix,
498            c.reset(),
499            color,
500            icon,
501            status_result,
502            c.reset(),
503            c.dim(),
504            stats_display,
505            c.reset()
506        )
507    }
508
509    /// Check if a Gemini event is a control event (state management with no user output)
510    ///
511    /// Control events are valid JSON that represent state transitions rather than
512    /// user-facing content. They should be tracked separately from "ignored" events
513    /// to avoid false health warnings.
514    const fn is_control_event(event: &GeminiEvent) -> bool {
515        match event {
516            // Init and Result events are control events
517            GeminiEvent::Init { .. } | GeminiEvent::Result { .. } => true,
518            _ => false,
519        }
520    }
521
522    /// Parse a stream of Gemini NDJSON events
523    pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
524        use super::incremental_parser::IncrementalNdjsonParser;
525
526        let c = &self.colors;
527        let monitor = HealthMonitor::new("Gemini");
528        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
529            std::fs::OpenOptions::new()
530                .create(true)
531                .append(true)
532                .open(log_path)
533                .ok()
534                .map(std::io::BufWriter::new)
535        });
536
537        // Use incremental parser for true real-time streaming
538        // This processes JSON as soon as it's complete, not waiting for newlines
539        let mut incremental_parser = IncrementalNdjsonParser::new();
540        let mut byte_buffer = Vec::new();
541
542        loop {
543            // Read available bytes
544            byte_buffer.clear();
545            let chunk = reader.fill_buf()?;
546            if chunk.is_empty() {
547                break;
548            }
549
550            // Process all bytes immediately
551            byte_buffer.extend_from_slice(chunk);
552            let consumed = chunk.len();
553            reader.consume(consumed);
554
555            // Feed bytes to incremental parser
556            let json_events = incremental_parser.feed(&byte_buffer);
557
558            // Process each complete JSON event immediately
559            for line in json_events {
560                let trimmed = line.trim();
561                if trimmed.is_empty() {
562                    continue;
563                }
564
565                // In debug mode, also show the raw JSON
566                if self.verbosity.is_debug() {
567                    let mut printer = self.printer.borrow_mut();
568                    writeln!(
569                        printer,
570                        "{}[DEBUG]{} {}{}{}",
571                        c.dim(),
572                        c.reset(),
573                        c.dim(),
574                        &line,
575                        c.reset()
576                    )?;
577                    printer.flush()?;
578                }
579
580                // Parse the event once - parse_event handles malformed JSON by returning None
581                match self.parse_event(&line) {
582                    Some(output) => {
583                        monitor.record_parsed();
584                        // Write output to printer
585                        let mut printer = self.printer.borrow_mut();
586                        write!(printer, "{output}")?;
587                        printer.flush()?;
588                    }
589                    None => {
590                        // Check if this was a control event (state management with no user output)
591                        if trimmed.starts_with('{') {
592                            if let Ok(event) = serde_json::from_str::<GeminiEvent>(&line) {
593                                if Self::is_control_event(&event) {
594                                    monitor.record_control_event();
595                                } else {
596                                    // Valid JSON but not a control event - track as unknown
597                                    monitor.record_unknown_event();
598                                }
599                            } else {
600                                // Failed to deserialize - track as parse error
601                                monitor.record_parse_error();
602                            }
603                        } else {
604                            monitor.record_ignored();
605                        }
606                    }
607                }
608
609                // Log raw JSON to file if configured
610                if let Some(ref mut file) = log_writer {
611                    writeln!(file, "{line}")?;
612                }
613            }
614        }
615
616        if let Some(ref mut file) = log_writer {
617            file.flush()?;
618            // Ensure data is written to disk before continuing
619            // This prevents race conditions where extraction runs before OS commits writes
620            let _ = file.get_mut().sync_all();
621        }
622        if let Some(warning) = monitor.check_and_warn(*c) {
623            let mut printer = self.printer.borrow_mut();
624            writeln!(printer, "{warning}\n")?;
625        }
626        Ok(())
627    }
628}