Skip to main content

ralph_workflow/json_parser/claude/
parser.rs

1// Claude parser implementation.
2//
3// Contains the ClaudeParser struct and its core methods.
4
5use std::cell::RefCell;
6use std::io::BufRead;
7use std::rc::Rc;
8
9use crate::json_parser::claude::io::ParserState;
10#[cfg(any(test, feature = "test-utils"))]
11use crate::json_parser::health::StreamingQualityMetrics;
12use crate::json_parser::incremental_parser::IncrementalNdjsonParser;
13use crate::json_parser::printer::Printable;
14use crate::json_parser::printer::StdoutPrinter;
15use crate::json_parser::types::{ContentBlock, ContentBlockDelta};
16
17/// Claude event parser
18///
19/// Note: This parser is designed for single-threaded use only.
20/// Do not share this parser across threads.
21pub struct ClaudeParser {
22    colors: Colors,
23    pub(crate) verbosity: Verbosity,
24    log_path: Option<std::path::PathBuf>,
25    display_name: String,
26    state: ParserState,
27    show_streaming_metrics: bool,
28    printer: Rc<RefCell<dyn Printable>>,
29}
30
31impl ClaudeParser {
32    #[must_use]
33    pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
34        Self::with_printer(
35            colors,
36            verbosity,
37            Rc::new(RefCell::new(StdoutPrinter::new())),
38        )
39    }
40
41    pub fn with_printer(
42        colors: Colors,
43        verbosity: Verbosity,
44        printer: Rc<RefCell<dyn Printable>>,
45    ) -> Self {
46        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
47
48        Self {
49            colors,
50            verbosity,
51            log_path: None,
52            display_name: "Claude".to_string(),
53            state: ParserState::new(verbose_warnings),
54            show_streaming_metrics: false,
55            printer,
56        }
57    }
58
59    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
60        self.show_streaming_metrics = show;
61        self
62    }
63
64    #[must_use]
65    pub fn with_display_name(mut self, display_name: &str) -> Self {
66        self.display_name = display_name.to_string();
67        self
68    }
69
70    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
71        self.log_path = Some(std::path::PathBuf::from(path));
72        self
73    }
74
75    /// Set the terminal mode for this parser.
76    ///
77    /// # Arguments
78    ///
79    /// * `mode` - The terminal mode to use
80    ///
81    /// # Returns
82    ///
83    /// Self for builder pattern chaining
84    #[cfg(any(test, feature = "test-utils"))]
85    #[must_use]
86    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
87        self.state.terminal_mode.replace(mode);
88        self
89    }
90
91    /// Get a shared reference to the printer.
92    ///
93    /// This allows tests, monitoring, and other code to access the printer after parsing
94    /// to verify output content, check for duplicates, or capture output for analysis.
95    ///
96    /// # Returns
97    ///
98    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
99    ///
100    /// # Example
101    ///
102    /// ```ignore
103    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
104    /// use std::rc::Rc;
105    /// use std::cell::RefCell;
106    ///
107    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
108    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
109    ///
110    /// // Parse events...
111    ///
112    /// // Now access the printer to verify output
113    /// let printer_ref = parser.printer().borrow();
114    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
115    /// ```
116    /// Get a clone of the printer used by this parser.
117    ///
118    /// This is primarily useful for integration tests and monitoring in this repository.
119    /// Only available with the `test-utils` feature.
120    ///
121    /// Note: downstream crates should avoid relying on this API in production builds.
122    #[cfg(any(test, feature = "test-utils"))]
123    pub fn printer(&self) -> Rc<RefCell<dyn Printable>> {
124        self.printer.clone()
125    }
126
127    pub(crate) fn with_printer_mut<R>(&mut self, f: impl FnOnce(&mut dyn Printable) -> R) -> R {
128        let mut printer_ref = self.printer.borrow_mut();
129        f(&mut *printer_ref)
130    }
131
132    /// Get streaming quality metrics from the current session.
133    ///
134    /// This provides insight into the deduplication and streaming quality of the
135    /// parsing session, including:
136    /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
137    /// - Number of large deltas (potential protocol violations)
138    /// - Total deltas processed
139    ///
140    /// Useful for testing, monitoring, and debugging streaming behavior.
141    /// Only available with the `test-utils` feature.
142    ///
143    /// # Returns
144    ///
145    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
146    ///
147    /// # Example
148    ///
149    /// ```ignore
150    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
151    /// use std::rc::Rc;
152    /// use std::cell::RefCell;
153    ///
154    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
155    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
156    ///
157    /// // Parse events...
158    ///
159    /// // Verify deduplication logic triggered
160    /// let metrics = parser.streaming_metrics();
161    /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
162    /// ```
163    #[cfg(any(test, feature = "test-utils"))]
164    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
165        self.state
166            .streaming_session
167            .borrow()
168            .get_streaming_quality_metrics()
169    }
170
171    /// Update cursor-up tracking state based on output content in Full terminal mode.
172    fn update_cursor_up_state(&self, output: &str) {
173        if *self.state.terminal_mode.borrow() == TerminalMode::Full {
174            self.state.with_cursor_up_active_mut(|cursor_up_active| {
175                if output.contains("\x1b[1B\n") {
176                    *cursor_up_active = false;
177                }
178                if output.contains("\x1b[1A") {
179                    *cursor_up_active = true;
180                }
181            });
182        }
183    }
184
185    /// Handle a non-JSON line during stream parsing (e.g. plain text output from agent).
186    fn handle_non_json_line(&self, trimmed: &str) -> Option<String> {
187        if trimmed.is_empty() || trimmed.starts_with('{') {
188            return None;
189        }
190        let finalize = self
191            .state
192            .with_session_mut(|session| self.finalize_in_place_full_mode(session));
193        let out = format!("{finalize}{trimmed}\n");
194        self.update_cursor_up_state(&out);
195        Some(out)
196    }
197
198    /// Dispatch a parsed ClaudeEvent to the appropriate formatter.
199    fn dispatch_event(&self, event: ClaudeEvent, line: &str) -> String {
200        match event {
201            ClaudeEvent::System { subtype, session_id, cwd } => {
202                self.format_system_event(subtype.as_ref(), session_id, cwd)
203            }
204            ClaudeEvent::Assistant { message } => self.format_assistant_event(message.as_ref()),
205            ClaudeEvent::User { message } => self.format_user_event(message),
206            ClaudeEvent::Result { subtype, duration_ms, total_cost_usd, num_turns, result, error } => {
207                self.format_result_event(subtype, duration_ms, total_cost_usd, num_turns, result, error)
208            }
209            ClaudeEvent::StreamEvent { event } => self.parse_stream_event(event),
210            ClaudeEvent::Unknown => self.format_unknown_event(line),
211        }
212    }
213
214    fn format_unknown_event(&self, line: &str) -> String {
215        format_unknown_json_event(line, &self.display_name, self.colors, self.verbosity.is_verbose())
216    }
217
218    /// Parse and display a single Claude JSON event
219    ///
220    /// Returns `Some(formatted_output)` for valid events, or None for:
221    /// - Malformed JSON (logged at debug level)
222    /// - Unknown event types
223    /// - Empty or whitespace-only output
224    pub fn parse_event(&self, line: &str) -> Option<String> {
225        let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
226            e
227        } else {
228            return self.handle_non_json_line(line.trim());
229        };
230        let finalize = self.compute_finalize_for_event(&event);
231        let output = self.dispatch_event(event, line);
232        let combined = combine_finalize_and_output(finalize, output);
233        combined.inspect(|out| {
234            self.update_cursor_up_state(out);
235        })
236    }
237
238    fn compute_finalize_for_event(&self, event: &ClaudeEvent) -> String {
239        if matches!(event, ClaudeEvent::StreamEvent { .. }) {
240            String::new()
241        } else {
242            self.state
243                .with_session_mut(|session| self.finalize_in_place_full_mode(session))
244        }
245    }
246
247    /// Reset all per-message streaming state at message start.
248    fn reset_message_state(&self) {
249        self.state.with_thinking_active_index_mut(|idx| *idx = None);
250        self.state
251            .with_thinking_non_tty_indices_mut(|indices| indices.clear());
252        self.state
253            .with_suppress_thinking_for_message_mut(|v| *v = false);
254        self.state.with_text_line_active_mut(|v| *v = false);
255        self.state.with_cursor_up_active_mut(|v| *v = false);
256        self.state.with_last_rendered_content_mut(|v| v.clear());
257    }
258
259    /// Handle a MessageStart stream event.
260    fn handle_message_start(
261        &self,
262        message: Option<crate::json_parser::types::AssistantMessage>,
263        message_id: Option<String>,
264    ) -> String {
265        let in_place_finalize = self
266            .state
267            .with_session_mut(|session| self.finalize_in_place_full_mode(session));
268        self.reset_message_state();
269        let effective_message_id =
270            message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
271        self.state.with_session_mut(|session| {
272            session.set_current_message_id(effective_message_id);
273            session.on_message_start();
274        });
275        in_place_finalize
276    }
277
278    /// Parse a streaming event for delta/partial updates
279    ///
280    /// Handles the nested events within `stream_event`:
281    /// - MessageStart/Stop: Manage session state
282    /// - `ContentBlockStart`: Initialize new content blocks
283    /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
284    /// - `ContentBlockStop`: Finalize content blocks
285    /// - `MessageDelta`: Process message metadata without output
286    /// - Error: Display appropriately
287    ///
288    /// Returns String for display content, empty String for control events.
289    fn handle_content_block_start_no_block(&self, index: u64) -> String {
290        self.state.with_session_mut(|session| { session.on_content_block_start(index); });
291        String::new()
292    }
293
294    fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
295        match event {
296            StreamInnerEvent::MessageStart { message, message_id } => self.handle_message_start(message, message_id),
297            StreamInnerEvent::ContentBlockStart { index: Some(index), content_block: Some(block) } => self.handle_content_block_start_with_block(index, block),
298            StreamInnerEvent::ContentBlockStart { index: Some(index), content_block: None } => self.handle_content_block_start_no_block(index),
299            StreamInnerEvent::ContentBlockStart { .. } => String::new(),
300            StreamInnerEvent::ContentBlockDelta { index: Some(index), delta: Some(delta) } => self.handle_content_block_delta_inner(index, delta),
301            StreamInnerEvent::TextDelta { text: Some(text) } => self.handle_text_delta_inner(&text),
302            StreamInnerEvent::ContentBlockStop { .. } | StreamInnerEvent::MessageDelta { .. } | StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::Ping | StreamInnerEvent::TextDelta { text: None } | StreamInnerEvent::Error { error: None } => String::new(),
303            StreamInnerEvent::MessageStop => self.handle_message_stop_inner(),
304            StreamInnerEvent::Error { error: Some(err), .. } => self.handle_error_event(err),
305            StreamInnerEvent::Unknown => self.handle_unknown_event(),
306        }
307    }
308
309    fn handle_content_block_start_with_block(&self, index: u64, block: ContentBlock) -> String {
310        self.state.with_session_mut(|session| {
311            session.on_content_block_start(index);
312            apply_content_block_start_to_session(session, index, &block);
313        });
314        String::new()
315    }
316
317    fn handle_content_block_delta_inner(&self, index: u64, delta: ContentBlockDelta) -> String {
318        self.state
319            .with_session_mut(|session| self.handle_content_block_delta(session, index, delta))
320    }
321
322    fn handle_text_delta_inner(&self, text: &str) -> String {
323        self.state
324            .with_session_mut(|session| self.handle_text_delta(session, text))
325    }
326
327    fn handle_message_stop_inner(&self) -> String {
328        self.state
329            .with_session_mut(|session| self.handle_message_stop(session))
330    }
331}
332
333struct StreamLoopState {
334    incremental_parser: IncrementalNdjsonParser,
335    log_buffer: Vec<u8>,
336    seen_success_result: std::cell::Cell<bool>,
337}
338
339impl StreamLoopState {
340    fn new() -> Self {
341        Self {
342            incremental_parser: IncrementalNdjsonParser::new(),
343            log_buffer: Vec::new(),
344            seen_success_result: std::cell::Cell::new(false),
345        }
346    }
347}
348
349impl ClaudeParser {
350    pub fn parse_stream<R: BufRead>(
351        &mut self,
352        mut reader: R,
353        workspace: &dyn crate::workspace::Workspace,
354    ) -> std::io::Result<()> {
355        let c = self.colors;
356        let monitor = HealthMonitor::new("Claude");
357        let mut state = StreamLoopState::new();
358        self.run_stream_loop(&mut reader, c, &monitor, &mut state)?;
359        self.finalize_parse_stream(workspace, &monitor, c, &state.log_buffer)
360    }
361
362    fn run_stream_loop<R: BufRead>(
363        &mut self, reader: &mut R, c: Colors,
364        monitor: &HealthMonitor, state: &mut StreamLoopState,
365    ) -> std::io::Result<()> {
366        let logging_enabled = self.log_path.is_some();
367        loop {
368            let chunk = reader.fill_buf()?;
369            if chunk.is_empty() { break; }
370            let data = chunk.to_vec(); reader.consume(data.len());
371            let (new_parser, events) = std::mem::take(&mut state.incremental_parser).feed_and_get_events(&data);
372            state.incremental_parser = new_parser;
373            events.into_iter().for_each(|line| { self.process_stream_line(&line, c, monitor, &mut state.log_buffer, logging_enabled, &state.seen_success_result); });
374        }
375        Ok(())
376    }
377
378    #[expect(
379        clippy::print_stderr,
380        reason = "debug-only output for verbose debugging"
381    )]
382    fn process_stream_line(
383        &mut self,
384        line: &str,
385        c: Colors,
386        monitor: &HealthMonitor,
387        log_buffer: &mut Vec<u8>,
388        logging_enabled: bool,
389        seen_success_result: &std::cell::Cell<bool>,
390    ) {
391        let trimmed = line.trim();
392        if trimmed.is_empty() { return; }
393        if self.verbosity.is_debug() {
394            eprintln!("{}[DEBUG]{} {}{}{}", c.dim(), c.reset(), c.dim(), line, c.reset());
395        }
396        self.process_parsed_line(trimmed, line, monitor, log_buffer, logging_enabled, seen_success_result);
397    }
398
399    fn process_parsed_line(
400        &mut self,
401        trimmed: &str,
402        line: &str,
403        monitor: &HealthMonitor,
404        log_buffer: &mut Vec<u8>,
405        logging_enabled: bool,
406        seen_success_result: &std::cell::Cell<bool>,
407    ) {
408        if should_skip_result_event(trimmed, seen_success_result) {
409            log_line_if_enabled(log_buffer, line, logging_enabled);
410            monitor.record_control_event();
411            return;
412        }
413        match self.parse_event(line) {
414            Some(output) => {
415                record_monitor_for_parsed_output(trimmed, line, monitor);
416                self.with_printer_mut(|printer| {
417                    if write!(printer, "{output}").is_ok() { printer.flush().ok(); }
418                });
419            }
420            None => record_monitor_for_no_output(trimmed, line, monitor),
421        }
422        log_line_if_enabled(log_buffer, line, logging_enabled);
423    }
424
425    fn finalize_parse_stream(
426        &mut self,
427        workspace: &dyn crate::workspace::Workspace,
428        monitor: &HealthMonitor,
429        c: Colors,
430        log_buffer: &[u8],
431    ) -> std::io::Result<()> {
432        if let Some(log_path) = &self.log_path {
433            workspace.append_bytes(log_path, log_buffer)?;
434        }
435        if let Some(warning) = monitor.check_and_warn(c) {
436            self.with_printer_mut(|printer| {
437                writeln!(printer, "{warning}").ok();
438                printer.flush().ok();
439            });
440        }
441        Ok(())
442    }
443}
444
/// Append `line` plus a trailing newline to `log_buffer`, but only when logging is on.
fn log_line_if_enabled(log_buffer: &mut Vec<u8>, line: &str, logging_enabled: bool) {
    if !logging_enabled {
        return;
    }
    log_buffer.extend_from_slice(line.as_bytes());
    log_buffer.push(b'\n');
}
448
/// Concatenate finalization text and event output.
///
/// Returns `None` when the combined text is empty, so callers can skip printing.
fn combine_finalize_and_output(finalize: String, output: String) -> Option<String> {
    let mut joined = finalize;
    joined.push_str(&output);
    (!joined.is_empty()).then_some(joined)
}
457
458fn apply_content_block_start_to_session(
459    session: &mut crate::json_parser::streaming_state::StreamingSession,
460    index: u64,
461    block: &ContentBlock,
462) {
463    match block {
464        ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
465            session.on_text_delta(index, t);
466        }
467        ContentBlock::ToolUse { name, input } => {
468            apply_tool_use_start_to_session(session, index, name.as_deref(), input.as_ref());
469        }
470        _ => {}
471    }
472}
473
474fn json_value_to_tool_input_str(v: &serde_json::Value) -> String {
475    if let serde_json::Value::String(s) = v { s.clone() } else { format_tool_input(v) }
476}
477
478fn apply_tool_use_start_to_session(
479    session: &mut crate::json_parser::streaming_state::StreamingSession,
480    index: u64,
481    name: Option<&str>,
482    input: Option<&serde_json::Value>,
483) {
484    if let Some(n) = name {
485        session.set_tool_name(index, Some(n.to_string()));
486    }
487    if let Some(i) = input {
488        session.on_tool_input_delta(index, &json_value_to_tool_input_str(i));
489    }
490}
491
492fn has_errors_array_with_content(trimmed: &str) -> bool {
493    serde_json::from_str::<serde_json::Value>(trimmed).is_ok_and(|json| {
494        json.get("errors")
495            .and_then(|v| v.as_array())
496            .is_some_and(|arr| {
497                arr.iter()
498                    .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
499            })
500    })
501}
502
/// Heuristic for a spurious, content-free error `result` event.
///
/// An event counts as spurious when all of the following hold: its subtype is not
/// `"success"`, it reports under 100 ms (a missing duration counts as 0), it carries
/// no non-empty `error` message, and it has no non-blank `errors` entries.
fn is_spurious_glm_error(
    subtype: &Option<String>,
    duration_ms: Option<u64>,
    error: &Option<String>,
    has_errors: bool,
) -> bool {
    let is_success = matches!(subtype.as_deref(), Some("success"));
    let finished_instantly = !matches!(duration_ms, Some(ms) if ms >= 100);
    let lacks_message = error.as_deref().unwrap_or("").is_empty();
    !(is_success || has_errors) && finished_instantly && lacks_message
}
514
515fn should_skip_result_event(trimmed: &str, seen_success: &std::cell::Cell<bool>) -> bool {
516    if !trimmed.starts_with('{') { return false; }
517    let has_errors = has_errors_array_with_content(trimmed);
518    let Ok(ClaudeEvent::Result { subtype, duration_ms, error, .. }) =
519        serde_json::from_str::<ClaudeEvent>(trimmed)
520    else {
521        return false;
522    };
523    let spurious = is_spurious_glm_error(&subtype, duration_ms, &error, has_errors);
524    if subtype.as_deref() == Some("success") {
525        seen_success.set(true);
526        false
527    } else {
528        spurious
529    }
530}
531
532fn record_monitor_for_parsed_output(trimmed: &str, line: &str, monitor: &HealthMonitor) {
533    let is_partial = trimmed.starts_with('{')
534        && serde_json::from_str::<ClaudeEvent>(line)
535            .is_ok_and(|e| ClaudeParser::is_partial_event(&e));
536    if is_partial {
537        monitor.record_partial_event();
538    } else {
539        monitor.record_parsed();
540    }
541}
542
543fn record_monitor_for_no_output(trimmed: &str, line: &str, monitor: &HealthMonitor) {
544    if !trimmed.starts_with('{') { return monitor.record_ignored(); }
545    match serde_json::from_str::<ClaudeEvent>(line) {
546        Ok(event) if ClaudeParser::is_control_event(&event) => monitor.record_control_event(),
547        Ok(_) => monitor.record_unknown_event(),
548        Err(_) => monitor.record_parse_error(),
549    }
550}