ralph_workflow/json_parser/claude/parser.rs
1// Claude parser implementation.
2//
3// Contains the ClaudeParser struct and its core methods.
4
5/// Claude event parser
6///
7/// Note: This parser is designed for single-threaded use only.
8/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
9/// Do not share this parser across threads.
10pub struct ClaudeParser {
11 colors: Colors,
12 pub(crate) verbosity: Verbosity,
13 /// Relative path to log file (if logging enabled)
14 log_path: Option<std::path::PathBuf>,
15 display_name: String,
16 /// Unified streaming session tracker
17 /// Provides single source of truth for streaming state across all content types
18 streaming_session: Rc<RefCell<StreamingSession>>,
19 /// Terminal mode for output formatting
20 /// Detected at parse time and cached for performance
21 terminal_mode: RefCell<TerminalMode>,
22 /// Whether to show streaming quality metrics
23 show_streaming_metrics: bool,
24 /// Output printer for capturing or displaying output
25 printer: SharedPrinter,
26
27 /// Tracks whether a thinking delta line is currently being streamed.
28 ///
29 /// - In `TerminalMode::Full`, thinking deltas use the append-only streaming pattern (no cursor
30 /// movement during deltas) and must be finalized (emit the completion newline) before emitting
31 /// other newline-based output.
32 /// - In `TerminalMode::Basic|None`, we suppress per-delta thinking output and flush accumulated
33 /// thinking content once at the next output boundary (or at `message_stop`).
34 thinking_active_index: RefCell<Option<u64>>,
35
36 /// Tracks which thinking content block indices have streamed thinking content that is eligible
37 /// for non-TTY flushing.
38 ///
39 /// Some providers can emit multiple thinking blocks (multiple indices) within a single message.
40 /// In non-TTY modes we suppress per-delta output, so we must remember all indices that
41 /// accumulated thinking to flush them at `message_stop`.
42 thinking_non_tty_indices: RefCell<std::collections::BTreeSet<u64>>,
43
44 /// Once non-thinking output has started for the current message, suppress any
45 /// subsequent thinking deltas to avoid corrupting visible output.
46 ///
47 /// Claude/CCS can occasionally emit thinking deltas after text deltas. Because
48 /// both streams append on the current line in Full mode, allowing late thinking can
49 /// glue onto or visually corrupt previously-rendered text.
50 suppress_thinking_for_message: RefCell<bool>,
51
52 /// Tracks whether a text delta line is currently being streamed (Full mode).
53 ///
54 /// In the append-only streaming pattern, deltas do not move the cursor; they simply
55 /// append new suffixes on the current line. When true, any newline-based non-stream
56 /// output should ensure the streamed line is finalized (emit the completion newline)
57 /// before printing unrelated lines, to avoid "glued" output.
58 text_line_active: RefCell<bool>,
59
60 /// Defensive cursor state for legacy/inconsistent streams.
61 ///
62 /// The append-only streaming implementation should not emit cursor-up sequences,
63 /// but real-world logs can include raw passthrough output with escape codes.
64 /// When this flag is true, newline-based output should first emit a completion newline
65 /// to avoid overwriting/gluing onto visible content.
66 cursor_up_active: RefCell<bool>,
67
68 /// Tracks the last rendered content for append-only streaming in Full mode.
69 ///
70 /// In append-only mode, we emit the prefix once, then only emit new suffixes for subsequent deltas.
71 /// This map stores the last rendered content for each (`ContentType`, index) pair.
72 /// Key format: "{`content_type}:{index`}" (e.g., "text:0", "thinking:1")
73 last_rendered_content: RefCell<std::collections::HashMap<String, String>>,
74}
75
76impl ClaudeParser {
77 /// Create a new `ClaudeParser` with the given colors and verbosity.
78 ///
79 /// # Arguments
80 ///
81 /// * `colors` - Colors for terminal output
82 /// * `verbosity` - Verbosity level for output
83 ///
84 /// # Returns
85 ///
86 /// A new `ClaudeParser` instance
87 ///
88 /// # Example
89 ///
90 /// ```ignore
91 /// use ralph_workflow::json_parser::ClaudeParser;
92 /// use ralph_workflow::logger::Colors;
93 /// use ralph_workflow::config::Verbosity;
94 ///
95 /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
96 /// ```
97 #[must_use]
98 pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
99 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
100 }
101
102 /// Create a new `ClaudeParser` with a custom printer.
103 ///
104 /// # Arguments
105 ///
106 /// * `colors` - Colors for terminal output
107 /// * `verbosity` - Verbosity level for output
108 /// * `printer` - Shared printer for output
109 ///
110 /// # Returns
111 ///
112 /// A new `ClaudeParser` instance
113 pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
114 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
115 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
116
117 // Use the printer's is_terminal method to validate it's connected correctly
118 // This is a sanity check that also satisfies the compiler that the method is used
119 let _printer_is_terminal = printer.borrow().is_terminal();
120
121 Self {
122 colors,
123 verbosity,
124 log_path: None,
125 display_name: "Claude".to_string(),
126 streaming_session: Rc::new(RefCell::new(streaming_session)),
127 terminal_mode: RefCell::new(TerminalMode::detect()),
128 show_streaming_metrics: false,
129 printer,
130 thinking_active_index: RefCell::new(None),
131 thinking_non_tty_indices: RefCell::new(std::collections::BTreeSet::new()),
132 suppress_thinking_for_message: RefCell::new(false),
133 text_line_active: RefCell::new(false),
134 cursor_up_active: RefCell::new(false),
135 last_rendered_content: RefCell::new(std::collections::HashMap::new()),
136 }
137 }
138
139 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
140 self.show_streaming_metrics = show;
141 self
142 }
143
144 /// Set the display name for this parser.
145 ///
146 /// # Arguments
147 ///
148 /// * `display_name` - The name to display in output
149 ///
150 /// # Returns
151 ///
152 /// Self for builder pattern chaining
153 #[must_use]
154 pub fn with_display_name(mut self, display_name: &str) -> Self {
155 self.display_name = display_name.to_string();
156 self
157 }
158
159 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
160 self.log_path = Some(std::path::PathBuf::from(path));
161 self
162 }
163
/// Override the cached terminal mode of this parser.
///
/// Only compiled for tests or with the `test-utils` feature.
///
/// # Arguments
///
/// * `mode` - The terminal mode to use
///
/// # Returns
///
/// Self for builder pattern chaining
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn with_terminal_mode(mut self, mode: TerminalMode) -> Self {
    // The parser is owned here, so the cell can be written through `get_mut`
    // without a runtime borrow check.
    *self.terminal_mode.get_mut() = mode;
    self
}
179
/// Get a clone of the shared printer used by this parser.
///
/// This lets tests, monitoring, and other code inspect the printer after parsing,
/// for example to verify output content, check for duplicates, or capture output
/// for analysis.
///
/// Only available in tests or with the `test-utils` feature. It is primarily meant
/// for integration tests and monitoring inside this repository; downstream crates
/// should avoid relying on this API in production builds.
///
/// # Returns
///
/// A new handle to the same printer (`Rc<RefCell<dyn Printable>>`); the printer
/// itself is not copied.
///
/// # Example
///
/// ```ignore
/// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
/// use std::rc::Rc;
/// use std::cell::RefCell;
///
/// let printer = Rc::new(RefCell::new(TestPrinter::new()));
/// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
///
/// // Parse events...
///
/// // Keep the handle in a binding so the borrow does not outlive a temporary `Rc`.
/// let shared = parser.printer();
/// let printer_ref = shared.borrow();
/// assert!(!printer_ref.has_duplicate_consecutive_lines());
/// ```
#[cfg(any(test, feature = "test-utils"))]
pub fn printer(&self) -> SharedPrinter {
    let shared = &self.printer;
    Rc::clone(shared)
}
215
/// Report the streaming quality metrics of the current session.
///
/// The metrics describe the deduplication and streaming quality of the parsing
/// session, including:
/// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
/// - Number of large deltas (potential protocol violations)
/// - Total deltas processed
///
/// Useful for testing, monitoring, and debugging streaming behavior.
/// Only available in tests or with the `test-utils` feature.
///
/// # Returns
///
/// A copy of the streaming quality metrics held by the internal `StreamingSession`.
///
/// # Example
///
/// ```ignore
/// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
/// use std::rc::Rc;
/// use std::cell::RefCell;
///
/// let printer = Rc::new(RefCell::new(TestPrinter::new()));
/// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
///
/// // Parse events...
///
/// // Verify deduplication logic triggered
/// let metrics = parser.streaming_metrics();
/// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
/// ```
#[cfg(any(test, feature = "test-utils"))]
pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
    let session = self.streaming_session.borrow();
    session.get_streaming_quality_metrics()
}
253
254 /// Parse and display a single Claude JSON event
255 ///
256 /// Returns `Some(formatted_output)` for valid events, or None for:
257 /// - Malformed JSON (logged at debug level)
258 /// - Unknown event types
259 /// - Empty or whitespace-only output
260 pub fn parse_event(&self, line: &str) -> Option<String> {
261 let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
262 e
263 } else {
264 // Non-JSON line - could be raw text output from agent
265 // Pass through as-is if it looks like real output (not empty)
266 let trimmed = line.trim();
267 if !trimmed.is_empty() && !trimmed.starts_with('{') {
268 // In full TTY mode, thinking deltas keep the cursor on the thinking line for
269 // in-place updates. Any other output must first finalize that cursor state.
270 let session = self.streaming_session.borrow_mut();
271 let finalize = self.finalize_in_place_full_mode(&session);
272 let out = format!("{finalize}{trimmed}\n");
273 if *self.terminal_mode.borrow() == TerminalMode::Full {
274 // Only mutate cursor state based on explicit cursor controls.
275 // Normal output may include newlines, but does not reliably indicate whether
276 // we are still in the in-place streaming cursor-up position.
277 let mut cursor_up_active = self.cursor_up_active.borrow_mut();
278 if out.contains("\x1b[1B\n") {
279 *cursor_up_active = false;
280 }
281 if out.contains("\x1b[1A") {
282 *cursor_up_active = true;
283 }
284 }
285 return Some(out);
286 }
287 return None;
288 };
289
290 // When a thinking/text line is being streamed in full TTY mode, the streaming
291 // implementation is append-only and keeps the cursor on the current line.
292 //
293 // Any non-stream event (system/user/assistant/result) must first finalize the
294 // active streaming line so the next output does not glue onto it.
295 let finalize = if matches!(&event, ClaudeEvent::StreamEvent { .. }) {
296 String::new()
297 } else {
298 let session = self.streaming_session.borrow_mut();
299 self.finalize_in_place_full_mode(&session)
300 };
301 let c = &self.colors;
302 let prefix = &self.display_name;
303
304 let output = match event {
305 ClaudeEvent::System {
306 subtype,
307 session_id,
308 cwd,
309 } => self.format_system_event(subtype.as_ref(), session_id, cwd),
310 ClaudeEvent::Assistant { message } => self.format_assistant_event(message.as_ref()),
311 ClaudeEvent::User { message } => self.format_user_event(message),
312 ClaudeEvent::Result {
313 subtype,
314 duration_ms,
315 total_cost_usd,
316 num_turns,
317 result,
318 error,
319 } => self.format_result_event(
320 subtype,
321 duration_ms,
322 total_cost_usd,
323 num_turns,
324 result,
325 error,
326 ),
327 ClaudeEvent::StreamEvent { event } => {
328 // Handle streaming events for delta/partial updates
329 self.parse_stream_event(event)
330 }
331 ClaudeEvent::Unknown => {
332 // Use the generic unknown event formatter for consistent handling
333 // In verbose mode, this will show the event type and key fields
334 // In normal mode, this returns empty string
335 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
336 }
337 };
338
339 // IMPORTANT: We must preserve any completion output from `finalize_in_place_full_mode`
340 // even if the current event itself produces no visible output.
341 //
342 // Example: the final "assistant" event can be deduplicated (empty output) after
343 // streaming deltas have already been shown. If we drop `finalize` in that case,
344 // the streamed line never receives its completion newline and subsequent output
345 // (e.g., system `status`) can clear/overwrite it.
346 let output = if output.is_empty() {
347 finalize
348 } else {
349 format!("{finalize}{output}")
350 };
351
352 if output.is_empty() {
353 None
354 } else {
355 if *self.terminal_mode.borrow() == TerminalMode::Full {
356 // Keep a simple, output-driven model of cursor state.
357 //
358 // The streaming implementation is append-only and SHOULD NOT emit cursor-up
359 // sequences ("\x1b[1A") for deltas. We only treat explicit cursor completion
360 // sequences ("\x1b[1B\n") as authoritative for clearing the defensive flag.
361 //
362 // Note: raw passthrough output may include escape sequences; we avoid inferring
363 // "cursor is up" from output content to keep this logic robust.
364 let mut cursor_up_active = self.cursor_up_active.borrow_mut();
365 if output.contains("\x1b[1B\n") {
366 *cursor_up_active = false;
367 }
368 }
369 Some(output)
370 }
371 }
372
373 /// Parse a streaming event for delta/partial updates
374 ///
375 /// Handles the nested events within `stream_event`:
376 /// - MessageStart/Stop: Manage session state
377 /// - `ContentBlockStart`: Initialize new content blocks
378 /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
379 /// - `ContentBlockStop`: Finalize content blocks
380 /// - `MessageDelta`: Process message metadata without output
381 /// - Error: Display appropriately
382 ///
383 /// Returns String for display content, empty String for control events.
384 fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
385 let mut session = self.streaming_session.borrow_mut();
386
387 match event {
388 StreamInnerEvent::MessageStart {
389 message,
390 message_id,
391 } => {
392 // Protocol violations happen in real streams: a new MessageStart can arrive
393 // while a previous streamed line is still "active" (we haven't yet emitted the
394 // completion newline). Finalize any active streaming line before resetting state
395 // so subsequent output doesn't glue onto the in-progress line.
396 let in_place_finalize = self.finalize_in_place_full_mode(&session);
397
398 // Reset any pending thinking line from a previous message.
399 *self.thinking_active_index.borrow_mut() = None;
400 self.thinking_non_tty_indices.borrow_mut().clear();
401 *self.suppress_thinking_for_message.borrow_mut() = false;
402 *self.text_line_active.borrow_mut() = false;
403 *self.cursor_up_active.borrow_mut() = false;
404
405 // Extract message_id from either the top-level field or nested message.id
406 // The Claude API typically puts the ID in message.id, not at the top level
407 let effective_message_id =
408 message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
409 // Set message ID for tracking and clear session state on new message
410 session.set_current_message_id(effective_message_id);
411 session.on_message_start();
412 // Clear last rendered content for append-only pattern on new message
413 self.last_rendered_content.borrow_mut().clear();
414 in_place_finalize
415 }
416 StreamInnerEvent::ContentBlockStart {
417 index: Some(index),
418 content_block: Some(block),
419 } => {
420 // Initialize a new content block at this index
421 session.on_content_block_start(index);
422 match &block {
423 ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
424 // Initial text in ContentBlockStart - treat as first delta
425 session.on_text_delta(index, t);
426 }
427 ContentBlock::ToolUse { name, input } => {
428 // Track tool name for GLM/CCS deduplication.
429 // IMPORTANT: Track the tool name when provided, even when input is None.
430 // GLM may send ContentBlockStart with name but no input, then send input via delta.
431 // We only store when we have a name to avoid overwriting a previous tool name with None.
432 if let Some(n) = name {
433 session.set_tool_name(index, Some(n.clone()));
434 }
435
436 // Initialize tool input accumulator only if input is present
437 if let Some(i) = input {
438 let input_str = if let serde_json::Value::String(s) = &i {
439 s.clone()
440 } else {
441 format_tool_input(i)
442 };
443 session.on_tool_input_delta(index, &input_str);
444 }
445 }
446 _ => {}
447 }
448 String::new()
449 }
450 StreamInnerEvent::ContentBlockStart {
451 index: Some(index),
452 content_block: None,
453 } => {
454 // Content block started but no initial content provided
455 session.on_content_block_start(index);
456 String::new()
457 }
458 StreamInnerEvent::ContentBlockStart { .. } => {
459 // Content block without index - ignore
460 String::new()
461 }
462 StreamInnerEvent::ContentBlockDelta {
463 index: Some(index),
464 delta: Some(delta),
465 } => self.handle_content_block_delta(&mut session, index, delta),
466 StreamInnerEvent::TextDelta { text: Some(text) } => {
467 self.handle_text_delta(&mut session, &text)
468 }
469 StreamInnerEvent::ContentBlockStop { .. } => {
470 // Content block completion event - no output needed
471 // This event marks the end of a content block but doesn't produce
472 // any displayable content. It's a control event for state management.
473 String::new()
474 }
475 StreamInnerEvent::MessageDelta { .. } => {
476 // Message delta event with usage/metadata - no output needed
477 // This event contains final message metadata (stop_reason, usage stats)
478 // but is used for tracking/monitoring purposes only, not display.
479 String::new()
480 }
481 StreamInnerEvent::ContentBlockDelta { .. }
482 | StreamInnerEvent::Ping
483 | StreamInnerEvent::TextDelta { text: None }
484 | StreamInnerEvent::Error { error: None } => String::new(),
485 StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
486 StreamInnerEvent::Error {
487 error: Some(err), ..
488 } => self.handle_error_event(err),
489 StreamInnerEvent::Unknown => self.handle_unknown_event(),
490 }
491 }
492}