// ralph_workflow/json_parser/claude/parser.rs
//
// Claude parser implementation.
//
// Contains the `ClaudeParser` struct and its core methods.

5/// Claude event parser
6///
7/// Note: This parser is designed for single-threaded use only.
8/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
9/// Do not share this parser across threads.
10pub struct ClaudeParser {
11 colors: Colors,
12 pub(crate) verbosity: Verbosity,
13 /// Relative path to log file (if logging enabled)
14 log_path: Option<std::path::PathBuf>,
15 display_name: String,
16 /// Unified streaming session tracker
17 /// Provides single source of truth for streaming state across all content types
18 streaming_session: Rc<RefCell<StreamingSession>>,
19 /// Terminal mode for output formatting
20 /// Detected at parse time and cached for performance
21 terminal_mode: RefCell<TerminalMode>,
22 /// Whether to show streaming quality metrics
23 show_streaming_metrics: bool,
24 /// Output printer for capturing or displaying output
25 printer: SharedPrinter,
26
27 /// Tracks whether a thinking delta line is currently being streamed.
28 ///
29 /// - In `TerminalMode::Full`, thinking deltas use the append-only streaming pattern (no cursor
30 /// movement during deltas) and must be finalized (emit the completion newline) before emitting
31 /// other newline-based output.
32 /// - In `TerminalMode::Basic|None`, we suppress per-delta thinking output and flush accumulated
33 /// thinking content once at the next output boundary (or at `message_stop`).
34 thinking_active_index: RefCell<Option<u64>>,
35
36 /// Tracks which thinking content block indices have streamed thinking content that is eligible
37 /// for non-TTY flushing.
38 ///
39 /// Some providers can emit multiple thinking blocks (multiple indices) within a single message.
40 /// In non-TTY modes we suppress per-delta output, so we must remember all indices that
41 /// accumulated thinking to flush them at `message_stop`.
42 thinking_non_tty_indices: RefCell<std::collections::BTreeSet<u64>>,
43
44 /// Once non-thinking output has started for the current message, suppress any
45 /// subsequent thinking deltas to avoid corrupting visible output.
46 ///
47 /// Claude/CCS can occasionally emit thinking deltas after text deltas. Because
48 /// both streams append on the current line in Full mode, allowing late thinking can
49 /// glue onto or visually corrupt previously-rendered text.
50 suppress_thinking_for_message: RefCell<bool>,
51
52 /// Tracks whether a text delta line is currently being streamed (Full mode).
53 ///
54 /// In the append-only streaming pattern, deltas do not move the cursor; they simply
55 /// append new suffixes on the current line. When true, any newline-based non-stream
56 /// output should ensure the streamed line is finalized (emit the completion newline)
57 /// before printing unrelated lines, to avoid "glued" output.
58 text_line_active: RefCell<bool>,
59
60 /// Defensive cursor state for legacy/inconsistent streams.
61 ///
62 /// The append-only streaming implementation should not emit cursor-up sequences,
63 /// but real-world logs can include raw passthrough output with escape codes.
64 /// When this flag is true, newline-based output should first emit a completion newline
65 /// to avoid overwriting/gluing onto visible content.
66 cursor_up_active: RefCell<bool>,
67
68 /// Tracks the last rendered content for append-only streaming in Full mode.
69 ///
70 /// In append-only mode, we emit the prefix once, then only emit new suffixes for subsequent deltas.
71 /// This map stores the last rendered content for each (ContentType, index) pair.
72 /// Key format: "{content_type}:{index}" (e.g., "text:0", "thinking:1")
73 last_rendered_content: RefCell<std::collections::HashMap<String, String>>,
74}
75
76impl ClaudeParser {
77 /// Create a new `ClaudeParser` with the given colors and verbosity.
78 ///
79 /// # Arguments
80 ///
81 /// * `colors` - Colors for terminal output
82 /// * `verbosity` - Verbosity level for output
83 ///
84 /// # Returns
85 ///
86 /// A new `ClaudeParser` instance
87 ///
88 /// # Example
89 ///
90 /// ```ignore
91 /// use ralph_workflow::json_parser::ClaudeParser;
92 /// use ralph_workflow::logger::Colors;
93 /// use ralph_workflow::config::Verbosity;
94 ///
95 /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
96 /// ```
97 pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
98 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
99 }
100
101 /// Create a new `ClaudeParser` with a custom printer.
102 ///
103 /// # Arguments
104 ///
105 /// * `colors` - Colors for terminal output
106 /// * `verbosity` - Verbosity level for output
107 /// * `printer` - Shared printer for output
108 ///
109 /// # Returns
110 ///
111 /// A new `ClaudeParser` instance
112 pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
113 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
114 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
115
116 // Use the printer's is_terminal method to validate it's connected correctly
117 // This is a sanity check that also satisfies the compiler that the method is used
118 let _printer_is_terminal = printer.borrow().is_terminal();
119
120 Self {
121 colors,
122 verbosity,
123 log_path: None,
124 display_name: "Claude".to_string(),
125 streaming_session: Rc::new(RefCell::new(streaming_session)),
126 terminal_mode: RefCell::new(TerminalMode::detect()),
127 show_streaming_metrics: false,
128 printer,
129 thinking_active_index: RefCell::new(None),
130 thinking_non_tty_indices: RefCell::new(std::collections::BTreeSet::new()),
131 suppress_thinking_for_message: RefCell::new(false),
132 text_line_active: RefCell::new(false),
133 cursor_up_active: RefCell::new(false),
134 last_rendered_content: RefCell::new(std::collections::HashMap::new()),
135 }
136 }
137
138 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
139 self.show_streaming_metrics = show;
140 self
141 }
142
143 /// Set the display name for this parser.
144 ///
145 /// # Arguments
146 ///
147 /// * `display_name` - The name to display in output
148 ///
149 /// # Returns
150 ///
151 /// Self for builder pattern chaining
152 pub fn with_display_name(mut self, display_name: &str) -> Self {
153 self.display_name = display_name.to_string();
154 self
155 }
156
157 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
158 self.log_path = Some(std::path::PathBuf::from(path));
159 self
160 }
161
162 /// Set the terminal mode for this parser.
163 ///
164 /// # Arguments
165 ///
166 /// * `mode` - The terminal mode to use
167 ///
168 /// # Returns
169 ///
170 /// Self for builder pattern chaining
171 #[cfg(any(test, feature = "test-utils"))]
172 pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
173 *self.terminal_mode.borrow_mut() = mode;
174 self
175 }
176
177 /// Get a shared reference to the printer.
178 ///
179 /// This allows tests, monitoring, and other code to access the printer after parsing
180 /// to verify output content, check for duplicates, or capture output for analysis.
181 ///
182 /// # Returns
183 ///
184 /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
185 ///
186 /// # Example
187 ///
188 /// ```ignore
189 /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
190 /// use std::rc::Rc;
191 /// use std::cell::RefCell;
192 ///
193 /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
194 /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
195 ///
196 /// // Parse events...
197 ///
198 /// // Now access the printer to verify output
199 /// let printer_ref = parser.printer().borrow();
200 /// assert!(!printer_ref.has_duplicate_consecutive_lines());
201 /// ```
202 /// Get a clone of the printer used by this parser.
203 ///
204 /// This is primarily useful for integration tests and monitoring in this repository.
205 /// Only available with the `test-utils` feature.
206 ///
207 /// Note: downstream crates should avoid relying on this API in production builds.
208 #[cfg(any(test, feature = "test-utils"))]
209 pub fn printer(&self) -> SharedPrinter {
210 Rc::clone(&self.printer)
211 }
212
213 /// Get streaming quality metrics from the current session.
214 ///
215 /// This provides insight into the deduplication and streaming quality of the
216 /// parsing session, including:
217 /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
218 /// - Number of large deltas (potential protocol violations)
219 /// - Total deltas processed
220 ///
221 /// Useful for testing, monitoring, and debugging streaming behavior.
222 /// Only available with the `test-utils` feature.
223 ///
224 /// # Returns
225 ///
226 /// A copy of the streaming quality metrics from the internal `StreamingSession`.
227 ///
228 /// # Example
229 ///
230 /// ```ignore
231 /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
232 /// use std::rc::Rc;
233 /// use std::cell::RefCell;
234 ///
235 /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
236 /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
237 ///
238 /// // Parse events...
239 ///
240 /// // Verify deduplication logic triggered
241 /// let metrics = parser.streaming_metrics();
242 /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
243 /// ```
244 #[cfg(any(test, feature = "test-utils"))]
245 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
246 self.streaming_session
247 .borrow()
248 .get_streaming_quality_metrics()
249 }
250
251 /// Parse and display a single Claude JSON event
252 ///
253 /// Returns `Some(formatted_output)` for valid events, or None for:
254 /// - Malformed JSON (logged at debug level)
255 /// - Unknown event types
256 /// - Empty or whitespace-only output
257 pub fn parse_event(&self, line: &str) -> Option<String> {
258 let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
259 e
260 } else {
261 // Non-JSON line - could be raw text output from agent
262 // Pass through as-is if it looks like real output (not empty)
263 let trimmed = line.trim();
264 if !trimmed.is_empty() && !trimmed.starts_with('{') {
265 // In full TTY mode, thinking deltas keep the cursor on the thinking line for
266 // in-place updates. Any other output must first finalize that cursor state.
267 let mut session = self.streaming_session.borrow_mut();
268 let finalize = self.finalize_in_place_full_mode(&mut session);
269 let out = format!("{finalize}{trimmed}\n");
270 if *self.terminal_mode.borrow() == TerminalMode::Full {
271 // Only mutate cursor state based on explicit cursor controls.
272 // Normal output may include newlines, but does not reliably indicate whether
273 // we are still in the in-place streaming cursor-up position.
274 let mut cursor_up_active = self.cursor_up_active.borrow_mut();
275 if out.contains("\x1b[1B\n") {
276 *cursor_up_active = false;
277 }
278 if out.contains("\x1b[1A") {
279 *cursor_up_active = true;
280 }
281 }
282 return Some(out);
283 }
284 return None;
285 };
286
287 // When a thinking/text line is being streamed in full TTY mode, the streaming
288 // implementation is append-only and keeps the cursor on the current line.
289 //
290 // Any non-stream event (system/user/assistant/result) must first finalize the
291 // active streaming line so the next output does not glue onto it.
292 let finalize = if matches!(&event, ClaudeEvent::StreamEvent { .. }) {
293 String::new()
294 } else {
295 let mut session = self.streaming_session.borrow_mut();
296 self.finalize_in_place_full_mode(&mut session)
297 };
298 let c = &self.colors;
299 let prefix = &self.display_name;
300
301 let output = match event {
302 ClaudeEvent::System {
303 subtype,
304 session_id,
305 cwd,
306 } => self.format_system_event(subtype.as_ref(), session_id, cwd),
307 ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
308 ClaudeEvent::User { message } => self.format_user_event(message),
309 ClaudeEvent::Result {
310 subtype,
311 duration_ms,
312 total_cost_usd,
313 num_turns,
314 result,
315 error,
316 } => self.format_result_event(
317 subtype,
318 duration_ms,
319 total_cost_usd,
320 num_turns,
321 result,
322 error,
323 ),
324 ClaudeEvent::StreamEvent { event } => {
325 // Handle streaming events for delta/partial updates
326 self.parse_stream_event(event)
327 }
328 ClaudeEvent::Unknown => {
329 // Use the generic unknown event formatter for consistent handling
330 // In verbose mode, this will show the event type and key fields
331 // In normal mode, this returns empty string
332 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
333 }
334 };
335
336 // IMPORTANT: We must preserve any completion output from `finalize_in_place_full_mode`
337 // even if the current event itself produces no visible output.
338 //
339 // Example: the final "assistant" event can be deduplicated (empty output) after
340 // streaming deltas have already been shown. If we drop `finalize` in that case,
341 // the streamed line never receives its completion newline and subsequent output
342 // (e.g., system `status`) can clear/overwrite it.
343 let output = if output.is_empty() {
344 finalize
345 } else {
346 format!("{finalize}{output}")
347 };
348
349 if output.is_empty() {
350 None
351 } else {
352 if *self.terminal_mode.borrow() == TerminalMode::Full {
353 // Keep a simple, output-driven model of cursor state.
354 //
355 // The streaming implementation is append-only and SHOULD NOT emit cursor-up
356 // sequences ("\x1b[1A") for deltas. We only treat explicit cursor completion
357 // sequences ("\x1b[1B\n") as authoritative for clearing the defensive flag.
358 //
359 // Note: raw passthrough output may include escape sequences; we avoid inferring
360 // "cursor is up" from output content to keep this logic robust.
361 let mut cursor_up_active = self.cursor_up_active.borrow_mut();
362 if output.contains("\x1b[1B\n") {
363 *cursor_up_active = false;
364 }
365 }
366 Some(output)
367 }
368 }
369
370 /// Parse a streaming event for delta/partial updates
371 ///
372 /// Handles the nested events within `stream_event`:
373 /// - MessageStart/Stop: Manage session state
374 /// - `ContentBlockStart`: Initialize new content blocks
375 /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
376 /// - `ContentBlockStop`: Finalize content blocks
377 /// - `MessageDelta`: Process message metadata without output
378 /// - Error: Display appropriately
379 ///
380 /// Returns String for display content, empty String for control events.
381 fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
382 let mut session = self.streaming_session.borrow_mut();
383
384 match event {
385 StreamInnerEvent::MessageStart {
386 message,
387 message_id,
388 } => {
389 // Protocol violations happen in real streams: a new MessageStart can arrive
390 // while a previous streamed line is still "active" (we haven't yet emitted the
391 // completion newline). Finalize any active streaming line before resetting state
392 // so subsequent output doesn't glue onto the in-progress line.
393 let in_place_finalize = self.finalize_in_place_full_mode(&mut session);
394
395 // Reset any pending thinking line from a previous message.
396 *self.thinking_active_index.borrow_mut() = None;
397 self.thinking_non_tty_indices.borrow_mut().clear();
398 *self.suppress_thinking_for_message.borrow_mut() = false;
399 *self.text_line_active.borrow_mut() = false;
400 *self.cursor_up_active.borrow_mut() = false;
401
402 // Extract message_id from either the top-level field or nested message.id
403 // The Claude API typically puts the ID in message.id, not at the top level
404 let effective_message_id =
405 message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
406 // Set message ID for tracking and clear session state on new message
407 session.set_current_message_id(effective_message_id);
408 session.on_message_start();
409 // Clear last rendered content for append-only pattern on new message
410 self.last_rendered_content.borrow_mut().clear();
411 in_place_finalize
412 }
413 StreamInnerEvent::ContentBlockStart {
414 index: Some(index),
415 content_block: Some(block),
416 } => {
417 // Initialize a new content block at this index
418 session.on_content_block_start(index);
419 match &block {
420 ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
421 // Initial text in ContentBlockStart - treat as first delta
422 session.on_text_delta(index, t);
423 }
424 ContentBlock::ToolUse { name, input } => {
425 // Track tool name for GLM/CCS deduplication.
426 // IMPORTANT: Track the tool name when provided, even when input is None.
427 // GLM may send ContentBlockStart with name but no input, then send input via delta.
428 // We only store when we have a name to avoid overwriting a previous tool name with None.
429 if let Some(n) = name {
430 session.set_tool_name(index, Some(n.clone()));
431 }
432
433 // Initialize tool input accumulator only if input is present
434 if let Some(i) = input {
435 let input_str = if let serde_json::Value::String(s) = &i {
436 s.clone()
437 } else {
438 format_tool_input(i)
439 };
440 session.on_tool_input_delta(index, &input_str);
441 }
442 }
443 _ => {}
444 }
445 String::new()
446 }
447 StreamInnerEvent::ContentBlockStart {
448 index: Some(index),
449 content_block: None,
450 } => {
451 // Content block started but no initial content provided
452 session.on_content_block_start(index);
453 String::new()
454 }
455 StreamInnerEvent::ContentBlockStart { .. } => {
456 // Content block without index - ignore
457 String::new()
458 }
459 StreamInnerEvent::ContentBlockDelta {
460 index: Some(index),
461 delta: Some(delta),
462 } => self.handle_content_block_delta(&mut session, index, delta),
463 StreamInnerEvent::TextDelta { text: Some(text) } => {
464 self.handle_text_delta(&mut session, &text)
465 }
466 StreamInnerEvent::ContentBlockStop { .. } => {
467 // Content block completion event - no output needed
468 // This event marks the end of a content block but doesn't produce
469 // any displayable content. It's a control event for state management.
470 String::new()
471 }
472 StreamInnerEvent::MessageDelta { .. } => {
473 // Message delta event with usage/metadata - no output needed
474 // This event contains final message metadata (stop_reason, usage stats)
475 // but is used for tracking/monitoring purposes only, not display.
476 String::new()
477 }
478 StreamInnerEvent::ContentBlockDelta { .. }
479 | StreamInnerEvent::Ping
480 | StreamInnerEvent::TextDelta { text: None }
481 | StreamInnerEvent::Error { error: None } => String::new(),
482 StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
483 StreamInnerEvent::Error {
484 error: Some(err), ..
485 } => self.handle_error_event(err),
486 StreamInnerEvent::Unknown => self.handle_unknown_event(),
487 }
488 }
489}