ralph_workflow/json_parser/
gemini.rs

1//! Gemini CLI JSON parser.
2//!
3//! Parses NDJSON output from Gemini CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
//! This parser implements real-time streaming output for assistant text deltas. When content
//! arrives in multiple chunks (via assistant `message` events with `delta: true`), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//!    creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Gemini] Hello\r         (first delta with prefix, no newline)
19//! \x1b[2K\r[Gemini] Hello World\r  (second delta clears line, rewrites with accumulated)
//! <completion sequence>    (final non-delta message ends the streamed line; its text is not re-printed)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32
33use crate::common::truncate_text;
34use crate::config::Verbosity;
35use crate::logger::{Colors, CHECK, CROSS};
36use std::cell::RefCell;
37use std::fmt::Write as _;
38use std::io::{self, BufRead, Write};
39use std::rc::Rc;
40
41use super::delta_display;
42use super::delta_display::{DeltaRenderer, TextDeltaRenderer};
43use super::health::HealthMonitor;
44#[cfg(feature = "test-utils")]
45use super::health::StreamingQualityMetrics;
46use super::printer::SharedPrinter;
47use super::streaming_state::StreamingSession;
48use super::terminal::TerminalMode;
49use super::types::{format_tool_input, format_unknown_json_event, ContentType, GeminiEvent};
50
/// Gemini event parser.
///
/// Turns Gemini CLI NDJSON events into formatted, prefixed terminal output and
/// writes it to a shared printer.
pub struct GeminiParser {
    /// Color palette used for all formatted output.
    colors: Colors,
    /// Verbosity level; controls truncation limits, tool input/output detail and debug echo.
    verbosity: Verbosity,
    /// When set, every raw JSON event is appended to this file path during `parse_stream`.
    log_file: Option<String>,
    /// Label rendered in brackets at the start of each output line (defaults to "Gemini").
    display_name: String,
    /// Unified streaming session for state tracking
    /// (accumulated deltas, rendered-content hashes, current message ID).
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Terminal mode for output formatting, passed to the delta renderer.
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to append streaming quality metrics after a streamed message completes
    /// (debug verbosity shows them regardless of this flag).
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
}
66
67impl GeminiParser {
68    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
69        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
70    }
71
72    /// Create a new `GeminiParser` with a custom printer.
73    ///
74    /// # Arguments
75    ///
76    /// * `colors` - Colors for terminal output
77    /// * `verbosity` - Verbosity level for output
78    /// * `printer` - Shared printer for output
79    ///
80    /// # Returns
81    ///
82    /// A new `GeminiParser` instance
83    pub(crate) fn with_printer(
84        colors: Colors,
85        verbosity: Verbosity,
86        printer: SharedPrinter,
87    ) -> Self {
88        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
89        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
90
91        // Use the printer's is_terminal method to validate it's connected correctly
92        let _printer_is_terminal = printer.borrow().is_terminal();
93
94        Self {
95            colors,
96            verbosity,
97            log_file: None,
98            display_name: "Gemini".to_string(),
99            streaming_session: Rc::new(RefCell::new(streaming_session)),
100            terminal_mode: RefCell::new(TerminalMode::detect()),
101            show_streaming_metrics: false,
102            printer,
103        }
104    }
105
106    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
107        self.show_streaming_metrics = show;
108        self
109    }
110
111    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
112        self.display_name = display_name.to_string();
113        self
114    }
115
116    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
117        self.log_file = Some(path.to_string());
118        self
119    }
120
121    #[cfg(test)]
122    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
123        *self.terminal_mode.borrow_mut() = mode;
124        self
125    }
126
127    /// Create a new parser with a test printer.
128    ///
129    /// This is the primary entry point for integration tests that need
130    /// to capture parser output for verification.
131    #[cfg(any(test, feature = "test-utils"))]
132    pub fn with_printer_for_test(
133        colors: Colors,
134        verbosity: Verbosity,
135        printer: SharedPrinter,
136    ) -> Self {
137        Self::with_printer(colors, verbosity, printer)
138    }
139
140    /// Set the log file path for testing.
141    ///
142    /// This allows tests to verify log file content after parsing.
143    #[cfg(any(test, feature = "test-utils"))]
144    pub fn with_log_file_for_test(mut self, path: &str) -> Self {
145        self.log_file = Some(path.to_string());
146        self
147    }
148
149    /// Parse a stream for testing purposes.
150    ///
151    /// This exposes the internal `parse_stream` method for integration tests.
152    #[cfg(any(test, feature = "test-utils"))]
153    pub fn parse_stream_for_test<R: std::io::BufRead>(&self, reader: R) -> std::io::Result<()> {
154        self.parse_stream(reader)
155    }
156
157    /// Get a shared reference to the printer.
158    ///
159    /// This allows tests, monitoring, and other code to access the printer after parsing
160    /// to verify output content, check for duplicates, or capture output for analysis.
161    /// Only available with the `test-utils` feature.
162    ///
163    /// # Returns
164    ///
165    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
166    #[cfg(feature = "test-utils")]
167    pub fn printer(&self) -> SharedPrinter {
168        Rc::clone(&self.printer)
169    }
170
171    /// Get streaming quality metrics from the current session.
172    ///
173    /// This provides insight into the deduplication and streaming quality of the
174    /// parsing session. Only available with the `test-utils` feature.
175    ///
176    /// # Returns
177    ///
178    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
179    #[cfg(feature = "test-utils")]
180    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
181        self.streaming_session
182            .borrow()
183            .get_streaming_quality_metrics()
184    }
185
186    /// Parse and display a single Gemini JSON event
187    ///
188    /// Returns `Some(formatted_output)` for valid events, or None for:
189    /// - Malformed JSON (non-JSON text passed through if meaningful)
190    /// - Unknown event types
191    /// - Empty or whitespace-only output
192    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
193        let event: GeminiEvent = if let Ok(e) = serde_json::from_str(line) {
194            e
195        } else {
196            // Non-JSON line - pass through as-is if meaningful
197            let trimmed = line.trim();
198            if !trimmed.is_empty() && !trimmed.starts_with('{') {
199                return Some(format!("{trimmed}\n"));
200            }
201            return None;
202        };
203        let c = &self.colors;
204        let prefix = &self.display_name;
205
206        let output = match event {
207            GeminiEvent::Init {
208                session_id, model, ..
209            } => self.format_init_event(session_id, model),
210            GeminiEvent::Message {
211                role,
212                content,
213                delta,
214            } => self.format_message_event(role, content, delta),
215            GeminiEvent::ToolUse {
216                tool_name,
217                parameters,
218                ..
219            } => self.format_tool_use_event(tool_name, parameters.as_ref()),
220            GeminiEvent::ToolResult { status, output, .. } => {
221                self.format_tool_result_event(status, output.as_ref())
222            }
223            GeminiEvent::Error { message, code, .. } => self.format_error_event(message, code),
224            GeminiEvent::Result { status, stats, .. } => self.format_result_event(status, stats),
225            GeminiEvent::Unknown => {
226                // Use the generic unknown event formatter for consistent handling
227                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
228            }
229        };
230
231        if output.is_empty() {
232            None
233        } else {
234            Some(output)
235        }
236    }
237
238    /// Format an Init event
239    fn format_init_event(&self, session_id: Option<String>, model: Option<String>) -> String {
240        let c = &self.colors;
241        let prefix = &self.display_name;
242
243        // Reset streaming state on new session
244        self.streaming_session.borrow_mut().on_message_start();
245        let sid = session_id.unwrap_or_else(|| "unknown".to_string());
246        // Set the current message ID for duplicate detection
247        self.streaming_session
248            .borrow_mut()
249            .set_current_message_id(Some(sid.clone()));
250        let model_str = model.unwrap_or_else(|| "unknown".to_string());
251        format!(
252            "{}[{}]{} {}Session started{} {}({:.8}..., {}){}\n",
253            c.dim(),
254            prefix,
255            c.reset(),
256            c.cyan(),
257            c.reset(),
258            c.dim(),
259            sid,
260            model_str,
261            c.reset()
262        )
263    }
264
265    /// Format a Message event
266    fn format_message_event(
267        &self,
268        role: Option<String>,
269        content: Option<String>,
270        delta: Option<bool>,
271    ) -> String {
272        let c = &self.colors;
273        let prefix = &self.display_name;
274
275        let role_str = role.unwrap_or_else(|| "unknown".to_string());
276        let is_delta = delta.unwrap_or(false);
277
278        if let Some(text) = content {
279            if is_delta && role_str == "assistant" {
280                // Accumulate delta content using StreamingSession.
281                //
282                // StreamingSession handles streaming state management (including snapshot-as-delta
283                // repairs and consecutive duplicate filtering). The parser layer handles:
284                // - Skipping whitespace-only accumulated output
285                // - Hash-based deduplication after sanitization (whitespace-insensitive)
286                // - Prefix-aware in-place updates via TextDeltaRenderer
287                let (show_prefix, accumulated_text, has_prefix) = {
288                    let mut session = self.streaming_session.borrow_mut();
289                    let show_prefix = session.on_text_delta(0, &text);
290
291                    let accumulated_text = session
292                        .get_accumulated(ContentType::Text, "0")
293                        .unwrap_or("")
294                        .to_string();
295
296                    let sanitized_text = delta_display::sanitize_for_display(&accumulated_text);
297                    if sanitized_text.is_empty() {
298                        return String::new();
299                    }
300
301                    if session.is_content_hash_rendered(ContentType::Text, "0", &sanitized_text) {
302                        return String::new();
303                    }
304
305                    let has_prefix = session.has_rendered_prefix(ContentType::Text, "0");
306
307                    session.mark_rendered(ContentType::Text, "0");
308                    session.mark_content_hash_rendered(ContentType::Text, "0", &sanitized_text);
309
310                    (show_prefix, accumulated_text, has_prefix)
311                };
312
313                let terminal_mode = *self.terminal_mode.borrow();
314                if show_prefix && !has_prefix {
315                    return TextDeltaRenderer::render_first_delta(
316                        &accumulated_text,
317                        prefix,
318                        *c,
319                        terminal_mode,
320                    );
321                }
322
323                return TextDeltaRenderer::render_subsequent_delta(
324                    &accumulated_text,
325                    prefix,
326                    *c,
327                    terminal_mode,
328                );
329            } else if !is_delta && role_str == "assistant" {
330                // Non-delta message - check for duplicate using message ID or fallback to streaming content check
331                let session = self.streaming_session.borrow();
332                let is_duplicate = session.get_current_message_id().map_or_else(
333                    || session.has_any_streamed_content(),
334                    |message_id| session.is_duplicate_final_message(message_id),
335                );
336                let was_streaming = session.has_any_streamed_content();
337                let metrics = session.get_streaming_quality_metrics();
338                drop(session);
339
340                // Finalize the message (this marks it as displayed)
341                let _was_in_block = self.streaming_session.borrow_mut().on_message_stop();
342
343                // If this is a duplicate or content was streamed, use TextDeltaRenderer for completion
344                if is_duplicate || was_streaming {
345                    let terminal_mode = *self.terminal_mode.borrow();
346                    let completion = TextDeltaRenderer::render_completion(terminal_mode);
347                    let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
348                        && metrics.total_deltas > 0;
349                    if show_metrics {
350                        return format!("{}\n{}", completion, metrics.format(*c));
351                    }
352                    return completion;
353                }
354
355                // Otherwise, show the full content (non-streaming path)
356                let limit = self.verbosity.truncate_limit("text");
357                let preview = truncate_text(&text, limit);
358
359                return format!(
360                    "{}[{}]{} {}{}{}\n",
361                    c.dim(),
362                    prefix,
363                    c.reset(),
364                    c.white(),
365                    preview,
366                    c.reset()
367                );
368            }
369            // User or other role messages
370            let limit = self.verbosity.truncate_limit("text");
371            let preview = truncate_text(&text, limit);
372            return format!(
373                "{}[{}]{} {}{}:{} {}{}{}\n",
374                c.dim(),
375                prefix,
376                c.reset(),
377                c.blue(),
378                role_str,
379                c.reset(),
380                c.dim(),
381                preview,
382                c.reset()
383            );
384        }
385        String::new()
386    }
387
388    /// Format a `ToolUse` event
389    fn format_tool_use_event(
390        &self,
391        tool_name: Option<String>,
392        parameters: Option<&serde_json::Value>,
393    ) -> String {
394        let c = &self.colors;
395        let prefix = &self.display_name;
396
397        let tool_name = tool_name.unwrap_or_else(|| "unknown".to_string());
398        let mut out = format!(
399            "{}[{}]{} {}Tool{}: {}{}{}\n",
400            c.dim(),
401            prefix,
402            c.reset(),
403            c.magenta(),
404            c.reset(),
405            c.bold(),
406            tool_name,
407            c.reset()
408        );
409        if self.verbosity.show_tool_input() {
410            if let Some(params) = parameters {
411                let params_str = format_tool_input(params);
412                let limit = self.verbosity.truncate_limit("tool_input");
413                let preview = truncate_text(&params_str, limit);
414                if !preview.is_empty() {
415                    let _ = writeln!(
416                        out,
417                        "{}[{}]{} {}  └─ {}{}",
418                        c.dim(),
419                        prefix,
420                        c.reset(),
421                        c.dim(),
422                        preview,
423                        c.reset()
424                    );
425                }
426            }
427        }
428        out
429    }
430
431    /// Format a `ToolResult` event
432    fn format_tool_result_event(&self, status: Option<String>, output: Option<&String>) -> String {
433        let c = &self.colors;
434        let prefix = &self.display_name;
435
436        let status_str = status.unwrap_or_else(|| "unknown".to_string());
437        let is_success = status_str == "success";
438        let icon = if is_success { CHECK } else { CROSS };
439        let color = if is_success { c.green() } else { c.red() };
440
441        let mut out = format!(
442            "{}[{}]{} {}{} Tool result{}\n",
443            c.dim(),
444            prefix,
445            c.reset(),
446            color,
447            icon,
448            c.reset()
449        );
450
451        if self.verbosity.is_verbose() {
452            if let Some(output_text) = output {
453                let limit = self.verbosity.truncate_limit("tool_result");
454                let preview = truncate_text(output_text, limit);
455                let _ = writeln!(
456                    out,
457                    "{}[{}]{} {}  └─ {}{}",
458                    c.dim(),
459                    prefix,
460                    c.reset(),
461                    c.dim(),
462                    preview,
463                    c.reset()
464                );
465            }
466        }
467        out
468    }
469
470    /// Format an `Error` event
471    fn format_error_event(&self, message: Option<String>, code: Option<String>) -> String {
472        let c = &self.colors;
473        let prefix = &self.display_name;
474
475        let msg = message.unwrap_or_else(|| "unknown error".to_string());
476        let code_str = code.map_or_else(String::new, |c| format!(" ({c})"));
477        format!(
478            "{}[{}]{} {}{} Error{}:{} {}\n",
479            c.dim(),
480            prefix,
481            c.reset(),
482            c.red(),
483            CROSS,
484            code_str,
485            c.reset(),
486            msg
487        )
488    }
489
490    /// Format a `Result` event
491    fn format_result_event(
492        &self,
493        status: Option<String>,
494        event_stats: Option<crate::json_parser::types::GeminiStats>,
495    ) -> String {
496        let c = &self.colors;
497        let prefix = &self.display_name;
498
499        let status_result = status.unwrap_or_else(|| "unknown".to_string());
500        let is_success = status_result == "success";
501        let icon = if is_success { CHECK } else { CROSS };
502        let color = if is_success { c.green() } else { c.red() };
503
504        let stats_display = event_stats.map_or_else(String::new, |s| {
505            let duration_s = s.duration_ms.unwrap_or(0) / 1000;
506            let duration_m = duration_s / 60;
507            let duration_s_rem = duration_s % 60;
508            let input = s.input_tokens.unwrap_or(0);
509            let output = s.output_tokens.unwrap_or(0);
510            let tools = s.tool_calls.unwrap_or(0);
511            format!("({duration_m}m {duration_s_rem}s, in:{input} out:{output}, {tools} tools)")
512        });
513
514        format!(
515            "{}[{}]{} {}{} {}{} {}{}{}\n",
516            c.dim(),
517            prefix,
518            c.reset(),
519            color,
520            icon,
521            status_result,
522            c.reset(),
523            c.dim(),
524            stats_display,
525            c.reset()
526        )
527    }
528
529    /// Check if a Gemini event is a control event (state management with no user output)
530    ///
531    /// Control events are valid JSON that represent state transitions rather than
532    /// user-facing content. They should be tracked separately from "ignored" events
533    /// to avoid false health warnings.
534    const fn is_control_event(event: &GeminiEvent) -> bool {
535        match event {
536            // Init and Result events are control events
537            GeminiEvent::Init { .. } | GeminiEvent::Result { .. } => true,
538            _ => false,
539        }
540    }
541
542    /// Parse a stream of Gemini NDJSON events
543    pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
544        use super::incremental_parser::IncrementalNdjsonParser;
545
546        let c = &self.colors;
547        let monitor = HealthMonitor::new("Gemini");
548        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
549            std::fs::OpenOptions::new()
550                .create(true)
551                .append(true)
552                .open(log_path)
553                .ok()
554                .map(std::io::BufWriter::new)
555        });
556
557        // Use incremental parser for true real-time streaming
558        // This processes JSON as soon as it's complete, not waiting for newlines
559        let mut incremental_parser = IncrementalNdjsonParser::new();
560        let mut byte_buffer = Vec::new();
561
562        loop {
563            // Read available bytes
564            byte_buffer.clear();
565            let chunk = reader.fill_buf()?;
566            if chunk.is_empty() {
567                break;
568            }
569
570            // Process all bytes immediately
571            byte_buffer.extend_from_slice(chunk);
572            let consumed = chunk.len();
573            reader.consume(consumed);
574
575            // Feed bytes to incremental parser
576            let json_events = incremental_parser.feed(&byte_buffer);
577
578            // Process each complete JSON event immediately
579            for line in json_events {
580                let trimmed = line.trim();
581                if trimmed.is_empty() {
582                    continue;
583                }
584
585                // In debug mode, also show the raw JSON
586                if self.verbosity.is_debug() {
587                    let mut printer = self.printer.borrow_mut();
588                    writeln!(
589                        printer,
590                        "{}[DEBUG]{} {}{}{}",
591                        c.dim(),
592                        c.reset(),
593                        c.dim(),
594                        &line,
595                        c.reset()
596                    )?;
597                    printer.flush()?;
598                }
599
600                // Parse the event once - parse_event handles malformed JSON by returning None
601                match self.parse_event(&line) {
602                    Some(output) => {
603                        monitor.record_parsed();
604                        // Write output to printer
605                        let mut printer = self.printer.borrow_mut();
606                        write!(printer, "{output}")?;
607                        printer.flush()?;
608                    }
609                    None => {
610                        // Check if this was a control event (state management with no user output)
611                        if trimmed.starts_with('{') {
612                            if let Ok(event) = serde_json::from_str::<GeminiEvent>(&line) {
613                                if Self::is_control_event(&event) {
614                                    monitor.record_control_event();
615                                } else {
616                                    // Valid JSON but not a control event - track as unknown
617                                    monitor.record_unknown_event();
618                                }
619                            } else {
620                                // Failed to deserialize - track as parse error
621                                monitor.record_parse_error();
622                            }
623                        } else {
624                            monitor.record_ignored();
625                        }
626                    }
627                }
628
629                // Log raw JSON to file if configured
630                if let Some(ref mut file) = log_writer {
631                    writeln!(file, "{line}")?;
632                }
633            }
634        }
635
636        if let Some(ref mut file) = log_writer {
637            file.flush()?;
638            // Ensure data is written to disk before continuing
639            // This prevents race conditions where extraction runs before OS commits writes
640            let _ = file.get_mut().sync_all();
641        }
642        if let Some(warning) = monitor.check_and_warn(*c) {
643            let mut printer = self.printer.borrow_mut();
644            writeln!(printer, "{warning}\n")?;
645        }
646        Ok(())
647    }
648}