ralph_workflow/json_parser/claude.rs
1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//! creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46#[cfg(feature = "test-utils")]
47use super::health::StreamingQualityMetrics;
48use super::printer::SharedPrinter;
49use super::streaming_state::StreamingSession;
50use super::terminal::TerminalMode;
51use super::types::{
52 format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
53 ContentType, StreamInnerEvent,
54};
55
56/// Claude event parser
57///
58/// Note: This parser is designed for single-threaded use only.
59/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
60/// Do not share this parser across threads.
61pub struct ClaudeParser {
62 colors: Colors,
63 pub(crate) verbosity: Verbosity,
64 log_file: Option<String>,
65 display_name: String,
66 /// Unified streaming session tracker
67 /// Provides single source of truth for streaming state across all content types
68 streaming_session: Rc<RefCell<StreamingSession>>,
69 /// Terminal mode for output formatting
70 /// Detected at parse time and cached for performance
71 terminal_mode: RefCell<TerminalMode>,
72 /// Whether to show streaming quality metrics
73 show_streaming_metrics: bool,
74 /// Output printer for capturing or displaying output
75 printer: SharedPrinter,
76}
77
78impl ClaudeParser {
79 /// Create a new `ClaudeParser` with the given colors and verbosity.
80 ///
81 /// # Arguments
82 ///
83 /// * `colors` - Colors for terminal output
84 /// * `verbosity` - Verbosity level for output
85 ///
86 /// # Returns
87 ///
88 /// A new `ClaudeParser` instance
89 ///
90 /// # Example
91 ///
92 /// ```ignore
93 /// use ralph_workflow::json_parser::ClaudeParser;
94 /// use ralph_workflow::logger::Colors;
95 /// use ralph_workflow::config::Verbosity;
96 ///
97 /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
98 /// ```
99 pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
100 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
101 }
102
103 /// Create a new `ClaudeParser` with a custom printer.
104 ///
105 /// # Arguments
106 ///
107 /// * `colors` - Colors for terminal output
108 /// * `verbosity` - Verbosity level for output
109 /// * `printer` - Shared printer for output
110 ///
111 /// # Returns
112 ///
113 /// A new `ClaudeParser` instance
114 pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
115 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
116 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
117
118 // Use the printer's is_terminal method to validate it's connected correctly
119 // This is a sanity check that also satisfies the compiler that the method is used
120 let _printer_is_terminal = printer.borrow().is_terminal();
121
122 Self {
123 colors,
124 verbosity,
125 log_file: None,
126 display_name: "Claude".to_string(),
127 streaming_session: Rc::new(RefCell::new(streaming_session)),
128 terminal_mode: RefCell::new(TerminalMode::detect()),
129 show_streaming_metrics: false,
130 printer,
131 }
132 }
133
134 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
135 self.show_streaming_metrics = show;
136 self
137 }
138
139 /// Set the display name for this parser.
140 ///
141 /// # Arguments
142 ///
143 /// * `display_name` - The name to display in output
144 ///
145 /// # Returns
146 ///
147 /// Self for builder pattern chaining
148 pub fn with_display_name(mut self, display_name: &str) -> Self {
149 self.display_name = display_name.to_string();
150 self
151 }
152
153 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
154 self.log_file = Some(path.to_string());
155 self
156 }
157
158 /// Set the terminal mode for this parser.
159 ///
160 /// # Arguments
161 ///
162 /// * `mode` - The terminal mode to use
163 ///
164 /// # Returns
165 ///
166 /// Self for builder pattern chaining
167 #[cfg(any(test, feature = "test-utils"))]
168 pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
169 *self.terminal_mode.borrow_mut() = mode;
170 self
171 }
172
173 /// Get a shared reference to the printer.
174 ///
175 /// This allows tests, monitoring, and other code to access the printer after parsing
176 /// to verify output content, check for duplicates, or capture output for analysis.
177 ///
178 /// # Returns
179 ///
180 /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
181 ///
182 /// # Example
183 ///
184 /// ```ignore
185 /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
186 /// use std::rc::Rc;
187 /// use std::cell::RefCell;
188 ///
189 /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
190 /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
191 ///
192 /// // Parse events...
193 ///
194 /// // Now access the printer to verify output
195 /// let printer_ref = parser.printer().borrow();
196 /// assert!(!printer_ref.has_duplicate_consecutive_lines());
197 /// ```
198 /// Get a clone of the printer used by this parser.
199 ///
200 /// This is primarily useful for testing and monitoring.
201 /// Only available with the `test-utils` feature.
202 #[cfg(feature = "test-utils")]
203 pub fn printer(&self) -> SharedPrinter {
204 Rc::clone(&self.printer)
205 }
206
207 /// Get streaming quality metrics from the current session.
208 ///
209 /// This provides insight into the deduplication and streaming quality of the
210 /// parsing session, including:
211 /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
212 /// - Number of large deltas (potential protocol violations)
213 /// - Total deltas processed
214 ///
215 /// Useful for testing, monitoring, and debugging streaming behavior.
216 /// Only available with the `test-utils` feature.
217 ///
218 /// # Returns
219 ///
220 /// A copy of the streaming quality metrics from the internal `StreamingSession`.
221 ///
222 /// # Example
223 ///
224 /// ```ignore
225 /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
226 /// use std::rc::Rc;
227 /// use std::cell::RefCell;
228 ///
229 /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
230 /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
231 ///
232 /// // Parse events...
233 ///
234 /// // Verify deduplication logic triggered
235 /// let metrics = parser.streaming_metrics();
236 /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
237 /// ```
238 #[cfg(feature = "test-utils")]
239 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
240 self.streaming_session
241 .borrow()
242 .get_streaming_quality_metrics()
243 }
244
245 /// Parse and display a single Claude JSON event
246 ///
247 /// Returns `Some(formatted_output)` for valid events, or None for:
248 /// - Malformed JSON (logged at debug level)
249 /// - Unknown event types
250 /// - Empty or whitespace-only output
251 pub fn parse_event(&self, line: &str) -> Option<String> {
252 let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
253 e
254 } else {
255 // Non-JSON line - could be raw text output from agent
256 // Pass through as-is if it looks like real output (not empty)
257 let trimmed = line.trim();
258 if !trimmed.is_empty() && !trimmed.starts_with('{') {
259 return Some(format!("{trimmed}\n"));
260 }
261 return None;
262 };
263 let c = &self.colors;
264 let prefix = &self.display_name;
265
266 let output = match event {
267 ClaudeEvent::System {
268 subtype,
269 session_id,
270 cwd,
271 } => self.format_system_event(subtype.as_ref(), session_id, cwd),
272 ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
273 ClaudeEvent::User { message } => self.format_user_event(message),
274 ClaudeEvent::Result {
275 subtype,
276 duration_ms,
277 total_cost_usd,
278 num_turns,
279 result,
280 error,
281 } => self.format_result_event(
282 subtype,
283 duration_ms,
284 total_cost_usd,
285 num_turns,
286 result,
287 error,
288 ),
289 ClaudeEvent::StreamEvent { event } => {
290 // Handle streaming events for delta/partial updates
291 self.parse_stream_event(event)
292 }
293 ClaudeEvent::Unknown => {
294 // Use the generic unknown event formatter for consistent handling
295 // In verbose mode, this will show the event type and key fields
296 // In normal mode, this returns empty string
297 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
298 }
299 };
300
301 if output.is_empty() {
302 None
303 } else {
304 Some(output)
305 }
306 }
307
308 /// Parse a streaming event for delta/partial updates
309 ///
310 /// Handles the nested events within `stream_event`:
311 /// - MessageStart/Stop: Manage session state
312 /// - `ContentBlockStart`: Initialize new content blocks
313 /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
314 /// - `ContentBlockStop`: Finalize content blocks
315 /// - `MessageDelta`: Process message metadata without output
316 /// - Error: Display appropriately
317 ///
318 /// Returns String for display content, empty String for control events.
319 fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
320 let mut session = self.streaming_session.borrow_mut();
321
322 match event {
323 StreamInnerEvent::MessageStart {
324 message,
325 message_id,
326 } => {
327 // Extract message_id from either the top-level field or nested message.id
328 // The Claude API typically puts the ID in message.id, not at the top level
329 let effective_message_id =
330 message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
331 // Set message ID for tracking and clear session state on new message
332 session.set_current_message_id(effective_message_id);
333 session.on_message_start();
334 String::new()
335 }
336 StreamInnerEvent::ContentBlockStart {
337 index: Some(index),
338 content_block: Some(block),
339 } => {
340 // Initialize a new content block at this index
341 session.on_content_block_start(index);
342 match &block {
343 ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
344 // Initial text in ContentBlockStart - treat as first delta
345 session.on_text_delta(index, t);
346 }
347 ContentBlock::ToolUse {
348 name: _,
349 input: Some(i),
350 } => {
351 // Initialize tool input accumulator
352 let input_str = if let serde_json::Value::String(s) = &i {
353 s.clone()
354 } else {
355 format_tool_input(i)
356 };
357 session.on_tool_input_delta(index, &input_str);
358 }
359 _ => {}
360 }
361 String::new()
362 }
363 StreamInnerEvent::ContentBlockStart {
364 index: Some(index),
365 content_block: None,
366 } => {
367 // Content block started but no initial content provided
368 session.on_content_block_start(index);
369 String::new()
370 }
371 StreamInnerEvent::ContentBlockStart { .. } => {
372 // Content block without index - ignore
373 String::new()
374 }
375 StreamInnerEvent::ContentBlockDelta {
376 index: Some(index),
377 delta: Some(delta),
378 } => self.handle_content_block_delta(&mut session, index, delta),
379 StreamInnerEvent::TextDelta { text: Some(text) } => {
380 self.handle_text_delta(&mut session, &text)
381 }
382 StreamInnerEvent::ContentBlockStop { .. } => {
383 // Content block completion event - no output needed
384 // This event marks the end of a content block but doesn't produce
385 // any displayable content. It's a control event for state management.
386 String::new()
387 }
388 StreamInnerEvent::MessageDelta { .. } => {
389 // Message delta event with usage/metadata - no output needed
390 // This event contains final message metadata (stop_reason, usage stats)
391 // but is used for tracking/monitoring purposes only, not display.
392 String::new()
393 }
394 StreamInnerEvent::ContentBlockDelta { .. }
395 | StreamInnerEvent::Ping
396 | StreamInnerEvent::TextDelta { text: None }
397 | StreamInnerEvent::Error { error: None } => String::new(),
398 StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
399 StreamInnerEvent::Error {
400 error: Some(err), ..
401 } => self.handle_error_event(err),
402 StreamInnerEvent::Unknown => self.handle_unknown_event(),
403 }
404 }
405
406 /// Format a system event
407 fn format_system_event(
408 &self,
409 subtype: Option<&String>,
410 session_id: Option<String>,
411 cwd: Option<String>,
412 ) -> String {
413 let c = &self.colors;
414 let prefix = &self.display_name;
415
416 if subtype.map(std::string::String::as_str) == Some("init") {
417 let sid = session_id.unwrap_or_else(|| "unknown".to_string());
418 let mut out = format!(
419 "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
420 c.dim(),
421 prefix,
422 c.reset(),
423 c.cyan(),
424 c.reset(),
425 c.dim(),
426 sid,
427 c.reset()
428 );
429 if let Some(cwd) = cwd {
430 let _ = writeln!(
431 out,
432 "{}[{}]{} {}Working dir: {}{}",
433 c.dim(),
434 prefix,
435 c.reset(),
436 c.dim(),
437 cwd,
438 c.reset()
439 );
440 }
441 out
442 } else {
443 format!(
444 "{}[{}]{} {}{}{}\n",
445 c.dim(),
446 prefix,
447 c.reset(),
448 c.cyan(),
449 subtype.map_or("system", |s| s.as_str()),
450 c.reset()
451 )
452 }
453 }
454
455 /// Extract text content from a message for hash-based deduplication.
456 fn extract_text_content_for_hash(
457 message: Option<&crate::json_parser::types::AssistantMessage>,
458 ) -> Option<String> {
459 message?.content.as_ref().map(|content| {
460 content
461 .iter()
462 .filter_map(|block| {
463 if let ContentBlock::Text { text } = block {
464 text.as_deref()
465 } else {
466 None
467 }
468 })
469 .collect::<Vec<_>>()
470 .join("")
471 })
472 }
473
474 /// Check if this assistant message is a duplicate of already-streamed content.
475 fn is_duplicate_assistant_message(
476 &self,
477 message: Option<&crate::json_parser::types::AssistantMessage>,
478 ) -> bool {
479 let session = self.streaming_session.borrow();
480
481 // Extract message_id from the assistant message
482 let assistant_msg_id = message.and_then(|m| m.id.as_ref());
483
484 // Check if this assistant event has a message_id that matches the current streaming message
485 // If it does, and we have streamed content, then this assistant event is a duplicate
486 // because the content was already streamed via deltas.
487 if let Some(ast_msg_id) = assistant_msg_id {
488 // Check if message was already marked as displayed (after message_stop)
489 if session.is_duplicate_final_message(ast_msg_id) {
490 return true;
491 }
492
493 // Check if the assistant message_id matches the current streaming message_id
494 if session.get_current_message_id() == Some(ast_msg_id) {
495 // Same message - check if we have streamed any content
496 // If yes, the assistant event is a duplicate
497 if session.has_any_streamed_content() {
498 return true;
499 }
500 }
501 }
502
503 // If no message_id match, fall back to hash-based deduplication
504 let text_content_for_hash = Self::extract_text_content_for_hash(message);
505 if let Some(ref text_content) = text_content_for_hash {
506 if !text_content.is_empty() {
507 return session.is_duplicate_by_hash(text_content);
508 }
509 }
510
511 // Fallback to coarse check
512 session.has_any_streamed_content()
513 }
514
515 /// Format a text content block for assistant output.
516 fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
517 let limit = self.verbosity.truncate_limit("text");
518 let preview = truncate_text(text, limit);
519 let _ = writeln!(
520 out,
521 "{}[{}]{} {}{}{}",
522 colors.dim(),
523 prefix,
524 colors.reset(),
525 colors.white(),
526 preview,
527 colors.reset()
528 );
529 }
530
531 /// Format a tool use content block for assistant output.
532 fn format_tool_use_block(
533 &self,
534 out: &mut String,
535 tool: Option<&String>,
536 input: Option<&serde_json::Value>,
537 prefix: &str,
538 colors: Colors,
539 ) {
540 let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
541 let _ = writeln!(
542 out,
543 "{}[{}]{} {}Tool{}: {}{}{}",
544 colors.dim(),
545 prefix,
546 colors.reset(),
547 colors.magenta(),
548 colors.reset(),
549 colors.bold(),
550 tool_name,
551 colors.reset(),
552 );
553
554 // Show tool input details at Normal and above (not just Verbose)
555 // Tool inputs provide crucial context for understanding agent actions
556 if self.verbosity.show_tool_input() {
557 if let Some(input_val) = input {
558 let input_str = format_tool_input(input_val);
559 let limit = self.verbosity.truncate_limit("tool_input");
560 let preview = truncate_text(&input_str, limit);
561 if !preview.is_empty() {
562 let _ = writeln!(
563 out,
564 "{}[{}]{} {} └─ {}{}",
565 colors.dim(),
566 prefix,
567 colors.reset(),
568 colors.dim(),
569 preview,
570 colors.reset()
571 );
572 }
573 }
574 }
575 }
576
577 /// Format a tool result content block for assistant output.
578 fn format_tool_result_block(
579 &self,
580 out: &mut String,
581 content: &serde_json::Value,
582 prefix: &str,
583 colors: Colors,
584 ) {
585 let content_str = match content {
586 serde_json::Value::String(s) => s.clone(),
587 other => other.to_string(),
588 };
589 let limit = self.verbosity.truncate_limit("tool_result");
590 let preview = truncate_text(&content_str, limit);
591 let _ = writeln!(
592 out,
593 "{}[{}]{} {}Result:{} {}",
594 colors.dim(),
595 prefix,
596 colors.reset(),
597 colors.dim(),
598 colors.reset(),
599 preview
600 );
601 }
602
603 /// Format all content blocks from an assistant message.
604 fn format_content_blocks(
605 &self,
606 out: &mut String,
607 content: &[ContentBlock],
608 prefix: &str,
609 colors: Colors,
610 ) {
611 for block in content {
612 match block {
613 ContentBlock::Text { text } => {
614 if let Some(text) = text {
615 self.format_text_block(out, text, prefix, colors);
616 }
617 }
618 ContentBlock::ToolUse { name, input } => {
619 self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
620 }
621 ContentBlock::ToolResult { content } => {
622 if let Some(content) = content {
623 self.format_tool_result_block(out, content, prefix, colors);
624 }
625 }
626 ContentBlock::Unknown => {}
627 }
628 }
629 }
630
631 /// Format an assistant event
632 fn format_assistant_event(
633 &self,
634 message: Option<crate::json_parser::types::AssistantMessage>,
635 ) -> String {
636 // CRITICAL FIX: When ANY content has been streamed via deltas,
637 // the Assistant event should NOT display it again.
638 // The Assistant event represents the "complete" message, but if we've
639 // already shown the streaming deltas, showing it again causes duplication.
640 if self.is_duplicate_assistant_message(message.as_ref()) {
641 return String::new();
642 }
643
644 let mut out = String::new();
645 if let Some(ref msg) = message {
646 if let Some(ref content) = msg.content {
647 self.format_content_blocks(&mut out, content, &self.display_name, self.colors);
648
649 // If we successfully rendered content, mark the message as pre-rendered
650 // so that ALL subsequent streaming deltas for this message are suppressed.
651 // This handles the case where assistant event arrives BEFORE streaming starts.
652 if !out.is_empty() {
653 if let Some(ref message_id) = msg.id {
654 let mut session = self.streaming_session.borrow_mut();
655 session.mark_message_pre_rendered(message_id);
656 }
657 }
658 }
659 }
660 out
661 }
662
663 /// Format a user event
664 fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
665 let c = &self.colors;
666 let prefix = &self.display_name;
667
668 if let Some(msg) = message {
669 if let Some(content) = msg.content {
670 if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
671 let limit = self.verbosity.truncate_limit("user");
672 let preview = truncate_text(text, limit);
673 return format!(
674 "{}[{}]{} {}User{}: {}{}{}\n",
675 c.dim(),
676 prefix,
677 c.reset(),
678 c.blue(),
679 c.reset(),
680 c.dim(),
681 preview,
682 c.reset()
683 );
684 }
685 }
686 }
687 String::new()
688 }
689
690 /// Format a result event
691 fn format_result_event(
692 &self,
693 subtype: Option<String>,
694 duration_ms: Option<u64>,
695 total_cost_usd: Option<f64>,
696 num_turns: Option<u32>,
697 result: Option<String>,
698 error: Option<String>,
699 ) -> String {
700 let c = &self.colors;
701 let prefix = &self.display_name;
702
703 let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
704 let duration_m = duration_total_secs / 60;
705 let duration_s_rem = duration_total_secs % 60;
706 let cost = total_cost_usd.unwrap_or(0.0);
707 let turns = num_turns.unwrap_or(0);
708
709 let mut out = if subtype.as_deref() == Some("success") {
710 format!(
711 "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
712 c.dim(),
713 prefix,
714 c.reset(),
715 c.green(),
716 CHECK,
717 c.reset(),
718 c.dim(),
719 duration_m,
720 duration_s_rem,
721 turns,
722 cost,
723 c.reset()
724 )
725 } else {
726 let err = error.unwrap_or_else(|| "unknown error".to_string());
727 format!(
728 "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
729 c.dim(),
730 prefix,
731 c.reset(),
732 c.red(),
733 CROSS,
734 subtype.unwrap_or_else(|| "error".to_string()),
735 c.reset(),
736 err,
737 c.dim(),
738 duration_m,
739 duration_s_rem,
740 c.reset()
741 )
742 };
743
744 if let Some(result) = result {
745 let limit = self.verbosity.truncate_limit("result");
746 let preview = truncate_text(&result, limit);
747 let _ = writeln!(
748 out,
749 "\n{}Result summary:{}\n{}{}{}",
750 c.bold(),
751 c.reset(),
752 c.dim(),
753 preview,
754 c.reset()
755 );
756 }
757 out
758 }
759
760 /// Handle content block delta events
761 fn handle_content_block_delta(
762 &self,
763 session: &mut std::cell::RefMut<'_, StreamingSession>,
764 index: u64,
765 delta: ContentBlockDelta,
766 ) -> String {
767 let c = &self.colors;
768 let prefix = &self.display_name;
769
770 match delta {
771 ContentBlockDelta::TextDelta { text: Some(text) } => {
772 let index_str = index.to_string();
773
774 // Track this delta with StreamingSession for state management.
775 //
776 // StreamingSession handles protocol/streaming quality concerns (including
777 // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
778 // whether a prefix should be displayed for this stream.
779 //
780 // The parser layer still applies additional deduplication:
781 // - Skip whitespace-only accumulated output
782 // - Hash-based deduplication after sanitization (whitespace-insensitive)
783 let show_prefix = session.on_text_delta(index, &text);
784
785 // Get accumulated text for streaming display
786 let accumulated_text = session
787 .get_accumulated(ContentType::Text, &index_str)
788 .unwrap_or("");
789
790 // Check if this message was pre-rendered from an assistant event.
791 // When an assistant event arrives BEFORE streaming deltas, we render it
792 // and mark the message_id as pre-rendered. ALL subsequent streaming deltas
793 // for this message should be suppressed to prevent duplication.
794 if let Some(message_id) = session.get_current_message_id() {
795 if session.is_message_pre_rendered(message_id) {
796 return String::new();
797 }
798 }
799
800 // Sanitize the accumulated text to check if it's empty
801 // This is needed to skip rendering when the accumulated content is just whitespace
802 let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
803
804 // Skip rendering if the sanitized text is empty (e.g., only whitespace)
805 // This prevents rendering empty lines when the accumulated content is just whitespace
806 if sanitized_text.is_empty() {
807 return String::new();
808 }
809
810 // Check if this sanitized content has already been rendered
811 // This prevents duplicates when accumulated content differs only by whitespace
812 if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
813 {
814 return String::new();
815 }
816
817 // Use TextDeltaRenderer for consistent rendering
818 let terminal_mode = *self.terminal_mode.borrow();
819
820 // Use prefix trie to detect if new content extends previously rendered content
821 // If yes, we do an in-place update (carriage return + new content)
822 let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);
823
824 let output = if show_prefix && !has_prefix {
825 // First delta with no prefix match - use the renderer with prefix
826 TextDeltaRenderer::render_first_delta(
827 accumulated_text,
828 prefix,
829 *c,
830 terminal_mode,
831 )
832 } else {
833 // Either continuation OR prefix match - use renderer for in-place update
834 // This handles the case where "Hello" becomes "Hello World" - we REPLACE
835 TextDeltaRenderer::render_subsequent_delta(
836 accumulated_text,
837 prefix,
838 *c,
839 terminal_mode,
840 )
841 };
842
843 // Mark this sanitized content as rendered for future duplicate detection
844 // We use the sanitized text (not the rendered output) to avoid false positives
845 // when the same accumulated text is rendered with different terminal modes
846 session.mark_rendered(ContentType::Text, &index_str);
847 session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);
848
849 output
850 }
851 ContentBlockDelta::ThinkingDelta {
852 thinking: Some(text),
853 } => {
854 // Track thinking deltas
855 session.on_thinking_delta(index, &text);
856 // Display thinking with visual distinction
857 Self::formatter().format_thinking(text.as_str(), prefix, *c)
858 }
859 ContentBlockDelta::ToolUseDelta {
860 tool_use: Some(tool_delta),
861 } => {
862 // Handle tool input streaming
863 // Extract the tool input from the delta
864 let input_str =
865 tool_delta
866 .get("input")
867 .map_or_else(String::new, |input| match input {
868 serde_json::Value::String(s) => s.clone(),
869 other => format_tool_input(other),
870 });
871
872 if input_str.is_empty() {
873 String::new()
874 } else {
875 // Accumulate tool input
876 session.on_tool_input_delta(index, &input_str);
877
878 // Show partial tool input in real-time
879 let formatter = DeltaDisplayFormatter::new();
880 formatter.format_tool_input(&input_str, prefix, *c)
881 }
882 }
883 _ => String::new(),
884 }
885 }
886
887 /// Handle text delta events
888 fn handle_text_delta(
889 &self,
890 session: &mut std::cell::RefMut<'_, StreamingSession>,
891 text: &str,
892 ) -> String {
893 let c = &self.colors;
894 let prefix = &self.display_name;
895
896 // Standalone text delta (not part of content block)
897 // Use default index "0" for standalone text
898 let default_index = 0u64;
899 let default_index_str = "0";
900
901 // Track this delta with StreamingSession for state management.
902 //
903 // StreamingSession handles protocol/streaming quality concerns (including
904 // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
905 // whether a prefix should be displayed for this stream.
906 //
907 // The parser layer still applies additional deduplication:
908 // - Skip whitespace-only accumulated output
909 // - Hash-based deduplication after sanitization (whitespace-insensitive)
910 let show_prefix = session.on_text_delta(default_index, text);
911
912 // Get accumulated text for streaming display
913 let accumulated_text = session
914 .get_accumulated(ContentType::Text, default_index_str)
915 .unwrap_or("");
916
917 // Sanitize the accumulated text to check if it's empty
918 // This is needed to skip rendering when the accumulated content is just whitespace
919 let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
920
921 // Skip rendering if the sanitized text is empty (e.g., only whitespace)
922 // This prevents rendering empty lines when the accumulated content is just whitespace
923 if sanitized_text.is_empty() {
924 return String::new();
925 }
926
927 // Check if this sanitized content has already been rendered
928 // This prevents duplicates when accumulated content differs only by whitespace
929 if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
930 return String::new();
931 }
932
933 // Use TextDeltaRenderer for consistent rendering across all parsers
934 let terminal_mode = *self.terminal_mode.borrow();
935
936 // Use prefix trie to detect if new content extends previously rendered content
937 // If yes, we do an in-place update (carriage return + new content)
938 let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
939
940 let output = if show_prefix && !has_prefix {
941 // First delta with no prefix match - use the renderer with prefix
942 TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
943 } else {
944 // Either continuation OR prefix match - use renderer for in-place update
945 // This handles the case where "Hello" becomes "Hello World" - we REPLACE
946 TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
947 };
948
949 // Mark this sanitized content as rendered for future duplicate detection
950 // We use the sanitized text (not the rendered output) to avoid false positives
951 // when the same accumulated text is rendered with different terminal modes
952 session.mark_rendered(ContentType::Text, default_index_str);
953 session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
954
955 output
956 }
957
958 /// Handle message stop events
959 fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
960 let c = &self.colors;
961
962 // Message complete - add final newline if we were in a content block
963 // OR if any content was streamed (handles edge cases where block state
964 // may not have been set but content was still streamed)
965 let metrics = session.get_streaming_quality_metrics();
966 let was_in_block = session.on_message_stop();
967 let had_content = session.has_any_streamed_content();
968 if was_in_block || had_content {
969 // Use TextDeltaRenderer for completion - adds final newline
970 let terminal_mode = *self.terminal_mode.borrow();
971 let completion = format!(
972 "{}{}",
973 c.reset(),
974 TextDeltaRenderer::render_completion(terminal_mode)
975 );
976 // Show streaming quality metrics in debug mode or when flag is set
977 let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
978 && metrics.total_deltas > 0;
979 if show_metrics {
980 format!("{}\n{}", completion, metrics.format(*c))
981 } else {
982 completion
983 }
984 } else {
985 String::new()
986 }
987 }
988
989 /// Handle error events
990 fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
991 let c = &self.colors;
992 let prefix = &self.display_name;
993
994 let msg = err
995 .message
996 .unwrap_or_else(|| "Unknown streaming error".to_string());
997 format!(
998 "{}[{}]{} {}Error: {}{}\n",
999 c.dim(),
1000 prefix,
1001 c.reset(),
1002 c.red(),
1003 msg,
1004 c.reset()
1005 )
1006 }
1007
1008 /// Handle unknown events
1009 fn handle_unknown_event(&self) -> String {
1010 let c = &self.colors;
1011 let prefix = &self.display_name;
1012
1013 // Unknown stream event - in debug mode, log it
1014 if self.verbosity.is_debug() {
1015 format!(
1016 "{}[{}]{} {}Unknown streaming event{}\n",
1017 c.dim(),
1018 prefix,
1019 c.reset(),
1020 c.dim(),
1021 c.reset()
1022 )
1023 } else {
1024 String::new()
1025 }
1026 }
1027
1028 /// Check if a Claude event is a control event (state management with no user output)
1029 ///
1030 /// Control events are valid JSON that represent state transitions rather than
1031 /// user-facing content. They should be tracked separately from "ignored" events
1032 /// to avoid false health warnings.
1033 const fn is_control_event(event: &ClaudeEvent) -> bool {
1034 match event {
1035 // Stream events that are control events
1036 ClaudeEvent::StreamEvent { event } => matches!(
1037 event,
1038 StreamInnerEvent::MessageStart { .. }
1039 | StreamInnerEvent::ContentBlockStart { .. }
1040 | StreamInnerEvent::ContentBlockStop { .. }
1041 | StreamInnerEvent::MessageDelta { .. }
1042 | StreamInnerEvent::MessageStop
1043 | StreamInnerEvent::Ping
1044 ),
1045 _ => false,
1046 }
1047 }
1048
1049 /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1050 ///
1051 /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1052 /// tool input deltas) that are shown to the user in real-time. These should be
1053 /// tracked separately to avoid inflating "ignored" percentages.
1054 const fn is_partial_event(event: &ClaudeEvent) -> bool {
1055 match event {
1056 // Stream events that produce incremental content
1057 ClaudeEvent::StreamEvent { event } => matches!(
1058 event,
1059 StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1060 ),
1061 _ => false,
1062 }
1063 }
1064
1065 /// Get a shared delta display formatter
1066 const fn formatter() -> DeltaDisplayFormatter {
1067 DeltaDisplayFormatter::new()
1068 }
1069
1070 /// Parse a stream of Claude NDJSON events
1071 pub fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
1072 use super::incremental_parser::IncrementalNdjsonParser;
1073
1074 let c = &self.colors;
1075 let monitor = HealthMonitor::new("Claude");
1076 let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
1077 std::fs::OpenOptions::new()
1078 .create(true)
1079 .append(true)
1080 .open(log_path)
1081 .ok()
1082 .map(std::io::BufWriter::new)
1083 });
1084
1085 // Use incremental parser for true real-time streaming
1086 // This processes JSON as soon as it's complete, not waiting for newlines
1087 let mut incremental_parser = IncrementalNdjsonParser::new();
1088 let mut byte_buffer = Vec::new();
1089
1090 // Track whether we've seen a success result event for GLM/ccs-glm compatibility
1091 // Some agents (GLM via CCS) emit both a success result and an error_during_execution
1092 // result when they exit with code 1 despite producing valid output. We suppress
1093 // the spurious error event to avoid confusing duplicate output.
1094 let mut seen_success_result = false;
1095
1096 loop {
1097 // Read available bytes
1098 byte_buffer.clear();
1099 let chunk = reader.fill_buf()?;
1100 if chunk.is_empty() {
1101 break;
1102 }
1103
1104 // Process all bytes immediately
1105 byte_buffer.extend_from_slice(chunk);
1106 let consumed = chunk.len();
1107 reader.consume(consumed);
1108
1109 // Feed bytes to incremental parser
1110 let json_events = incremental_parser.feed(&byte_buffer);
1111
1112 // Process each complete JSON event immediately
1113 for line in json_events {
1114 let trimmed = line.trim();
1115 if trimmed.is_empty() {
1116 continue;
1117 }
1118
1119 // Check for Result events to handle GLM/ccs-glm duplicate event bug
1120 // Some agents emit both success and error_during_execution results
1121 let should_skip_result = if trimmed.starts_with('{') {
1122 // First, check if the JSON has an 'errors' field with actual error messages.
1123 // This is important because Claude events can have either 'error' (string)
1124 // or 'errors' (array of strings), and we need to check both.
1125 let has_errors_with_content =
1126 if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
1127 // Check for 'errors' array with at least one non-empty string
1128 json.get("errors")
1129 .and_then(|v| v.as_array())
1130 .is_some_and(|arr| {
1131 arr.iter()
1132 .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
1133 })
1134 } else {
1135 false
1136 };
1137
1138 if let Ok(ClaudeEvent::Result {
1139 subtype,
1140 duration_ms,
1141 error,
1142 ..
1143 }) = serde_json::from_str::<ClaudeEvent>(trimmed)
1144 {
1145 let is_error_result = subtype.as_deref() != Some("success");
1146
1147 // Suppress spurious GLM error events based on these characteristics:
1148 // 1. Error event (subtype != "success")
1149 // 2. duration_ms is 0 or very small (< 100ms, indicating synthetic event)
1150 // 3. error field is null or empty (no actual error message)
1151 // 4. NO 'errors' field with actual error messages (this indicates a real error)
1152 //
1153 // These criteria identify the spurious error_during_execution events
1154 // that GLM emits when exiting with code 1 despite producing valid output.
1155 //
1156 // We DON'T suppress if there's an 'errors' array with content, because
1157 // that indicates a real error condition that the user should see.
1158 let is_spurious_glm_error = is_error_result
1159 && duration_ms.unwrap_or(0) < 100
1160 && (error.is_none() || error.as_ref().is_some_and(|e| e.is_empty()))
1161 && !has_errors_with_content;
1162
1163 if is_spurious_glm_error && seen_success_result {
1164 // Error after success - suppress (original fix)
1165 true
1166 } else if subtype.as_deref() == Some("success") {
1167 seen_success_result = true;
1168 false
1169 } else if is_spurious_glm_error {
1170 // Spurious error BEFORE success - still suppress based on characteristics
1171 // This handles the reverse-order case where error arrives first
1172 true
1173 } else {
1174 false
1175 }
1176 } else {
1177 false
1178 }
1179 } else {
1180 false
1181 };
1182
1183 // In debug mode, also show the raw JSON
1184 if self.verbosity.is_debug() {
1185 eprintln!(
1186 "{}[DEBUG]{} {}{}{}",
1187 c.dim(),
1188 c.reset(),
1189 c.dim(),
1190 &line,
1191 c.reset()
1192 );
1193 }
1194
1195 // Skip suppressed result events but still log them
1196 if should_skip_result {
1197 if let Some(ref mut file) = log_writer {
1198 writeln!(file, "{line}")?;
1199 file.get_mut().sync_all()?;
1200 }
1201 monitor.record_control_event();
1202 continue;
1203 }
1204
1205 // Parse the event once - parse_event handles malformed JSON by returning None
1206 match self.parse_event(&line) {
1207 Some(output) => {
1208 // Check if this is a partial/delta event (streaming content)
1209 if trimmed.starts_with('{') {
1210 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1211 if Self::is_partial_event(&event) {
1212 monitor.record_partial_event();
1213 } else {
1214 monitor.record_parsed();
1215 }
1216 } else {
1217 monitor.record_parsed();
1218 }
1219 } else {
1220 monitor.record_parsed();
1221 }
1222 // Write output to printer
1223 let mut printer = self.printer.borrow_mut();
1224 write!(printer, "{output}")?;
1225 printer.flush()?;
1226 }
1227 None => {
1228 // Check if this was a control event (state management with no user output)
1229 // Control events are valid JSON that return empty output but aren't "ignored"
1230 if trimmed.starts_with('{') {
1231 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1232 if Self::is_control_event(&event) {
1233 monitor.record_control_event();
1234 } else {
1235 // Valid JSON but not a control event - track as unknown
1236 monitor.record_unknown_event();
1237 }
1238 } else {
1239 // Failed to deserialize - track as parse error
1240 monitor.record_parse_error();
1241 }
1242 } else {
1243 monitor.record_ignored();
1244 }
1245 }
1246 }
1247
1248 // Log raw JSON to file if configured
1249 if let Some(ref mut file) = log_writer {
1250 writeln!(file, "{line}")?;
1251 }
1252 }
1253 }
1254
1255 if let Some(ref mut file) = log_writer {
1256 file.flush()?;
1257 // Ensure data is written to disk before continuing
1258 // This prevents race conditions where extraction runs before OS commits writes
1259 let _ = file.get_mut().sync_all();
1260 }
1261 if let Some(warning) = monitor.check_and_warn(*c) {
1262 let mut printer = self.printer.borrow_mut();
1263 writeln!(printer, "{warning}")?;
1264 printer.flush()?;
1265 }
1266 Ok(())
1267 }
1268}
1269
1270#[cfg(all(test, feature = "test-utils"))]
1271mod tests {
1272 use super::*;
1273 use crate::json_parser::printer::{SharedPrinter, TestPrinter};
1274
1275 #[test]
1276 fn test_printer_method_accessible() {
1277 // Test that the printer() method is accessible and returns a SharedPrinter
1278 let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
1279 let parser =
1280 ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));
1281
1282 // This test verifies the printer() method is accessible
1283 let _printer_ref = parser.printer();
1284 }
1285
1286 #[test]
1287 fn test_streaming_metrics_method_accessible() {
1288 // Test that the streaming_metrics() method is accessible
1289 let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
1290 let parser =
1291 ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));
1292
1293 // This test verifies the streaming_metrics() method is accessible
1294 let _metrics = parser.streaming_metrics();
1295 }
1296}