Skip to main content

ralph_workflow/json_parser/claude/
parser.rs

1// Claude parser implementation.
2//
3// Contains the ClaudeParser struct and its core methods.
4
5use std::cell::RefCell;
6use std::io::BufRead;
7use std::rc::Rc;
8
9use crate::json_parser::claude::io::ParserState;
10#[cfg(any(test, feature = "test-utils"))]
11use crate::json_parser::health::StreamingQualityMetrics;
12use crate::json_parser::incremental_parser::IncrementalNdjsonParser;
13use crate::json_parser::printer::Printable;
14use crate::json_parser::printer::StdoutPrinter;
15use crate::json_parser::types::{ContentBlock, ContentBlockDelta, ToolActivityTracker};
16
/// Claude event parser
///
/// Turns lines of Claude's JSON event stream into formatted text and writes that text
/// to a shared printer.
///
/// Note: This parser is designed for single-threaded use only.
/// Do not share this parser across threads.
pub struct ClaudeParser {
    /// Color palette handed to the formatters and used for debug echo output.
    colors: Colors,
    /// Output verbosity; in debug mode every raw stream line is echoed to stderr.
    pub(crate) verbosity: Verbosity,
    /// Path the raw stream lines are appended to at stream end; `None` disables logging.
    log_path: Option<std::path::PathBuf>,
    /// Agent name passed to the unknown-event formatter (defaults to `"Claude"`).
    display_name: String,
    /// Interior-mutable parsing state (streaming session, terminal mode, cursor flags, ...).
    state: ParserState,
    /// Set through `with_show_streaming_metrics`; defaults to `false`.
    show_streaming_metrics: bool,
    /// Shared sink that receives all formatted output.
    printer: Rc<RefCell<dyn Printable>>,
    /// Tracks active tool `ContentBlock`s in progress for idle-timeout suppression.
    /// Incremented when a `ToolUse` block starts, saturating-decremented on `MessageStart`
    /// (tool result received) or when the stream ends. Correctly tracks concurrent parallel
    /// tool calls (multiple `ToolUse` blocks per message).
    tool_activity_tracker: ToolActivityTracker,
}
35
36impl ClaudeParser {
37    #[must_use]
38    pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
39        Self::with_printer(
40            colors,
41            verbosity,
42            Rc::new(RefCell::new(StdoutPrinter::new())),
43        )
44    }
45
46    pub fn with_printer(
47        colors: Colors,
48        verbosity: Verbosity,
49        printer: Rc<RefCell<dyn Printable>>,
50    ) -> Self {
51        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
52
53        Self {
54            colors,
55            verbosity,
56            log_path: None,
57            display_name: "Claude".to_string(),
58            state: ParserState::new(verbose_warnings),
59            show_streaming_metrics: false,
60            printer,
61            tool_activity_tracker: ToolActivityTracker::new(),
62        }
63    }
64
65    /// Register a shared counter that the idle-timeout monitor polls to detect active tool
66    /// execution. Incremented when a `ToolUse` content block starts (supporting parallel tool
67    /// calls); saturating-decremented on `MessageStart` or stream end. Prevents idle-timeout kills
68    /// during long tool operations (e.g. `Write` tool executes after `MessageStop`).
69    pub(crate) fn with_tool_activity_tracker(
70        mut self,
71        tracker: std::sync::Arc<std::sync::atomic::AtomicU32>,
72    ) -> Self {
73        self.tool_activity_tracker = ToolActivityTracker::with_tracker(tracker);
74        self
75    }
76
77    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
78        self.show_streaming_metrics = show;
79        self
80    }
81
82    #[must_use]
83    pub fn with_display_name(mut self, display_name: &str) -> Self {
84        self.display_name = display_name.to_string();
85        self
86    }
87
88    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
89        self.log_path = Some(std::path::PathBuf::from(path));
90        self
91    }
92
93    /// Set the terminal mode for this parser.
94    ///
95    /// # Arguments
96    ///
97    /// * `mode` - The terminal mode to use
98    ///
99    /// # Returns
100    ///
101    /// Self for builder pattern chaining
102    #[cfg(any(test, feature = "test-utils"))]
103    #[must_use]
104    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
105        self.state.terminal_mode.replace(mode);
106        self
107    }
108
109    /// Get a shared reference to the printer.
110    ///
111    /// This allows tests, monitoring, and other code to access the printer after parsing
112    /// to verify output content, check for duplicates, or capture output for analysis.
113    ///
114    /// # Returns
115    ///
116    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
117    ///
118    /// # Example
119    ///
120    /// ```ignore
121    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
122    /// use std::rc::Rc;
123    /// use std::cell::RefCell;
124    ///
125    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
126    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
127    ///
128    /// // Parse events...
129    ///
130    /// // Now access the printer to verify output
131    /// let printer_ref = parser.printer().borrow();
132    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
133    /// ```
134    /// Get a clone of the printer used by this parser.
135    ///
136    /// This is primarily useful for integration tests and monitoring in this repository.
137    /// Only available with the `test-utils` feature.
138    ///
139    /// Note: downstream crates should avoid relying on this API in production builds.
140    #[cfg(any(test, feature = "test-utils"))]
141    pub fn printer(&self) -> Rc<RefCell<dyn Printable>> {
142        self.printer.clone()
143    }
144
145    pub(crate) fn with_printer_mut<R>(&mut self, f: impl FnOnce(&mut dyn Printable) -> R) -> R {
146        let mut printer_ref = self.printer.borrow_mut();
147        f(&mut *printer_ref)
148    }
149
150    /// Get streaming quality metrics from the current session.
151    ///
152    /// This provides insight into the deduplication and streaming quality of the
153    /// parsing session, including:
154    /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
155    /// - Number of large deltas (potential protocol violations)
156    /// - Total deltas processed
157    ///
158    /// Useful for testing, monitoring, and debugging streaming behavior.
159    /// Only available with the `test-utils` feature.
160    ///
161    /// # Returns
162    ///
163    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
164    ///
165    /// # Example
166    ///
167    /// ```ignore
168    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
169    /// use std::rc::Rc;
170    /// use std::cell::RefCell;
171    ///
172    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
173    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
174    ///
175    /// // Parse events...
176    ///
177    /// // Verify deduplication logic triggered
178    /// let metrics = parser.streaming_metrics();
179    /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
180    /// ```
181    #[cfg(any(test, feature = "test-utils"))]
182    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
183        self.state
184            .streaming_session
185            .borrow()
186            .get_streaming_quality_metrics()
187    }
188
189    /// Update cursor-up tracking state based on output content in Full terminal mode.
190    fn update_cursor_up_state(&self, output: &str) {
191        if *self.state.terminal_mode.borrow() == TerminalMode::Full {
192            self.state.with_cursor_up_active_mut(|cursor_up_active| {
193                if output.contains("\x1b[1B\n") {
194                    *cursor_up_active = false;
195                }
196                if output.contains("\x1b[1A") {
197                    *cursor_up_active = true;
198                }
199            });
200        }
201    }
202
203    /// Handle a non-JSON line during stream parsing (e.g. plain text output from agent).
204    fn handle_non_json_line(&self, trimmed: &str) -> Option<String> {
205        if trimmed.is_empty() || trimmed.starts_with('{') {
206            return None;
207        }
208        let finalize = self
209            .state
210            .with_session_mut(|session| self.finalize_in_place_full_mode(session));
211        let out = format!("{finalize}{trimmed}\n");
212        self.update_cursor_up_state(&out);
213        Some(out)
214    }
215
216    /// Dispatch a parsed ClaudeEvent to the appropriate formatter.
217    fn dispatch_event(&self, event: ClaudeEvent, line: &str) -> String {
218        match event {
219            ClaudeEvent::System { subtype, session_id, cwd } => {
220                self.format_system_event(subtype.as_ref(), session_id, cwd)
221            }
222            ClaudeEvent::Assistant { message } => self.format_assistant_event(message.as_ref()),
223            ClaudeEvent::User { message } => self.format_user_event(message),
224            ClaudeEvent::Result { subtype, duration_ms, total_cost_usd, num_turns, result, error } => {
225                self.format_result_event(subtype, duration_ms, total_cost_usd, num_turns, result, error)
226            }
227            ClaudeEvent::StreamEvent { event } => self.parse_stream_event(event),
228            ClaudeEvent::Unknown => self.format_unknown_event(line),
229        }
230    }
231
232    fn format_unknown_event(&self, line: &str) -> String {
233        format_unknown_json_event(line, &self.display_name, self.colors, self.verbosity.is_verbose())
234    }
235
236    /// Parse and display a single Claude JSON event
237    ///
238    /// Returns `Some(formatted_output)` for valid events, or None for:
239    /// - Malformed JSON (logged at debug level)
240    /// - Unknown event types
241    /// - Empty or whitespace-only output
242    pub fn parse_event(&self, line: &str) -> Option<String> {
243        let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
244            e
245        } else {
246            return self.handle_non_json_line(line.trim());
247        };
248        let finalize = self.compute_finalize_for_event(&event);
249        let output = self.dispatch_event(event, line);
250        let combined = combine_finalize_and_output(finalize, output);
251        combined.inspect(|out| {
252            self.update_cursor_up_state(out);
253        })
254    }
255
256    fn compute_finalize_for_event(&self, event: &ClaudeEvent) -> String {
257        if matches!(event, ClaudeEvent::StreamEvent { .. }) {
258            String::new()
259        } else {
260            self.state
261                .with_session_mut(|session| self.finalize_in_place_full_mode(session))
262        }
263    }
264
265    /// Reset all per-message streaming state at message start.
266    fn reset_message_state(&self) {
267        self.state.with_thinking_active_index_mut(|idx| *idx = None);
268        self.state
269            .with_thinking_non_tty_indices_mut(|indices| indices.clear());
270        self.state
271            .with_suppress_thinking_for_message_mut(|v| *v = false);
272        self.state.with_text_line_active_mut(|v| *v = false);
273        self.state.with_cursor_up_active_mut(|v| *v = false);
274        self.state.with_last_rendered_content_mut(|v| v.clear());
275    }
276
277    /// Handle a MessageStart stream event.
278    fn handle_message_start(
279        &self,
280        message: Option<crate::json_parser::types::AssistantMessage>,
281        message_id: Option<String>,
282    ) -> String {
283        // Tool result has been delivered — the prior tool's execution window is now complete.
284        // Clearing here (rather than at MessageStop) ensures the idle-timeout suppressor stays
285        // active while the Write tool (or any tool) executes between MessageStop and MessageStart.
286        self.tool_activity_tracker.clear_active();
287        let in_place_finalize = self
288            .state
289            .with_session_mut(|session| self.finalize_in_place_full_mode(session));
290        self.reset_message_state();
291        let effective_message_id =
292            message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
293        self.state.with_session_mut(|session| {
294            session.set_current_message_id(effective_message_id);
295            session.on_message_start();
296        });
297        in_place_finalize
298    }
299
300    /// Parse a streaming event for delta/partial updates
301    ///
302    /// Handles the nested events within `stream_event`:
303    /// - MessageStart/Stop: Manage session state
304    /// - `ContentBlockStart`: Initialize new content blocks
305    /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
306    /// - `ContentBlockStop`: Finalize content blocks
307    /// - `MessageDelta`: Process message metadata without output
308    /// - Error: Display appropriately
309    ///
310    /// Returns String for display content, empty String for control events.
311    fn handle_content_block_start_no_block(&self, index: u64) -> String {
312        self.state.with_session_mut(|session| { session.on_content_block_start(index); });
313        String::new()
314    }
315
316    fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
317        match event {
318            StreamInnerEvent::MessageStart { message, message_id } => self.handle_message_start(message, message_id),
319            StreamInnerEvent::ContentBlockStart { index: Some(index), content_block: Some(block) } => self.handle_content_block_start_with_block(index, block),
320            StreamInnerEvent::ContentBlockStart { index: Some(index), content_block: None } => self.handle_content_block_start_no_block(index),
321            StreamInnerEvent::ContentBlockStart { .. } => String::new(),
322            StreamInnerEvent::ContentBlockDelta { index: Some(index), delta: Some(delta) } => self.handle_content_block_delta_inner(index, delta),
323            StreamInnerEvent::TextDelta { text: Some(text) } => self.handle_text_delta_inner(&text),
324            StreamInnerEvent::ContentBlockStop { .. } | StreamInnerEvent::MessageDelta { .. } | StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::Ping | StreamInnerEvent::TextDelta { text: None } | StreamInnerEvent::Error { error: None } => String::new(),
325            StreamInnerEvent::MessageStop => self.handle_message_stop_inner(),
326            StreamInnerEvent::Error { error: Some(err), .. } => self.handle_error_event(err),
327            StreamInnerEvent::Unknown => self.handle_unknown_event(),
328        }
329    }
330
331    fn handle_content_block_start_with_block(&self, index: u64, block: ContentBlock) -> String {
332        // A ToolUse content block means the agent is actively calling a tool — suppress
333        // idle timeout for the duration. Other block types (Text, etc.) do not set the flag.
334        if matches!(block, ContentBlock::ToolUse { .. }) {
335            self.tool_activity_tracker.set_active();
336        }
337        self.state.with_session_mut(|session| {
338            session.on_content_block_start(index);
339            apply_content_block_start_to_session(session, index, &block);
340        });
341        String::new()
342    }
343
344    fn handle_content_block_delta_inner(&self, index: u64, delta: ContentBlockDelta) -> String {
345        self.state
346            .with_session_mut(|session| self.handle_content_block_delta(session, index, delta))
347    }
348
349    fn handle_text_delta_inner(&self, text: &str) -> String {
350        self.state
351            .with_session_mut(|session| self.handle_text_delta(session, text))
352    }
353
354    fn handle_message_stop_inner(&self) -> String {
355        // MessageStop arrives BEFORE Claude Code executes the Write tool. Do NOT clear
356        // tool_active here — the tool's execution window spans MessageStop → MessageStart.
357        // Clearing happens in handle_message_start once the tool result has been delivered.
358        self.state
359            .with_session_mut(|session| self.handle_message_stop(session))
360    }
361}
362
363struct StreamLoopState {
364    incremental_parser: IncrementalNdjsonParser,
365    log_buffer: Vec<u8>,
366    seen_success_result: std::cell::Cell<bool>,
367}
368
369impl StreamLoopState {
370    fn new() -> Self {
371        Self {
372            incremental_parser: IncrementalNdjsonParser::new(),
373            log_buffer: Vec::new(),
374            seen_success_result: std::cell::Cell::new(false),
375        }
376    }
377}
378
379impl ClaudeParser {
380    pub fn parse_stream<R: BufRead>(
381        &mut self,
382        mut reader: R,
383        workspace: &dyn crate::workspace::Workspace,
384    ) -> std::io::Result<()> {
385        let c = self.colors;
386        let monitor = HealthMonitor::new("Claude");
387        let mut state = StreamLoopState::new();
388        self.run_stream_loop(&mut reader, c, &monitor, &mut state)?;
389        self.finalize_parse_stream(workspace, &monitor, c, &state.log_buffer)
390    }
391
392    fn run_stream_loop<R: BufRead>(
393        &mut self, reader: &mut R, c: Colors,
394        monitor: &HealthMonitor, state: &mut StreamLoopState,
395    ) -> std::io::Result<()> {
396        let logging_enabled = self.log_path.is_some();
397        loop {
398            let chunk = reader.fill_buf()?;
399            if chunk.is_empty() { break; }
400            let data = chunk.to_vec(); reader.consume(data.len());
401            let (new_parser, events) = std::mem::take(&mut state.incremental_parser).feed_and_get_events(&data);
402            state.incremental_parser = new_parser;
403            events.into_iter().for_each(|line| { self.process_stream_line(&line, c, monitor, &mut state.log_buffer, logging_enabled, &state.seen_success_result); });
404        }
405        Ok(())
406    }
407
408    #[expect(
409        clippy::print_stderr,
410        reason = "debug-only output for verbose debugging"
411    )]
412    fn process_stream_line(
413        &mut self,
414        line: &str,
415        c: Colors,
416        monitor: &HealthMonitor,
417        log_buffer: &mut Vec<u8>,
418        logging_enabled: bool,
419        seen_success_result: &std::cell::Cell<bool>,
420    ) {
421        let trimmed = line.trim();
422        if trimmed.is_empty() { return; }
423        if self.verbosity.is_debug() {
424            eprintln!("{}[DEBUG]{} {}{}{}", c.dim(), c.reset(), c.dim(), line, c.reset());
425        }
426        self.process_parsed_line(trimmed, line, monitor, log_buffer, logging_enabled, seen_success_result);
427    }
428
429    fn process_parsed_line(
430        &mut self,
431        trimmed: &str,
432        line: &str,
433        monitor: &HealthMonitor,
434        log_buffer: &mut Vec<u8>,
435        logging_enabled: bool,
436        seen_success_result: &std::cell::Cell<bool>,
437    ) {
438        if should_skip_result_event(trimmed, seen_success_result) {
439            log_line_if_enabled(log_buffer, line, logging_enabled);
440            monitor.record_control_event();
441            return;
442        }
443        match self.parse_event(line) {
444            Some(output) => {
445                record_monitor_for_parsed_output(trimmed, line, monitor);
446                self.with_printer_mut(|printer| {
447                    if write!(printer, "{output}").is_ok() { printer.flush().ok(); }
448                });
449            }
450            None => record_monitor_for_no_output(trimmed, line, monitor),
451        }
452        log_line_if_enabled(log_buffer, line, logging_enabled);
453    }
454
455    fn finalize_parse_stream(
456        &mut self,
457        workspace: &dyn crate::workspace::Workspace,
458        monitor: &HealthMonitor,
459        c: Colors,
460        log_buffer: &[u8],
461    ) -> std::io::Result<()> {
462        self.tool_activity_tracker.reset(); // hard-reset at stream end — stdout is closed, no more tool events
463        if let Some(log_path) = &self.log_path {
464            workspace.append_bytes(log_path, log_buffer)?;
465        }
466        if let Some(warning) = monitor.check_and_warn(c) {
467            self.with_printer_mut(|printer| {
468                writeln!(printer, "{warning}").ok();
469                printer.flush().ok();
470            });
471        }
472        Ok(())
473    }
474}
475
/// Append `line` followed by a newline to `log_buffer` when logging is enabled.
fn log_line_if_enabled(log_buffer: &mut Vec<u8>, line: &str, logging_enabled: bool) {
    if !logging_enabled {
        return;
    }
    log_buffer.extend_from_slice(line.as_bytes());
    log_buffer.push(b'\n');
}
479
/// Concatenate the finalization prefix and the event output.
///
/// Returns `None` when both parts are empty, otherwise `Some` of `finalize` followed by
/// `output`.
fn combine_finalize_and_output(finalize: String, output: String) -> Option<String> {
    let mut merged = finalize;
    merged.push_str(&output);
    if merged.is_empty() {
        None
    } else {
        Some(merged)
    }
}
488
489fn apply_content_block_start_to_session(
490    session: &mut crate::json_parser::streaming_state::StreamingSession,
491    index: u64,
492    block: &ContentBlock,
493) {
494    match block {
495        ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
496            session.on_text_delta(index, t);
497        }
498        ContentBlock::ToolUse { name, input } => {
499            apply_tool_use_start_to_session(session, index, name.as_deref(), input.as_ref());
500        }
501        _ => {}
502    }
503}
504
505fn json_value_to_tool_input_str(v: &serde_json::Value) -> String {
506    if let serde_json::Value::String(s) = v { s.clone() } else { format_tool_input(v) }
507}
508
509fn apply_tool_use_start_to_session(
510    session: &mut crate::json_parser::streaming_state::StreamingSession,
511    index: u64,
512    name: Option<&str>,
513    input: Option<&serde_json::Value>,
514) {
515    if let Some(n) = name {
516        session.set_tool_name(index, Some(n.to_string()));
517    }
518    if let Some(i) = input {
519        session.on_tool_input_delta(index, &json_value_to_tool_input_str(i));
520    }
521}
522
523fn has_errors_array_with_content(trimmed: &str) -> bool {
524    serde_json::from_str::<serde_json::Value>(trimmed).is_ok_and(|json| {
525        json.get("errors")
526            .and_then(|v| v.as_array())
527            .is_some_and(|arr| {
528                arr.iter()
529                    .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
530            })
531    })
532}
533
/// Decide whether a result event looks like a spurious error.
///
/// It is spurious when all of these hold: the subtype is not `"success"`, the duration is
/// missing or below 100 ms, the error text is missing or empty, and `has_errors` is false.
fn is_spurious_glm_error(
    subtype: &Option<String>,
    duration_ms: Option<u64>,
    error: &Option<String>,
    has_errors: bool,
) -> bool {
    if has_errors || subtype.as_deref() == Some("success") {
        return false;
    }
    let finished_instantly = duration_ms.unwrap_or(0) < 100;
    let error_is_blank = error.as_deref().unwrap_or("").is_empty();
    finished_instantly && error_is_blank
}
545
546fn should_skip_result_event(trimmed: &str, seen_success: &std::cell::Cell<bool>) -> bool {
547    if !trimmed.starts_with('{') { return false; }
548    let has_errors = has_errors_array_with_content(trimmed);
549    let Ok(ClaudeEvent::Result { subtype, duration_ms, error, .. }) =
550        serde_json::from_str::<ClaudeEvent>(trimmed)
551    else {
552        return false;
553    };
554    let spurious = is_spurious_glm_error(&subtype, duration_ms, &error, has_errors);
555    if subtype.as_deref() == Some("success") {
556        seen_success.set(true);
557        false
558    } else {
559        spurious
560    }
561}
562
563fn record_monitor_for_parsed_output(trimmed: &str, line: &str, monitor: &HealthMonitor) {
564    let is_partial = trimmed.starts_with('{')
565        && serde_json::from_str::<ClaudeEvent>(line)
566            .is_ok_and(|e| ClaudeParser::is_partial_event(&e));
567    if is_partial {
568        monitor.record_partial_event();
569    } else {
570        monitor.record_parsed();
571    }
572}
573
574fn record_monitor_for_no_output(trimmed: &str, line: &str, monitor: &HealthMonitor) {
575    if !trimmed.starts_with('{') { return monitor.record_ignored(); }
576    match serde_json::from_str::<ClaudeEvent>(line) {
577        Ok(event) if ClaudeParser::is_control_event(&event) => monitor.record_control_event(),
578        Ok(_) => monitor.record_unknown_event(),
579        Err(_) => monitor.record_parse_error(),
580    }
581}