ralph_workflow/json_parser/
gemini.rs

//! Gemini CLI JSON parser.
//!
//! Parses NDJSON output from Gemini CLI and formats it for display.
//!
//! # Streaming Output Behavior
//!
//! This parser implements real-time streaming output for text deltas. When content
//! arrives in multiple chunks (via `message` events with `delta: true`), the parser:
//!
//! 1. **Accumulates** text deltas from each chunk into a buffer
//! 2. **Displays** the accumulated text after each chunk
//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
//!    creating an updating effect that shows the content building up in real-time
//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
//!
//! Example output sequence for streaming "Hello World" in two chunks:
//! ```text
//! [Gemini] Hello\r         (first delta with prefix, no newline)
//! \x1b[2K\r[Gemini] Hello World\r  (second delta clears line, rewrites with accumulated)
//! [Gemini] Hello World\n   (final non-delta message shows complete result)
//! ```
//!
//! # Single-Line Pattern
//!
//! The renderer uses a single-line pattern with carriage return for in-place updates.
//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
//!
//! Each delta rewrites the entire line with prefix, ensuring that:
//! - The user always sees the prefix
//! - Content updates in-place without visual artifacts
//! - Terminal state is clean and predictable
32
33use crate::common::truncate_text;
34use crate::config::Verbosity;
35use crate::logger::{Colors, CHECK, CROSS};
36use std::cell::RefCell;
37use std::fmt::Write as _;
38use std::io::{self, BufRead, Write};
39use std::rc::Rc;
40
41use super::delta_display::{DeltaRenderer, TextDeltaRenderer};
42use super::health::HealthMonitor;
43use super::health::StreamingQualityMetrics;
44use super::printer::SharedPrinter;
45use super::streaming_state::StreamingSession;
46use super::terminal::TerminalMode;
47use super::types::{format_tool_input, format_unknown_json_event, ContentType, GeminiEvent};
48
49/// Gemini event parser
50pub struct GeminiParser {
51    colors: Colors,
52    verbosity: Verbosity,
53    log_file: Option<String>,
54    display_name: String,
55    /// Unified streaming session for state tracking
56    streaming_session: Rc<RefCell<StreamingSession>>,
57    /// Terminal mode for output formatting
58    terminal_mode: RefCell<TerminalMode>,
59    /// Whether to show streaming quality metrics
60    show_streaming_metrics: bool,
61    /// Output printer for capturing or displaying output
62    printer: SharedPrinter,
63}
64
65impl GeminiParser {
66    pub(crate) fn new(colors: Colors, verbosity: Verbosity) -> Self {
67        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
68    }
69
70    /// Create a new `GeminiParser` with a custom printer.
71    ///
72    /// # Arguments
73    ///
74    /// * `colors` - Colors for terminal output
75    /// * `verbosity` - Verbosity level for output
76    /// * `printer` - Shared printer for output
77    ///
78    /// # Returns
79    ///
80    /// A new `GeminiParser` instance
81    pub(crate) fn with_printer(
82        colors: Colors,
83        verbosity: Verbosity,
84        printer: SharedPrinter,
85    ) -> Self {
86        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
87        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
88
89        // Use the printer's is_terminal method to validate it's connected correctly
90        let _printer_is_terminal = printer.borrow().is_terminal();
91
92        Self {
93            colors,
94            verbosity,
95            log_file: None,
96            display_name: "Gemini".to_string(),
97            streaming_session: Rc::new(RefCell::new(streaming_session)),
98            terminal_mode: RefCell::new(TerminalMode::detect()),
99            show_streaming_metrics: false,
100            printer,
101        }
102    }
103
104    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
105        self.show_streaming_metrics = show;
106        self
107    }
108
109    pub(crate) fn with_display_name(mut self, display_name: &str) -> Self {
110        self.display_name = display_name.to_string();
111        self
112    }
113
114    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
115        self.log_file = Some(path.to_string());
116        self
117    }
118
119    #[cfg(test)]
120    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
121        *self.terminal_mode.borrow_mut() = mode;
122        self
123    }
124
125    /// Get a shared reference to the printer.
126    ///
127    /// This allows tests, monitoring, and other code to access the printer after parsing
128    /// to verify output content, check for duplicates, or capture output for analysis.
129    ///
130    /// # Returns
131    ///
132    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
133    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
134    pub fn printer(&self) -> SharedPrinter {
135        Rc::clone(&self.printer)
136    }
137
138    /// Get streaming quality metrics from the current session.
139    ///
140    /// This provides insight into the deduplication and streaming quality of the
141    /// parsing session.
142    ///
143    /// # Returns
144    ///
145    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
146    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
147    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
148        self.streaming_session
149            .borrow()
150            .get_streaming_quality_metrics()
151    }
152
153    /// Parse and display a single Gemini JSON event
154    ///
155    /// Returns `Some(formatted_output)` for valid events, or None for:
156    /// - Malformed JSON (non-JSON text passed through if meaningful)
157    /// - Unknown event types
158    /// - Empty or whitespace-only output
159    pub(crate) fn parse_event(&self, line: &str) -> Option<String> {
160        let event: GeminiEvent = if let Ok(e) = serde_json::from_str(line) {
161            e
162        } else {
163            // Non-JSON line - pass through as-is if meaningful
164            let trimmed = line.trim();
165            if !trimmed.is_empty() && !trimmed.starts_with('{') {
166                return Some(format!("{trimmed}\n"));
167            }
168            return None;
169        };
170        let c = &self.colors;
171        let prefix = &self.display_name;
172
173        let output = match event {
174            GeminiEvent::Init {
175                session_id, model, ..
176            } => self.format_init_event(session_id, model),
177            GeminiEvent::Message {
178                role,
179                content,
180                delta,
181            } => self.format_message_event(role, content, delta),
182            GeminiEvent::ToolUse {
183                tool_name,
184                parameters,
185                ..
186            } => self.format_tool_use_event(tool_name, parameters.as_ref()),
187            GeminiEvent::ToolResult { status, output, .. } => {
188                self.format_tool_result_event(status, output.as_ref())
189            }
190            GeminiEvent::Error { message, code, .. } => self.format_error_event(message, code),
191            GeminiEvent::Result { status, stats, .. } => self.format_result_event(status, stats),
192            GeminiEvent::Unknown => {
193                // Use the generic unknown event formatter for consistent handling
194                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
195            }
196        };
197
198        if output.is_empty() {
199            None
200        } else {
201            Some(output)
202        }
203    }
204
205    /// Format an Init event
206    fn format_init_event(&self, session_id: Option<String>, model: Option<String>) -> String {
207        let c = &self.colors;
208        let prefix = &self.display_name;
209
210        // Reset streaming state on new session
211        self.streaming_session.borrow_mut().on_message_start();
212        let sid = session_id.unwrap_or_else(|| "unknown".to_string());
213        // Set the current message ID for duplicate detection
214        self.streaming_session
215            .borrow_mut()
216            .set_current_message_id(Some(sid.clone()));
217        let model_str = model.unwrap_or_else(|| "unknown".to_string());
218        format!(
219            "{}[{}]{} {}Session started{} {}({:.8}..., {}){}\n",
220            c.dim(),
221            prefix,
222            c.reset(),
223            c.cyan(),
224            c.reset(),
225            c.dim(),
226            sid,
227            model_str,
228            c.reset()
229        )
230    }
231
232    /// Format a Message event
233    fn format_message_event(
234        &self,
235        role: Option<String>,
236        content: Option<String>,
237        delta: Option<bool>,
238    ) -> String {
239        let c = &self.colors;
240        let prefix = &self.display_name;
241
242        let role_str = role.unwrap_or_else(|| "unknown".to_string());
243        let is_delta = delta.unwrap_or(false);
244
245        if let Some(text) = content {
246            if is_delta && role_str == "assistant" {
247                // Accumulate delta content using StreamingSession
248                let (show_prefix, accumulated_text) = {
249                    let mut session = self.streaming_session.borrow_mut();
250                    let show_prefix = session.on_text_delta_key("main", &text);
251                    // Get accumulated text for streaming display
252                    let accumulated_text = session
253                        .get_accumulated(ContentType::Text, "main")
254                        .unwrap_or("")
255                        .to_string();
256                    (show_prefix, accumulated_text)
257                };
258
259                // Use TextDeltaRenderer for consistent rendering across all parsers
260                let terminal_mode = *self.terminal_mode.borrow();
261                if show_prefix {
262                    // First delta: use renderer with prefix
263                    return TextDeltaRenderer::render_first_delta(
264                        &accumulated_text,
265                        prefix,
266                        *c,
267                        terminal_mode,
268                    );
269                }
270                // Subsequent deltas: use renderer for in-place update
271                return TextDeltaRenderer::render_subsequent_delta(
272                    &accumulated_text,
273                    prefix,
274                    *c,
275                    terminal_mode,
276                );
277            } else if !is_delta && role_str == "assistant" {
278                // Non-delta message - check for duplicate using message ID or fallback to streaming content check
279                let session = self.streaming_session.borrow();
280                let is_duplicate = session.get_current_message_id().map_or_else(
281                    || session.has_any_streamed_content(),
282                    |message_id| session.is_duplicate_final_message(message_id),
283                );
284                let was_streaming = session.has_any_streamed_content();
285                let metrics = session.get_streaming_quality_metrics();
286                drop(session);
287
288                // Finalize the message (this marks it as displayed)
289                let _was_in_block = self.streaming_session.borrow_mut().on_message_stop();
290
291                // If this is a duplicate or content was streamed, use TextDeltaRenderer for completion
292                if is_duplicate || was_streaming {
293                    let terminal_mode = *self.terminal_mode.borrow();
294                    let completion = TextDeltaRenderer::render_completion(terminal_mode);
295                    let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
296                        && metrics.total_deltas > 0;
297                    if show_metrics {
298                        return format!("{}\n{}", completion, metrics.format(*c));
299                    }
300                    return completion;
301                }
302
303                // Otherwise, show the full content (non-streaming path)
304                let limit = self.verbosity.truncate_limit("text");
305                let preview = truncate_text(&text, limit);
306
307                return format!(
308                    "{}[{}]{} {}{}{}\n",
309                    c.dim(),
310                    prefix,
311                    c.reset(),
312                    c.white(),
313                    preview,
314                    c.reset()
315                );
316            }
317            // User or other role messages
318            let limit = self.verbosity.truncate_limit("text");
319            let preview = truncate_text(&text, limit);
320            return format!(
321                "{}[{}]{} {}{}:{} {}{}{}\n",
322                c.dim(),
323                prefix,
324                c.reset(),
325                c.blue(),
326                role_str,
327                c.reset(),
328                c.dim(),
329                preview,
330                c.reset()
331            );
332        }
333        String::new()
334    }
335
336    /// Format a `ToolUse` event
337    fn format_tool_use_event(
338        &self,
339        tool_name: Option<String>,
340        parameters: Option<&serde_json::Value>,
341    ) -> String {
342        let c = &self.colors;
343        let prefix = &self.display_name;
344
345        let tool_name = tool_name.unwrap_or_else(|| "unknown".to_string());
346        let mut out = format!(
347            "{}[{}]{} {}Tool{}: {}{}{}\n",
348            c.dim(),
349            prefix,
350            c.reset(),
351            c.magenta(),
352            c.reset(),
353            c.bold(),
354            tool_name,
355            c.reset()
356        );
357        if self.verbosity.show_tool_input() {
358            if let Some(params) = parameters {
359                let params_str = format_tool_input(params);
360                let limit = self.verbosity.truncate_limit("tool_input");
361                let preview = truncate_text(&params_str, limit);
362                if !preview.is_empty() {
363                    let _ = writeln!(
364                        out,
365                        "{}[{}]{} {}  └─ {}{}",
366                        c.dim(),
367                        prefix,
368                        c.reset(),
369                        c.dim(),
370                        preview,
371                        c.reset()
372                    );
373                }
374            }
375        }
376        out
377    }
378
379    /// Format a `ToolResult` event
380    fn format_tool_result_event(&self, status: Option<String>, output: Option<&String>) -> String {
381        let c = &self.colors;
382        let prefix = &self.display_name;
383
384        let status_str = status.unwrap_or_else(|| "unknown".to_string());
385        let is_success = status_str == "success";
386        let icon = if is_success { CHECK } else { CROSS };
387        let color = if is_success { c.green() } else { c.red() };
388
389        let mut out = format!(
390            "{}[{}]{} {}{} Tool result{}\n",
391            c.dim(),
392            prefix,
393            c.reset(),
394            color,
395            icon,
396            c.reset()
397        );
398
399        if self.verbosity.is_verbose() {
400            if let Some(output_text) = output {
401                let limit = self.verbosity.truncate_limit("tool_result");
402                let preview = truncate_text(output_text, limit);
403                let _ = writeln!(
404                    out,
405                    "{}[{}]{} {}  └─ {}{}",
406                    c.dim(),
407                    prefix,
408                    c.reset(),
409                    c.dim(),
410                    preview,
411                    c.reset()
412                );
413            }
414        }
415        out
416    }
417
418    /// Format an `Error` event
419    fn format_error_event(&self, message: Option<String>, code: Option<String>) -> String {
420        let c = &self.colors;
421        let prefix = &self.display_name;
422
423        let msg = message.unwrap_or_else(|| "unknown error".to_string());
424        let code_str = code.map_or_else(String::new, |c| format!(" ({c})"));
425        format!(
426            "{}[{}]{} {}{} Error{}:{} {}\n",
427            c.dim(),
428            prefix,
429            c.reset(),
430            c.red(),
431            CROSS,
432            code_str,
433            c.reset(),
434            msg
435        )
436    }
437
438    /// Format a `Result` event
439    fn format_result_event(
440        &self,
441        status: Option<String>,
442        event_stats: Option<crate::json_parser::types::GeminiStats>,
443    ) -> String {
444        let c = &self.colors;
445        let prefix = &self.display_name;
446
447        let status_result = status.unwrap_or_else(|| "unknown".to_string());
448        let is_success = status_result == "success";
449        let icon = if is_success { CHECK } else { CROSS };
450        let color = if is_success { c.green() } else { c.red() };
451
452        let stats_display = event_stats.map_or_else(String::new, |s| {
453            let duration_s = s.duration_ms.unwrap_or(0) / 1000;
454            let duration_m = duration_s / 60;
455            let duration_s_rem = duration_s % 60;
456            let input = s.input_tokens.unwrap_or(0);
457            let output = s.output_tokens.unwrap_or(0);
458            let tools = s.tool_calls.unwrap_or(0);
459            format!("({duration_m}m {duration_s_rem}s, in:{input} out:{output}, {tools} tools)")
460        });
461
462        format!(
463            "{}[{}]{} {}{} {}{} {}{}{}\n",
464            c.dim(),
465            prefix,
466            c.reset(),
467            color,
468            icon,
469            status_result,
470            c.reset(),
471            c.dim(),
472            stats_display,
473            c.reset()
474        )
475    }
476
477    /// Check if a Gemini event is a control event (state management with no user output)
478    ///
479    /// Control events are valid JSON that represent state transitions rather than
480    /// user-facing content. They should be tracked separately from "ignored" events
481    /// to avoid false health warnings.
482    const fn is_control_event(event: &GeminiEvent) -> bool {
483        match event {
484            // Init and Result events are control events
485            GeminiEvent::Init { .. } | GeminiEvent::Result { .. } => true,
486            _ => false,
487        }
488    }
489
490    /// Parse a stream of Gemini NDJSON events
491    pub(crate) fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
492        use super::incremental_parser::IncrementalNdjsonParser;
493
494        let c = &self.colors;
495        let monitor = HealthMonitor::new("Gemini");
496        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
497            std::fs::OpenOptions::new()
498                .create(true)
499                .append(true)
500                .open(log_path)
501                .ok()
502                .map(std::io::BufWriter::new)
503        });
504
505        // Use incremental parser for true real-time streaming
506        // This processes JSON as soon as it's complete, not waiting for newlines
507        let mut incremental_parser = IncrementalNdjsonParser::new();
508        let mut byte_buffer = Vec::new();
509
510        loop {
511            // Read available bytes
512            byte_buffer.clear();
513            let chunk = reader.fill_buf()?;
514            if chunk.is_empty() {
515                break;
516            }
517
518            // Process all bytes immediately
519            byte_buffer.extend_from_slice(chunk);
520            let consumed = chunk.len();
521            reader.consume(consumed);
522
523            // Feed bytes to incremental parser
524            let json_events = incremental_parser.feed(&byte_buffer);
525
526            // Process each complete JSON event immediately
527            for line in json_events {
528                let trimmed = line.trim();
529                if trimmed.is_empty() {
530                    continue;
531                }
532
533                // In debug mode, also show the raw JSON
534                if self.verbosity.is_debug() {
535                    let mut printer = self.printer.borrow_mut();
536                    writeln!(
537                        printer,
538                        "{}[DEBUG]{} {}{}{}",
539                        c.dim(),
540                        c.reset(),
541                        c.dim(),
542                        &line,
543                        c.reset()
544                    )?;
545                    printer.flush()?;
546                }
547
548                // Parse the event once - parse_event handles malformed JSON by returning None
549                match self.parse_event(&line) {
550                    Some(output) => {
551                        monitor.record_parsed();
552                        // Write output to printer
553                        let mut printer = self.printer.borrow_mut();
554                        write!(printer, "{output}")?;
555                        printer.flush()?;
556                    }
557                    None => {
558                        // Check if this was a control event (state management with no user output)
559                        if trimmed.starts_with('{') {
560                            if let Ok(event) = serde_json::from_str::<GeminiEvent>(&line) {
561                                if Self::is_control_event(&event) {
562                                    monitor.record_control_event();
563                                } else {
564                                    // Valid JSON but not a control event - track as unknown
565                                    monitor.record_unknown_event();
566                                }
567                            } else {
568                                // Failed to deserialize - track as parse error
569                                monitor.record_parse_error();
570                            }
571                        } else {
572                            monitor.record_ignored();
573                        }
574                    }
575                }
576
577                // Log raw JSON to file if configured
578                if let Some(ref mut file) = log_writer {
579                    writeln!(file, "{line}")?;
580                }
581            }
582        }
583
584        if let Some(ref mut file) = log_writer {
585            file.flush()?;
586        }
587        if let Some(warning) = monitor.check_and_warn(*c) {
588            let mut printer = self.printer.borrow_mut();
589            writeln!(printer, "{warning}\n")?;
590        }
591        Ok(())
592    }
593}