Skip to main content

ralph_workflow/json_parser/claude/
parser.rs

// Claude parser implementation.
//
// Contains the `ClaudeParser` struct and its core methods.
4
5/// Claude event parser
6///
7/// Note: This parser is designed for single-threaded use only.
8/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
9/// Do not share this parser across threads.
10pub struct ClaudeParser {
11    colors: Colors,
12    pub(crate) verbosity: Verbosity,
13    /// Relative path to log file (if logging enabled)
14    log_path: Option<std::path::PathBuf>,
15    display_name: String,
16    /// Unified streaming session tracker
17    /// Provides single source of truth for streaming state across all content types
18    streaming_session: Rc<RefCell<StreamingSession>>,
19    /// Terminal mode for output formatting
20    /// Detected at parse time and cached for performance
21    terminal_mode: RefCell<TerminalMode>,
22    /// Whether to show streaming quality metrics
23    show_streaming_metrics: bool,
24    /// Output printer for capturing or displaying output
25    printer: SharedPrinter,
26
27    /// Tracks whether a thinking delta line is currently being streamed.
28    ///
29    /// - In `TerminalMode::Full`, thinking deltas use the append-only streaming pattern (no cursor
30    ///   movement during deltas) and must be finalized (emit the completion newline) before emitting
31    ///   other newline-based output.
32    /// - In `TerminalMode::Basic|None`, we suppress per-delta thinking output and flush accumulated
33    ///   thinking content once at the next output boundary (or at `message_stop`).
34    thinking_active_index: RefCell<Option<u64>>,
35
36    /// Tracks which thinking content block indices have streamed thinking content that is eligible
37    /// for non-TTY flushing.
38    ///
39    /// Some providers can emit multiple thinking blocks (multiple indices) within a single message.
40    /// In non-TTY modes we suppress per-delta output, so we must remember all indices that
41    /// accumulated thinking to flush them at `message_stop`.
42    thinking_non_tty_indices: RefCell<std::collections::BTreeSet<u64>>,
43
44    /// Once non-thinking output has started for the current message, suppress any
45    /// subsequent thinking deltas to avoid corrupting visible output.
46    ///
47    /// Claude/CCS can occasionally emit thinking deltas after text deltas. Because
48    /// both streams append on the current line in Full mode, allowing late thinking can
49    /// glue onto or visually corrupt previously-rendered text.
50    suppress_thinking_for_message: RefCell<bool>,
51
52    /// Tracks whether a text delta line is currently being streamed (Full mode).
53    ///
54    /// In the append-only streaming pattern, deltas do not move the cursor; they simply
55    /// append new suffixes on the current line. When true, any newline-based non-stream
56    /// output should ensure the streamed line is finalized (emit the completion newline)
57    /// before printing unrelated lines, to avoid "glued" output.
58    text_line_active: RefCell<bool>,
59
60    /// Defensive cursor state for legacy/inconsistent streams.
61    ///
62    /// The append-only streaming implementation should not emit cursor-up sequences,
63    /// but real-world logs can include raw passthrough output with escape codes.
64    /// When this flag is true, newline-based output should first emit a completion newline
65    /// to avoid overwriting/gluing onto visible content.
66    cursor_up_active: RefCell<bool>,
67
68    /// Tracks the last rendered content for append-only streaming in Full mode.
69    ///
70    /// In append-only mode, we emit the prefix once, then only emit new suffixes for subsequent deltas.
71    /// This map stores the last rendered content for each (ContentType, index) pair.
72    /// Key format: "{content_type}:{index}" (e.g., "text:0", "thinking:1")
73    last_rendered_content: RefCell<std::collections::HashMap<String, String>>,
74}
75
76impl ClaudeParser {
77    /// Create a new `ClaudeParser` with the given colors and verbosity.
78    ///
79    /// # Arguments
80    ///
81    /// * `colors` - Colors for terminal output
82    /// * `verbosity` - Verbosity level for output
83    ///
84    /// # Returns
85    ///
86    /// A new `ClaudeParser` instance
87    ///
88    /// # Example
89    ///
90    /// ```ignore
91    /// use ralph_workflow::json_parser::ClaudeParser;
92    /// use ralph_workflow::logger::Colors;
93    /// use ralph_workflow::config::Verbosity;
94    ///
95    /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
96    /// ```
97    pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
98        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
99    }
100
101    /// Create a new `ClaudeParser` with a custom printer.
102    ///
103    /// # Arguments
104    ///
105    /// * `colors` - Colors for terminal output
106    /// * `verbosity` - Verbosity level for output
107    /// * `printer` - Shared printer for output
108    ///
109    /// # Returns
110    ///
111    /// A new `ClaudeParser` instance
112    pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
113        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
114        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
115
116        // Use the printer's is_terminal method to validate it's connected correctly
117        // This is a sanity check that also satisfies the compiler that the method is used
118        let _printer_is_terminal = printer.borrow().is_terminal();
119
120        Self {
121            colors,
122            verbosity,
123            log_path: None,
124            display_name: "Claude".to_string(),
125            streaming_session: Rc::new(RefCell::new(streaming_session)),
126            terminal_mode: RefCell::new(TerminalMode::detect()),
127            show_streaming_metrics: false,
128            printer,
129            thinking_active_index: RefCell::new(None),
130            thinking_non_tty_indices: RefCell::new(std::collections::BTreeSet::new()),
131            suppress_thinking_for_message: RefCell::new(false),
132            text_line_active: RefCell::new(false),
133            cursor_up_active: RefCell::new(false),
134            last_rendered_content: RefCell::new(std::collections::HashMap::new()),
135        }
136    }
137
138    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
139        self.show_streaming_metrics = show;
140        self
141    }
142
143    /// Set the display name for this parser.
144    ///
145    /// # Arguments
146    ///
147    /// * `display_name` - The name to display in output
148    ///
149    /// # Returns
150    ///
151    /// Self for builder pattern chaining
152    pub fn with_display_name(mut self, display_name: &str) -> Self {
153        self.display_name = display_name.to_string();
154        self
155    }
156
157    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
158        self.log_path = Some(std::path::PathBuf::from(path));
159        self
160    }
161
162    /// Set the terminal mode for this parser.
163    ///
164    /// # Arguments
165    ///
166    /// * `mode` - The terminal mode to use
167    ///
168    /// # Returns
169    ///
170    /// Self for builder pattern chaining
171    #[cfg(any(test, feature = "test-utils"))]
172    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
173        *self.terminal_mode.borrow_mut() = mode;
174        self
175    }
176
177    /// Get a shared reference to the printer.
178    ///
179    /// This allows tests, monitoring, and other code to access the printer after parsing
180    /// to verify output content, check for duplicates, or capture output for analysis.
181    ///
182    /// # Returns
183    ///
184    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
185    ///
186    /// # Example
187    ///
188    /// ```ignore
189    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
190    /// use std::rc::Rc;
191    /// use std::cell::RefCell;
192    ///
193    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
194    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
195    ///
196    /// // Parse events...
197    ///
198    /// // Now access the printer to verify output
199    /// let printer_ref = parser.printer().borrow();
200    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
201    /// ```
202    /// Get a clone of the printer used by this parser.
203    ///
204    /// This is primarily useful for integration tests and monitoring in this repository.
205    /// Only available with the `test-utils` feature.
206    ///
207    /// Note: downstream crates should avoid relying on this API in production builds.
208    #[cfg(any(test, feature = "test-utils"))]
209    pub fn printer(&self) -> SharedPrinter {
210        Rc::clone(&self.printer)
211    }
212
213    /// Get streaming quality metrics from the current session.
214    ///
215    /// This provides insight into the deduplication and streaming quality of the
216    /// parsing session, including:
217    /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
218    /// - Number of large deltas (potential protocol violations)
219    /// - Total deltas processed
220    ///
221    /// Useful for testing, monitoring, and debugging streaming behavior.
222    /// Only available with the `test-utils` feature.
223    ///
224    /// # Returns
225    ///
226    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
227    ///
228    /// # Example
229    ///
230    /// ```ignore
231    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
232    /// use std::rc::Rc;
233    /// use std::cell::RefCell;
234    ///
235    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
236    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
237    ///
238    /// // Parse events...
239    ///
240    /// // Verify deduplication logic triggered
241    /// let metrics = parser.streaming_metrics();
242    /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
243    /// ```
244    #[cfg(any(test, feature = "test-utils"))]
245    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
246        self.streaming_session
247            .borrow()
248            .get_streaming_quality_metrics()
249    }
250
251    /// Parse and display a single Claude JSON event
252    ///
253    /// Returns `Some(formatted_output)` for valid events, or None for:
254    /// - Malformed JSON (logged at debug level)
255    /// - Unknown event types
256    /// - Empty or whitespace-only output
257    pub fn parse_event(&self, line: &str) -> Option<String> {
258        let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
259            e
260        } else {
261            // Non-JSON line - could be raw text output from agent
262            // Pass through as-is if it looks like real output (not empty)
263            let trimmed = line.trim();
264            if !trimmed.is_empty() && !trimmed.starts_with('{') {
265                // In full TTY mode, thinking deltas keep the cursor on the thinking line for
266                // in-place updates. Any other output must first finalize that cursor state.
267                let mut session = self.streaming_session.borrow_mut();
268                let finalize = self.finalize_in_place_full_mode(&mut session);
269                let out = format!("{finalize}{trimmed}\n");
270                if *self.terminal_mode.borrow() == TerminalMode::Full {
271                    // Only mutate cursor state based on explicit cursor controls.
272                    // Normal output may include newlines, but does not reliably indicate whether
273                    // we are still in the in-place streaming cursor-up position.
274                    let mut cursor_up_active = self.cursor_up_active.borrow_mut();
275                    if out.contains("\x1b[1B\n") {
276                        *cursor_up_active = false;
277                    }
278                    if out.contains("\x1b[1A") {
279                        *cursor_up_active = true;
280                    }
281                }
282                return Some(out);
283            }
284            return None;
285        };
286
287        // When a thinking/text line is being streamed in full TTY mode, the streaming
288        // implementation is append-only and keeps the cursor on the current line.
289        //
290        // Any non-stream event (system/user/assistant/result) must first finalize the
291        // active streaming line so the next output does not glue onto it.
292        let finalize = if matches!(&event, ClaudeEvent::StreamEvent { .. }) {
293            String::new()
294        } else {
295            let mut session = self.streaming_session.borrow_mut();
296            self.finalize_in_place_full_mode(&mut session)
297        };
298        let c = &self.colors;
299        let prefix = &self.display_name;
300
301        let output = match event {
302            ClaudeEvent::System {
303                subtype,
304                session_id,
305                cwd,
306            } => self.format_system_event(subtype.as_ref(), session_id, cwd),
307            ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
308            ClaudeEvent::User { message } => self.format_user_event(message),
309            ClaudeEvent::Result {
310                subtype,
311                duration_ms,
312                total_cost_usd,
313                num_turns,
314                result,
315                error,
316            } => self.format_result_event(
317                subtype,
318                duration_ms,
319                total_cost_usd,
320                num_turns,
321                result,
322                error,
323            ),
324            ClaudeEvent::StreamEvent { event } => {
325                // Handle streaming events for delta/partial updates
326                self.parse_stream_event(event)
327            }
328            ClaudeEvent::Unknown => {
329                // Use the generic unknown event formatter for consistent handling
330                // In verbose mode, this will show the event type and key fields
331                // In normal mode, this returns empty string
332                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
333            }
334        };
335
336        // IMPORTANT: We must preserve any completion output from `finalize_in_place_full_mode`
337        // even if the current event itself produces no visible output.
338        //
339        // Example: the final "assistant" event can be deduplicated (empty output) after
340        // streaming deltas have already been shown. If we drop `finalize` in that case,
341        // the streamed line never receives its completion newline and subsequent output
342        // (e.g., system `status`) can clear/overwrite it.
343        let output = if output.is_empty() {
344            finalize
345        } else {
346            format!("{finalize}{output}")
347        };
348
349        if output.is_empty() {
350            None
351        } else {
352            if *self.terminal_mode.borrow() == TerminalMode::Full {
353                // Keep a simple, output-driven model of cursor state.
354                //
355                // The streaming implementation is append-only and SHOULD NOT emit cursor-up
356                // sequences ("\x1b[1A") for deltas. We only treat explicit cursor completion
357                // sequences ("\x1b[1B\n") as authoritative for clearing the defensive flag.
358                //
359                // Note: raw passthrough output may include escape sequences; we avoid inferring
360                // "cursor is up" from output content to keep this logic robust.
361                let mut cursor_up_active = self.cursor_up_active.borrow_mut();
362                if output.contains("\x1b[1B\n") {
363                    *cursor_up_active = false;
364                }
365            }
366            Some(output)
367        }
368    }
369
370    /// Parse a streaming event for delta/partial updates
371    ///
372    /// Handles the nested events within `stream_event`:
373    /// - MessageStart/Stop: Manage session state
374    /// - `ContentBlockStart`: Initialize new content blocks
375    /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
376    /// - `ContentBlockStop`: Finalize content blocks
377    /// - `MessageDelta`: Process message metadata without output
378    /// - Error: Display appropriately
379    ///
380    /// Returns String for display content, empty String for control events.
381    fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
382        let mut session = self.streaming_session.borrow_mut();
383
384        match event {
385            StreamInnerEvent::MessageStart {
386                message,
387                message_id,
388            } => {
389                // Protocol violations happen in real streams: a new MessageStart can arrive
390                // while a previous streamed line is still "active" (we haven't yet emitted the
391                // completion newline). Finalize any active streaming line before resetting state
392                // so subsequent output doesn't glue onto the in-progress line.
393                let in_place_finalize = self.finalize_in_place_full_mode(&mut session);
394
395                // Reset any pending thinking line from a previous message.
396                *self.thinking_active_index.borrow_mut() = None;
397                self.thinking_non_tty_indices.borrow_mut().clear();
398                *self.suppress_thinking_for_message.borrow_mut() = false;
399                *self.text_line_active.borrow_mut() = false;
400                *self.cursor_up_active.borrow_mut() = false;
401
402                // Extract message_id from either the top-level field or nested message.id
403                // The Claude API typically puts the ID in message.id, not at the top level
404                let effective_message_id =
405                    message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
406                // Set message ID for tracking and clear session state on new message
407                session.set_current_message_id(effective_message_id);
408                session.on_message_start();
409                // Clear last rendered content for append-only pattern on new message
410                self.last_rendered_content.borrow_mut().clear();
411                in_place_finalize
412            }
413            StreamInnerEvent::ContentBlockStart {
414                index: Some(index),
415                content_block: Some(block),
416            } => {
417                // Initialize a new content block at this index
418                session.on_content_block_start(index);
419                match &block {
420                    ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
421                        // Initial text in ContentBlockStart - treat as first delta
422                        session.on_text_delta(index, t);
423                    }
424                    ContentBlock::ToolUse { name, input } => {
425                        // Track tool name for GLM/CCS deduplication.
426                        // IMPORTANT: Track the tool name when provided, even when input is None.
427                        // GLM may send ContentBlockStart with name but no input, then send input via delta.
428                        // We only store when we have a name to avoid overwriting a previous tool name with None.
429                        if let Some(n) = name {
430                            session.set_tool_name(index, Some(n.clone()));
431                        }
432
433                        // Initialize tool input accumulator only if input is present
434                        if let Some(i) = input {
435                            let input_str = if let serde_json::Value::String(s) = &i {
436                                s.clone()
437                            } else {
438                                format_tool_input(i)
439                            };
440                            session.on_tool_input_delta(index, &input_str);
441                        }
442                    }
443                    _ => {}
444                }
445                String::new()
446            }
447            StreamInnerEvent::ContentBlockStart {
448                index: Some(index),
449                content_block: None,
450            } => {
451                // Content block started but no initial content provided
452                session.on_content_block_start(index);
453                String::new()
454            }
455            StreamInnerEvent::ContentBlockStart { .. } => {
456                // Content block without index - ignore
457                String::new()
458            }
459            StreamInnerEvent::ContentBlockDelta {
460                index: Some(index),
461                delta: Some(delta),
462            } => self.handle_content_block_delta(&mut session, index, delta),
463            StreamInnerEvent::TextDelta { text: Some(text) } => {
464                self.handle_text_delta(&mut session, &text)
465            }
466            StreamInnerEvent::ContentBlockStop { .. } => {
467                // Content block completion event - no output needed
468                // This event marks the end of a content block but doesn't produce
469                // any displayable content. It's a control event for state management.
470                String::new()
471            }
472            StreamInnerEvent::MessageDelta { .. } => {
473                // Message delta event with usage/metadata - no output needed
474                // This event contains final message metadata (stop_reason, usage stats)
475                // but is used for tracking/monitoring purposes only, not display.
476                String::new()
477            }
478            StreamInnerEvent::ContentBlockDelta { .. }
479            | StreamInnerEvent::Ping
480            | StreamInnerEvent::TextDelta { text: None }
481            | StreamInnerEvent::Error { error: None } => String::new(),
482            StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
483            StreamInnerEvent::Error {
484                error: Some(err), ..
485            } => self.handle_error_event(err),
486            StreamInnerEvent::Unknown => self.handle_unknown_event(),
487        }
488    }
489}