Skip to main content

ralph_workflow/json_parser/claude/
parser.rs

1// Claude parser implementation.
2//
3// Contains the ClaudeParser struct and its core methods.
4
5/// Claude event parser
6///
7/// Note: This parser is designed for single-threaded use only.
8/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
9/// Do not share this parser across threads.
10pub struct ClaudeParser {
11    colors: Colors,
12    pub(crate) verbosity: Verbosity,
13    /// Relative path to log file (if logging enabled)
14    log_path: Option<std::path::PathBuf>,
15    display_name: String,
16    /// Unified streaming session tracker
17    /// Provides single source of truth for streaming state across all content types
18    streaming_session: Rc<RefCell<StreamingSession>>,
19    /// Terminal mode for output formatting
20    /// Detected at parse time and cached for performance
21    terminal_mode: RefCell<TerminalMode>,
22    /// Whether to show streaming quality metrics
23    show_streaming_metrics: bool,
24    /// Output printer for capturing or displaying output
25    printer: SharedPrinter,
26
27    /// Tracks whether a thinking delta line is currently being streamed.
28    ///
29    /// - In `TerminalMode::Full`, thinking deltas use the append-only streaming pattern (no cursor
30    ///   movement during deltas) and must be finalized (emit the completion newline) before emitting
31    ///   other newline-based output.
32    /// - In `TerminalMode::Basic|None`, we suppress per-delta thinking output and flush accumulated
33    ///   thinking content once at the next output boundary (or at `message_stop`).
34    thinking_active_index: RefCell<Option<u64>>,
35
36    /// Tracks which thinking content block indices have streamed thinking content that is eligible
37    /// for non-TTY flushing.
38    ///
39    /// Some providers can emit multiple thinking blocks (multiple indices) within a single message.
40    /// In non-TTY modes we suppress per-delta output, so we must remember all indices that
41    /// accumulated thinking to flush them at `message_stop`.
42    thinking_non_tty_indices: RefCell<std::collections::BTreeSet<u64>>,
43
44    /// Once non-thinking output has started for the current message, suppress any
45    /// subsequent thinking deltas to avoid corrupting visible output.
46    ///
47    /// Claude/CCS can occasionally emit thinking deltas after text deltas. Because
48    /// both streams append on the current line in Full mode, allowing late thinking can
49    /// glue onto or visually corrupt previously-rendered text.
50    suppress_thinking_for_message: RefCell<bool>,
51
52    /// Tracks whether a text delta line is currently being streamed (Full mode).
53    ///
54    /// In the append-only streaming pattern, deltas do not move the cursor; they simply
55    /// append new suffixes on the current line. When true, any newline-based non-stream
56    /// output should ensure the streamed line is finalized (emit the completion newline)
57    /// before printing unrelated lines, to avoid "glued" output.
58    text_line_active: RefCell<bool>,
59
60    /// Defensive cursor state for legacy/inconsistent streams.
61    ///
62    /// The append-only streaming implementation should not emit cursor-up sequences,
63    /// but real-world logs can include raw passthrough output with escape codes.
64    /// When this flag is true, newline-based output should first emit a completion newline
65    /// to avoid overwriting/gluing onto visible content.
66    cursor_up_active: RefCell<bool>,
67
68    /// Tracks the last rendered content for append-only streaming in Full mode.
69    ///
70    /// In append-only mode, we emit the prefix once, then only emit new suffixes for subsequent deltas.
71    /// This map stores the last rendered content for each (`ContentType`, index) pair.
72    /// Key format: "{`content_type}:{index`}" (e.g., "text:0", "thinking:1")
73    last_rendered_content: RefCell<std::collections::HashMap<String, String>>,
74}
75
76impl ClaudeParser {
77    /// Create a new `ClaudeParser` with the given colors and verbosity.
78    ///
79    /// # Arguments
80    ///
81    /// * `colors` - Colors for terminal output
82    /// * `verbosity` - Verbosity level for output
83    ///
84    /// # Returns
85    ///
86    /// A new `ClaudeParser` instance
87    ///
88    /// # Example
89    ///
90    /// ```ignore
91    /// use ralph_workflow::json_parser::ClaudeParser;
92    /// use ralph_workflow::logger::Colors;
93    /// use ralph_workflow::config::Verbosity;
94    ///
95    /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
96    /// ```
97    #[must_use] 
98    pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
99        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
100    }
101
102    /// Create a new `ClaudeParser` with a custom printer.
103    ///
104    /// # Arguments
105    ///
106    /// * `colors` - Colors for terminal output
107    /// * `verbosity` - Verbosity level for output
108    /// * `printer` - Shared printer for output
109    ///
110    /// # Returns
111    ///
112    /// A new `ClaudeParser` instance
113    pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
114        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
115        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
116
117        // Use the printer's is_terminal method to validate it's connected correctly
118        // This is a sanity check that also satisfies the compiler that the method is used
119        let _printer_is_terminal = printer.borrow().is_terminal();
120
121        Self {
122            colors,
123            verbosity,
124            log_path: None,
125            display_name: "Claude".to_string(),
126            streaming_session: Rc::new(RefCell::new(streaming_session)),
127            terminal_mode: RefCell::new(TerminalMode::detect()),
128            show_streaming_metrics: false,
129            printer,
130            thinking_active_index: RefCell::new(None),
131            thinking_non_tty_indices: RefCell::new(std::collections::BTreeSet::new()),
132            suppress_thinking_for_message: RefCell::new(false),
133            text_line_active: RefCell::new(false),
134            cursor_up_active: RefCell::new(false),
135            last_rendered_content: RefCell::new(std::collections::HashMap::new()),
136        }
137    }
138
139    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
140        self.show_streaming_metrics = show;
141        self
142    }
143
144    /// Set the display name for this parser.
145    ///
146    /// # Arguments
147    ///
148    /// * `display_name` - The name to display in output
149    ///
150    /// # Returns
151    ///
152    /// Self for builder pattern chaining
153    #[must_use]
154    pub fn with_display_name(mut self, display_name: &str) -> Self {
155        self.display_name = display_name.to_string();
156        self
157    }
158
159    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
160        self.log_path = Some(std::path::PathBuf::from(path));
161        self
162    }
163
164    /// Set the terminal mode for this parser.
165    ///
166    /// # Arguments
167    ///
168    /// * `mode` - The terminal mode to use
169    ///
170    /// # Returns
171    ///
172    /// Self for builder pattern chaining
173    #[cfg(any(test, feature = "test-utils"))]
174    #[must_use]
175    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
176        *self.terminal_mode.borrow_mut() = mode;
177        self
178    }
179
180    /// Get a shared reference to the printer.
181    ///
182    /// This allows tests, monitoring, and other code to access the printer after parsing
183    /// to verify output content, check for duplicates, or capture output for analysis.
184    ///
185    /// # Returns
186    ///
187    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
188    ///
189    /// # Example
190    ///
191    /// ```ignore
192    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
193    /// use std::rc::Rc;
194    /// use std::cell::RefCell;
195    ///
196    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
197    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
198    ///
199    /// // Parse events...
200    ///
201    /// // Now access the printer to verify output
202    /// let printer_ref = parser.printer().borrow();
203    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
204    /// ```
205    /// Get a clone of the printer used by this parser.
206    ///
207    /// This is primarily useful for integration tests and monitoring in this repository.
208    /// Only available with the `test-utils` feature.
209    ///
210    /// Note: downstream crates should avoid relying on this API in production builds.
211    #[cfg(any(test, feature = "test-utils"))]
212    pub fn printer(&self) -> SharedPrinter {
213        Rc::clone(&self.printer)
214    }
215
216    /// Get streaming quality metrics from the current session.
217    ///
218    /// This provides insight into the deduplication and streaming quality of the
219    /// parsing session, including:
220    /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
221    /// - Number of large deltas (potential protocol violations)
222    /// - Total deltas processed
223    ///
224    /// Useful for testing, monitoring, and debugging streaming behavior.
225    /// Only available with the `test-utils` feature.
226    ///
227    /// # Returns
228    ///
229    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
230    ///
231    /// # Example
232    ///
233    /// ```ignore
234    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
235    /// use std::rc::Rc;
236    /// use std::cell::RefCell;
237    ///
238    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
239    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
240    ///
241    /// // Parse events...
242    ///
243    /// // Verify deduplication logic triggered
244    /// let metrics = parser.streaming_metrics();
245    /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
246    /// ```
247    #[cfg(any(test, feature = "test-utils"))]
248    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
249        self.streaming_session
250            .borrow()
251            .get_streaming_quality_metrics()
252    }
253
254    /// Parse and display a single Claude JSON event
255    ///
256    /// Returns `Some(formatted_output)` for valid events, or None for:
257    /// - Malformed JSON (logged at debug level)
258    /// - Unknown event types
259    /// - Empty or whitespace-only output
260    pub fn parse_event(&self, line: &str) -> Option<String> {
261        let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
262            e
263        } else {
264            // Non-JSON line - could be raw text output from agent
265            // Pass through as-is if it looks like real output (not empty)
266            let trimmed = line.trim();
267            if !trimmed.is_empty() && !trimmed.starts_with('{') {
268                // In full TTY mode, thinking deltas keep the cursor on the thinking line for
269                // in-place updates. Any other output must first finalize that cursor state.
270                let session = self.streaming_session.borrow_mut();
271                let finalize = self.finalize_in_place_full_mode(&session);
272                let out = format!("{finalize}{trimmed}\n");
273                if *self.terminal_mode.borrow() == TerminalMode::Full {
274                    // Only mutate cursor state based on explicit cursor controls.
275                    // Normal output may include newlines, but does not reliably indicate whether
276                    // we are still in the in-place streaming cursor-up position.
277                    let mut cursor_up_active = self.cursor_up_active.borrow_mut();
278                    if out.contains("\x1b[1B\n") {
279                        *cursor_up_active = false;
280                    }
281                    if out.contains("\x1b[1A") {
282                        *cursor_up_active = true;
283                    }
284                }
285                return Some(out);
286            }
287            return None;
288        };
289
290        // When a thinking/text line is being streamed in full TTY mode, the streaming
291        // implementation is append-only and keeps the cursor on the current line.
292        //
293        // Any non-stream event (system/user/assistant/result) must first finalize the
294        // active streaming line so the next output does not glue onto it.
295        let finalize = if matches!(&event, ClaudeEvent::StreamEvent { .. }) {
296            String::new()
297        } else {
298            let session = self.streaming_session.borrow_mut();
299            self.finalize_in_place_full_mode(&session)
300        };
301        let c = &self.colors;
302        let prefix = &self.display_name;
303
304        let output = match event {
305            ClaudeEvent::System {
306                subtype,
307                session_id,
308                cwd,
309            } => self.format_system_event(subtype.as_ref(), session_id, cwd),
310            ClaudeEvent::Assistant { message } => self.format_assistant_event(message.as_ref()),
311            ClaudeEvent::User { message } => self.format_user_event(message),
312            ClaudeEvent::Result {
313                subtype,
314                duration_ms,
315                total_cost_usd,
316                num_turns,
317                result,
318                error,
319            } => self.format_result_event(
320                subtype,
321                duration_ms,
322                total_cost_usd,
323                num_turns,
324                result,
325                error,
326            ),
327            ClaudeEvent::StreamEvent { event } => {
328                // Handle streaming events for delta/partial updates
329                self.parse_stream_event(event)
330            }
331            ClaudeEvent::Unknown => {
332                // Use the generic unknown event formatter for consistent handling
333                // In verbose mode, this will show the event type and key fields
334                // In normal mode, this returns empty string
335                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
336            }
337        };
338
339        // IMPORTANT: We must preserve any completion output from `finalize_in_place_full_mode`
340        // even if the current event itself produces no visible output.
341        //
342        // Example: the final "assistant" event can be deduplicated (empty output) after
343        // streaming deltas have already been shown. If we drop `finalize` in that case,
344        // the streamed line never receives its completion newline and subsequent output
345        // (e.g., system `status`) can clear/overwrite it.
346        let output = if output.is_empty() {
347            finalize
348        } else {
349            format!("{finalize}{output}")
350        };
351
352        if output.is_empty() {
353            None
354        } else {
355            if *self.terminal_mode.borrow() == TerminalMode::Full {
356                // Keep a simple, output-driven model of cursor state.
357                //
358                // The streaming implementation is append-only and SHOULD NOT emit cursor-up
359                // sequences ("\x1b[1A") for deltas. We only treat explicit cursor completion
360                // sequences ("\x1b[1B\n") as authoritative for clearing the defensive flag.
361                //
362                // Note: raw passthrough output may include escape sequences; we avoid inferring
363                // "cursor is up" from output content to keep this logic robust.
364                let mut cursor_up_active = self.cursor_up_active.borrow_mut();
365                if output.contains("\x1b[1B\n") {
366                    *cursor_up_active = false;
367                }
368            }
369            Some(output)
370        }
371    }
372
373    /// Parse a streaming event for delta/partial updates
374    ///
375    /// Handles the nested events within `stream_event`:
376    /// - MessageStart/Stop: Manage session state
377    /// - `ContentBlockStart`: Initialize new content blocks
378    /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
379    /// - `ContentBlockStop`: Finalize content blocks
380    /// - `MessageDelta`: Process message metadata without output
381    /// - Error: Display appropriately
382    ///
383    /// Returns String for display content, empty String for control events.
384    fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
385        let mut session = self.streaming_session.borrow_mut();
386
387        match event {
388            StreamInnerEvent::MessageStart {
389                message,
390                message_id,
391            } => {
392                // Protocol violations happen in real streams: a new MessageStart can arrive
393                // while a previous streamed line is still "active" (we haven't yet emitted the
394                // completion newline). Finalize any active streaming line before resetting state
395                // so subsequent output doesn't glue onto the in-progress line.
396                let in_place_finalize = self.finalize_in_place_full_mode(&session);
397
398                // Reset any pending thinking line from a previous message.
399                *self.thinking_active_index.borrow_mut() = None;
400                self.thinking_non_tty_indices.borrow_mut().clear();
401                *self.suppress_thinking_for_message.borrow_mut() = false;
402                *self.text_line_active.borrow_mut() = false;
403                *self.cursor_up_active.borrow_mut() = false;
404
405                // Extract message_id from either the top-level field or nested message.id
406                // The Claude API typically puts the ID in message.id, not at the top level
407                let effective_message_id =
408                    message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
409                // Set message ID for tracking and clear session state on new message
410                session.set_current_message_id(effective_message_id);
411                session.on_message_start();
412                // Clear last rendered content for append-only pattern on new message
413                self.last_rendered_content.borrow_mut().clear();
414                in_place_finalize
415            }
416            StreamInnerEvent::ContentBlockStart {
417                index: Some(index),
418                content_block: Some(block),
419            } => {
420                // Initialize a new content block at this index
421                session.on_content_block_start(index);
422                match &block {
423                    ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
424                        // Initial text in ContentBlockStart - treat as first delta
425                        session.on_text_delta(index, t);
426                    }
427                    ContentBlock::ToolUse { name, input } => {
428                        // Track tool name for GLM/CCS deduplication.
429                        // IMPORTANT: Track the tool name when provided, even when input is None.
430                        // GLM may send ContentBlockStart with name but no input, then send input via delta.
431                        // We only store when we have a name to avoid overwriting a previous tool name with None.
432                        if let Some(n) = name {
433                            session.set_tool_name(index, Some(n.clone()));
434                        }
435
436                        // Initialize tool input accumulator only if input is present
437                        if let Some(i) = input {
438                            let input_str = if let serde_json::Value::String(s) = &i {
439                                s.clone()
440                            } else {
441                                format_tool_input(i)
442                            };
443                            session.on_tool_input_delta(index, &input_str);
444                        }
445                    }
446                    _ => {}
447                }
448                String::new()
449            }
450            StreamInnerEvent::ContentBlockStart {
451                index: Some(index),
452                content_block: None,
453            } => {
454                // Content block started but no initial content provided
455                session.on_content_block_start(index);
456                String::new()
457            }
458            StreamInnerEvent::ContentBlockStart { .. } => {
459                // Content block without index - ignore
460                String::new()
461            }
462            StreamInnerEvent::ContentBlockDelta {
463                index: Some(index),
464                delta: Some(delta),
465            } => self.handle_content_block_delta(&mut session, index, delta),
466            StreamInnerEvent::TextDelta { text: Some(text) } => {
467                self.handle_text_delta(&mut session, &text)
468            }
469            StreamInnerEvent::ContentBlockStop { .. } => {
470                // Content block completion event - no output needed
471                // This event marks the end of a content block but doesn't produce
472                // any displayable content. It's a control event for state management.
473                String::new()
474            }
475            StreamInnerEvent::MessageDelta { .. } => {
476                // Message delta event with usage/metadata - no output needed
477                // This event contains final message metadata (stop_reason, usage stats)
478                // but is used for tracking/monitoring purposes only, not display.
479                String::new()
480            }
481            StreamInnerEvent::ContentBlockDelta { .. }
482            | StreamInnerEvent::Ping
483            | StreamInnerEvent::TextDelta { text: None }
484            | StreamInnerEvent::Error { error: None } => String::new(),
485            StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
486            StreamInnerEvent::Error {
487                error: Some(err), ..
488            } => self.handle_error_event(err),
489            StreamInnerEvent::Unknown => self.handle_unknown_event(),
490        }
491    }
492}