Skip to main content

ralph_workflow/json_parser/
gemini.rs

//! Gemini CLI JSON parser.
//!
//! Parses NDJSON output from Gemini CLI and formats it for display.
//!
//! # Streaming Output Behavior
//!
//! This parser implements real-time streaming output for text deltas. When content
//! arrives in multiple chunks (via `message` events with `delta: true`), the parser:
//!
//! 1. **Accumulates** the text deltas from each chunk into a buffer
//! 2. **Displays** the accumulated text after each chunk
//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
//!    creating an updating effect that shows the content building up in real time
//! 4. **Shows the prefix on every delta**, rewriting the entire line each time (industry standard)
//!
//! Example output sequence for streaming "Hello World" in two chunks:
//! ```text
//! [Gemini] Hello\r                  (first delta: prefix + text, no newline)
//! \x1b[2K\r[Gemini] Hello World\r   (second delta: clear the line, rewrite the accumulated text)
//! <completion sequence>             (final non-delta message: line is finalized, text is not reprinted)
//! ```
//!
//! The exact sequences come from the delta renderer, which receives the detected terminal
//! mode, so output on a non-interactive terminal may differ from the example above.
//!
//! # Single-Line Pattern
//!
//! The renderer uses a single-line pattern with carriage return for in-place updates.
//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
//!
//! Each delta rewrites the entire line with the prefix, ensuring that:
//! - The user always sees the prefix
//! - Content updates in place without visual artifacts
//! - Terminal state stays clean and predictable

33use crate::common::truncate_text;
34use crate::config::Verbosity;
35use crate::logger::{Colors, CHECK, CROSS};
36use std::cell::RefCell;
37use std::fmt::Write as _;
38use std::io::{self, BufRead, Write};
39use std::rc::Rc;
40
41use super::delta_display;
42use super::delta_display::{DeltaRenderer, TextDeltaRenderer};
43use super::health::HealthMonitor;
44#[cfg(feature = "test-utils")]
45use super::health::StreamingQualityMetrics;
46use super::printer::SharedPrinter;
47use super::streaming_state::StreamingSession;
48use super::terminal::TerminalMode;
49use super::types::{format_tool_input, format_unknown_json_event, ContentType, GeminiEvent};
50
51/// Gemini event parser
52pub struct GeminiParser {
53    colors: Colors,
54    verbosity: Verbosity,
55    /// Relative path to log file (if logging enabled)
56    log_path: Option<std::path::PathBuf>,
57    display_name: String,
58    /// Unified streaming session for state tracking
59    streaming_session: Rc<RefCell<StreamingSession>>,
60    /// Terminal mode for output formatting
61    terminal_mode: RefCell<TerminalMode>,
62    /// Whether to show streaming quality metrics
63    show_streaming_metrics: bool,
64    /// Output printer for capturing or displaying output
65    printer: SharedPrinter,
66}
67
68impl GeminiParser {
69    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
70        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
71    }
72
73    /// Create a new `GeminiParser` with a custom printer.
74    ///
75    /// # Arguments
76    ///
77    /// * `colors` - Colors for terminal output
78    /// * `verbosity` - Verbosity level for output
79    /// * `printer` - Shared printer for output
80    ///
81    /// # Returns
82    ///
83    /// A new `GeminiParser` instance
84    pub(crate) fn with_printer(
85        colors: Colors,
86        verbosity: Verbosity,
87        printer: SharedPrinter,
88    ) -> Self {
89        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
90        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
91
92        // Use the printer's is_terminal method to validate it's connected correctly
93        let _printer_is_terminal = printer.borrow().is_terminal();
94
95        Self {
96            colors,
97            verbosity,
98            log_path: None,
99            display_name: "Gemini".to_string(),
100            streaming_session: Rc::new(RefCell::new(streaming_session)),
101            terminal_mode: RefCell::new(TerminalMode::detect()),
102            show_streaming_metrics: false,
103            printer,
104        }
105    }
106
107    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
108        self.show_streaming_metrics = show;
109        self
110    }
111
112    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
113        self.display_name = display_name.to_string();
114        self
115    }
116
117    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
118        self.log_path = Some(std::path::PathBuf::from(path));
119        self
120    }
121
122    #[cfg(test)]
123    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
124        *self.terminal_mode.borrow_mut() = mode;
125        self
126    }
127
128    /// Create a new parser with a test printer.
129    ///
130    /// This is the primary entry point for integration tests that need
131    /// to capture parser output for verification.
132    #[cfg(any(test, feature = "test-utils"))]
133    pub fn with_printer_for_test(
134        colors: Colors,
135        verbosity: Verbosity,
136        printer: SharedPrinter,
137    ) -> Self {
138        Self::with_printer(colors, verbosity, printer)
139    }
140
141    /// Set the log file path for testing.
142    ///
143    /// This allows tests to verify log file content after parsing.
144    #[cfg(any(test, feature = "test-utils"))]
145    pub fn with_log_file_for_test(mut self, path: &str) -> Self {
146        self.log_path = Some(std::path::PathBuf::from(path));
147        self
148    }
149
150    /// Parse a stream for testing purposes.
151    ///
152    /// This exposes the internal `parse_stream` method for integration tests.
153    #[cfg(any(test, feature = "test-utils"))]
154    pub fn parse_stream_for_test<R: std::io::BufRead>(
155        &self,
156        reader: R,
157        workspace: &dyn crate::workspace::Workspace,
158    ) -> std::io::Result<()> {
159        self.parse_stream(reader, workspace)
160    }
161
162    /// Get a shared reference to the printer.
163    ///
164    /// This allows tests, monitoring, and other code to access the printer after parsing
165    /// to verify output content, check for duplicates, or capture output for analysis.
166    /// Only available with the `test-utils` feature.
167    ///
168    /// # Returns
169    ///
170    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
171    #[cfg(feature = "test-utils")]
172    pub fn printer(&self) -> SharedPrinter {
173        Rc::clone(&self.printer)
174    }
175
176    /// Get streaming quality metrics from the current session.
177    ///
178    /// This provides insight into the deduplication and streaming quality of the
179    /// parsing session. Only available with the `test-utils` feature.
180    ///
181    /// # Returns
182    ///
183    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
184    #[cfg(feature = "test-utils")]
185    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
186        self.streaming_session
187            .borrow()
188            .get_streaming_quality_metrics()
189    }
190
191    /// Parse and display a single Gemini JSON event
192    ///
193    /// Returns `Some(formatted_output)` for valid events, or None for:
194    /// - Malformed JSON (non-JSON text passed through if meaningful)
195    /// - Unknown event types
196    /// - Empty or whitespace-only output
197    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
198        let event: GeminiEvent = if let Ok(e) = serde_json::from_str(line) {
199            e
200        } else {
201            // Non-JSON line - pass through as-is if meaningful
202            let trimmed = line.trim();
203            if !trimmed.is_empty() && !trimmed.starts_with('{') {
204                return Some(format!("{trimmed}\n"));
205            }
206            return None;
207        };
208        let c = &self.colors;
209        let prefix = &self.display_name;
210
211        let output = match event {
212            GeminiEvent::Init {
213                session_id, model, ..
214            } => self.format_init_event(session_id, model),
215            GeminiEvent::Message {
216                role,
217                content,
218                delta,
219            } => self.format_message_event(role, content, delta),
220            GeminiEvent::ToolUse {
221                tool_name,
222                parameters,
223                ..
224            } => self.format_tool_use_event(tool_name, parameters.as_ref()),
225            GeminiEvent::ToolResult { status, output, .. } => {
226                self.format_tool_result_event(status, output.as_ref())
227            }
228            GeminiEvent::Error { message, code, .. } => self.format_error_event(message, code),
229            GeminiEvent::Result { status, stats, .. } => self.format_result_event(status, stats),
230            GeminiEvent::Unknown => {
231                // Use the generic unknown event formatter for consistent handling
232                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
233            }
234        };
235
236        if output.is_empty() {
237            None
238        } else {
239            Some(output)
240        }
241    }
242
243    /// Format an Init event
244    fn format_init_event(&self, session_id: Option<String>, model: Option<String>) -> String {
245        let c = &self.colors;
246        let prefix = &self.display_name;
247
248        // Reset streaming state on new session
249        self.streaming_session.borrow_mut().on_message_start();
250        let sid = session_id.unwrap_or_else(|| "unknown".to_string());
251        // Set the current message ID for duplicate detection
252        self.streaming_session
253            .borrow_mut()
254            .set_current_message_id(Some(sid.clone()));
255        let model_str = model.unwrap_or_else(|| "unknown".to_string());
256        format!(
257            "{}[{}]{} {}Session started{} {}({:.8}..., {}){}\n",
258            c.dim(),
259            prefix,
260            c.reset(),
261            c.cyan(),
262            c.reset(),
263            c.dim(),
264            sid,
265            model_str,
266            c.reset()
267        )
268    }
269
270    /// Format a Message event
271    fn format_message_event(
272        &self,
273        role: Option<String>,
274        content: Option<String>,
275        delta: Option<bool>,
276    ) -> String {
277        let c = &self.colors;
278        let prefix = &self.display_name;
279
280        let role_str = role.unwrap_or_else(|| "unknown".to_string());
281        let is_delta = delta.unwrap_or(false);
282
283        if let Some(text) = content {
284            if is_delta && role_str == "assistant" {
285                // Accumulate delta content using StreamingSession.
286                //
287                // StreamingSession handles streaming state management (including snapshot-as-delta
288                // repairs and consecutive duplicate filtering). The parser layer handles:
289                // - Skipping whitespace-only accumulated output
290                // - Hash-based deduplication after sanitization (whitespace-insensitive)
291                // - Prefix-aware in-place updates via TextDeltaRenderer
292                let (show_prefix, accumulated_text, has_prefix) = {
293                    let mut session = self.streaming_session.borrow_mut();
294                    let show_prefix = session.on_text_delta(0, &text);
295
296                    let accumulated_text = session
297                        .get_accumulated(ContentType::Text, "0")
298                        .unwrap_or("")
299                        .to_string();
300
301                    let sanitized_text = delta_display::sanitize_for_display(&accumulated_text);
302                    if sanitized_text.is_empty() {
303                        return String::new();
304                    }
305
306                    if session.is_content_hash_rendered(ContentType::Text, "0", &sanitized_text) {
307                        return String::new();
308                    }
309
310                    let has_prefix = session.has_rendered_prefix(ContentType::Text, "0");
311
312                    session.mark_rendered(ContentType::Text, "0");
313                    session.mark_content_hash_rendered(ContentType::Text, "0", &sanitized_text);
314
315                    (show_prefix, accumulated_text, has_prefix)
316                };
317
318                let terminal_mode = *self.terminal_mode.borrow();
319                if show_prefix && !has_prefix {
320                    return TextDeltaRenderer::render_first_delta(
321                        &accumulated_text,
322                        prefix,
323                        *c,
324                        terminal_mode,
325                    );
326                }
327
328                return TextDeltaRenderer::render_subsequent_delta(
329                    &accumulated_text,
330                    prefix,
331                    *c,
332                    terminal_mode,
333                );
334            } else if !is_delta && role_str == "assistant" {
335                // Non-delta message - check for duplicate using message ID or fallback to streaming content check
336                let session = self.streaming_session.borrow();
337                let is_duplicate = session.get_current_message_id().map_or_else(
338                    || session.has_any_streamed_content(),
339                    |message_id| session.is_duplicate_final_message(message_id),
340                );
341                let was_streaming = session.has_any_streamed_content();
342                let metrics = session.get_streaming_quality_metrics();
343                drop(session);
344
345                // Finalize the message (this marks it as displayed)
346                let _was_in_block = self.streaming_session.borrow_mut().on_message_stop();
347
348                // If this is a duplicate or content was streamed, use TextDeltaRenderer for completion
349                if is_duplicate || was_streaming {
350                    let terminal_mode = *self.terminal_mode.borrow();
351                    let completion = TextDeltaRenderer::render_completion(terminal_mode);
352                    let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
353                        && metrics.total_deltas > 0;
354                    if show_metrics {
355                        return format!("{}\n{}", completion, metrics.format(*c));
356                    }
357                    return completion;
358                }
359
360                // Otherwise, show the full content (non-streaming path)
361                let limit = self.verbosity.truncate_limit("text");
362                let preview = truncate_text(&text, limit);
363
364                return format!(
365                    "{}[{}]{} {}{}{}\n",
366                    c.dim(),
367                    prefix,
368                    c.reset(),
369                    c.white(),
370                    preview,
371                    c.reset()
372                );
373            }
374            // User or other role messages
375            let limit = self.verbosity.truncate_limit("text");
376            let preview = truncate_text(&text, limit);
377            return format!(
378                "{}[{}]{} {}{}:{} {}{}{}\n",
379                c.dim(),
380                prefix,
381                c.reset(),
382                c.blue(),
383                role_str,
384                c.reset(),
385                c.dim(),
386                preview,
387                c.reset()
388            );
389        }
390        String::new()
391    }
392
393    /// Format a `ToolUse` event
394    fn format_tool_use_event(
395        &self,
396        tool_name: Option<String>,
397        parameters: Option<&serde_json::Value>,
398    ) -> String {
399        let c = &self.colors;
400        let prefix = &self.display_name;
401
402        let tool_name = tool_name.unwrap_or_else(|| "unknown".to_string());
403        let mut out = format!(
404            "{}[{}]{} {}Tool{}: {}{}{}\n",
405            c.dim(),
406            prefix,
407            c.reset(),
408            c.magenta(),
409            c.reset(),
410            c.bold(),
411            tool_name,
412            c.reset()
413        );
414        if self.verbosity.show_tool_input() {
415            if let Some(params) = parameters {
416                let params_str = format_tool_input(params);
417                let limit = self.verbosity.truncate_limit("tool_input");
418                let preview = truncate_text(&params_str, limit);
419                if !preview.is_empty() {
420                    let _ = writeln!(
421                        out,
422                        "{}[{}]{} {}  └─ {}{}",
423                        c.dim(),
424                        prefix,
425                        c.reset(),
426                        c.dim(),
427                        preview,
428                        c.reset()
429                    );
430                }
431            }
432        }
433        out
434    }
435
436    /// Format a `ToolResult` event
437    fn format_tool_result_event(&self, status: Option<String>, output: Option<&String>) -> String {
438        let c = &self.colors;
439        let prefix = &self.display_name;
440
441        let status_str = status.unwrap_or_else(|| "unknown".to_string());
442        let is_success = status_str == "success";
443        let icon = if is_success { CHECK } else { CROSS };
444        let color = if is_success { c.green() } else { c.red() };
445
446        let mut out = format!(
447            "{}[{}]{} {}{} Tool result{}\n",
448            c.dim(),
449            prefix,
450            c.reset(),
451            color,
452            icon,
453            c.reset()
454        );
455
456        if self.verbosity.is_verbose() {
457            if let Some(output_text) = output {
458                let limit = self.verbosity.truncate_limit("tool_result");
459                let preview = truncate_text(output_text, limit);
460                let _ = writeln!(
461                    out,
462                    "{}[{}]{} {}  └─ {}{}",
463                    c.dim(),
464                    prefix,
465                    c.reset(),
466                    c.dim(),
467                    preview,
468                    c.reset()
469                );
470            }
471        }
472        out
473    }
474
475    /// Format an `Error` event
476    fn format_error_event(&self, message: Option<String>, code: Option<String>) -> String {
477        let c = &self.colors;
478        let prefix = &self.display_name;
479
480        let msg = message.unwrap_or_else(|| "unknown error".to_string());
481        let code_str = code.map_or_else(String::new, |c| format!(" ({c})"));
482        format!(
483            "{}[{}]{} {}{} Error{}:{} {}\n",
484            c.dim(),
485            prefix,
486            c.reset(),
487            c.red(),
488            CROSS,
489            code_str,
490            c.reset(),
491            msg
492        )
493    }
494
495    /// Format a `Result` event
496    fn format_result_event(
497        &self,
498        status: Option<String>,
499        event_stats: Option<crate::json_parser::types::GeminiStats>,
500    ) -> String {
501        let c = &self.colors;
502        let prefix = &self.display_name;
503
504        let status_result = status.unwrap_or_else(|| "unknown".to_string());
505        let is_success = status_result == "success";
506        let icon = if is_success { CHECK } else { CROSS };
507        let color = if is_success { c.green() } else { c.red() };
508
509        let stats_display = event_stats.map_or_else(String::new, |s| {
510            let duration_s = s.duration_ms.unwrap_or(0) / 1000;
511            let duration_m = duration_s / 60;
512            let duration_s_rem = duration_s % 60;
513            let input = s.input_tokens.unwrap_or(0);
514            let output = s.output_tokens.unwrap_or(0);
515            let tools = s.tool_calls.unwrap_or(0);
516            format!("({duration_m}m {duration_s_rem}s, in:{input} out:{output}, {tools} tools)")
517        });
518
519        format!(
520            "{}[{}]{} {}{} {}{} {}{}{}\n",
521            c.dim(),
522            prefix,
523            c.reset(),
524            color,
525            icon,
526            status_result,
527            c.reset(),
528            c.dim(),
529            stats_display,
530            c.reset()
531        )
532    }
533
534    /// Check if a Gemini event is a control event (state management with no user output)
535    ///
536    /// Control events are valid JSON that represent state transitions rather than
537    /// user-facing content. They should be tracked separately from "ignored" events
538    /// to avoid false health warnings.
539    const fn is_control_event(event: &GeminiEvent) -> bool {
540        match event {
541            // Init and Result events are control events
542            GeminiEvent::Init { .. } | GeminiEvent::Result { .. } => true,
543            _ => false,
544        }
545    }
546
547    /// Parse a stream of Gemini NDJSON events
548    pub(crate) fn parse_stream<R: BufRead>(
549        &self,
550        mut reader: R,
551        workspace: &dyn crate::workspace::Workspace,
552    ) -> io::Result<()> {
553        use super::incremental_parser::IncrementalNdjsonParser;
554
555        let c = &self.colors;
556        let monitor = HealthMonitor::new("Gemini");
557        // Accumulate log content in memory, write to workspace at the end
558        let logging_enabled = self.log_path.is_some();
559        let mut log_buffer: Vec<u8> = Vec::new();
560
561        // Use incremental parser for true real-time streaming
562        // This processes JSON as soon as it's complete, not waiting for newlines
563        let mut incremental_parser = IncrementalNdjsonParser::new();
564        let mut byte_buffer = Vec::new();
565
566        loop {
567            // Read available bytes
568            byte_buffer.clear();
569            let chunk = reader.fill_buf()?;
570            if chunk.is_empty() {
571                break;
572            }
573
574            // Process all bytes immediately
575            byte_buffer.extend_from_slice(chunk);
576            let consumed = chunk.len();
577            reader.consume(consumed);
578
579            // Feed bytes to incremental parser
580            let json_events = incremental_parser.feed(&byte_buffer);
581
582            // Process each complete JSON event immediately
583            for line in json_events {
584                let trimmed = line.trim();
585                if trimmed.is_empty() {
586                    continue;
587                }
588
589                // In debug mode, also show the raw JSON
590                if self.verbosity.is_debug() {
591                    let mut printer = self.printer.borrow_mut();
592                    writeln!(
593                        printer,
594                        "{}[DEBUG]{} {}{}{}",
595                        c.dim(),
596                        c.reset(),
597                        c.dim(),
598                        &line,
599                        c.reset()
600                    )?;
601                    printer.flush()?;
602                }
603
604                // Parse the event once - parse_event handles malformed JSON by returning None
605                match self.parse_event(&line) {
606                    Some(output) => {
607                        monitor.record_parsed();
608                        // Write output to printer
609                        let mut printer = self.printer.borrow_mut();
610                        write!(printer, "{output}")?;
611                        printer.flush()?;
612                    }
613                    None => {
614                        // Check if this was a control event (state management with no user output)
615                        if trimmed.starts_with('{') {
616                            if let Ok(event) = serde_json::from_str::<GeminiEvent>(&line) {
617                                if Self::is_control_event(&event) {
618                                    monitor.record_control_event();
619                                } else {
620                                    // Valid JSON but not a control event - track as unknown
621                                    monitor.record_unknown_event();
622                                }
623                            } else {
624                                // Failed to deserialize - track as parse error
625                                monitor.record_parse_error();
626                            }
627                        } else {
628                            monitor.record_ignored();
629                        }
630                    }
631                }
632
633                // Log raw JSON to buffer if configured
634                if logging_enabled {
635                    writeln!(log_buffer, "{line}")?;
636                }
637            }
638        }
639
640        // Write accumulated log content to workspace
641        if let Some(log_path) = &self.log_path {
642            workspace.append_bytes(log_path, &log_buffer)?;
643        }
644        if let Some(warning) = monitor.check_and_warn(*c) {
645            let mut printer = self.printer.borrow_mut();
646            writeln!(printer, "{warning}\n")?;
647        }
648        Ok(())
649    }
650}