ralph_workflow/json_parser/claude.rs
1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//! creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46#[cfg(feature = "test-utils")]
47use super::health::StreamingQualityMetrics;
48use super::printer::SharedPrinter;
49use super::streaming_state::StreamingSession;
50use super::terminal::TerminalMode;
51use super::types::{
52 format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
53 ContentType, StreamInnerEvent,
54};
55
56/// Claude event parser
57///
58/// Note: This parser is designed for single-threaded use only.
59/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
60/// Do not share this parser across threads.
61pub struct ClaudeParser {
62 colors: Colors,
63 pub(crate) verbosity: Verbosity,
64 log_file: Option<String>,
65 display_name: String,
66 /// Unified streaming session tracker
67 /// Provides single source of truth for streaming state across all content types
68 streaming_session: Rc<RefCell<StreamingSession>>,
69 /// Terminal mode for output formatting
70 /// Detected at parse time and cached for performance
71 terminal_mode: RefCell<TerminalMode>,
72 /// Whether to show streaming quality metrics
73 show_streaming_metrics: bool,
74 /// Output printer for capturing or displaying output
75 printer: SharedPrinter,
76}
77
78impl ClaudeParser {
79 /// Create a new `ClaudeParser` with the given colors and verbosity.
80 ///
81 /// # Arguments
82 ///
83 /// * `colors` - Colors for terminal output
84 /// * `verbosity` - Verbosity level for output
85 ///
86 /// # Returns
87 ///
88 /// A new `ClaudeParser` instance
89 ///
90 /// # Example
91 ///
92 /// ```ignore
93 /// use ralph_workflow::json_parser::ClaudeParser;
94 /// use ralph_workflow::logger::Colors;
95 /// use ralph_workflow::config::Verbosity;
96 ///
97 /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
98 /// ```
99 pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
100 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
101 }
102
103 /// Create a new `ClaudeParser` with a custom printer.
104 ///
105 /// # Arguments
106 ///
107 /// * `colors` - Colors for terminal output
108 /// * `verbosity` - Verbosity level for output
109 /// * `printer` - Shared printer for output
110 ///
111 /// # Returns
112 ///
113 /// A new `ClaudeParser` instance
114 pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
115 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
116 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
117
118 // Use the printer's is_terminal method to validate it's connected correctly
119 // This is a sanity check that also satisfies the compiler that the method is used
120 let _printer_is_terminal = printer.borrow().is_terminal();
121
122 Self {
123 colors,
124 verbosity,
125 log_file: None,
126 display_name: "Claude".to_string(),
127 streaming_session: Rc::new(RefCell::new(streaming_session)),
128 terminal_mode: RefCell::new(TerminalMode::detect()),
129 show_streaming_metrics: false,
130 printer,
131 }
132 }
133
134 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
135 self.show_streaming_metrics = show;
136 self
137 }
138
139 /// Set the display name for this parser.
140 ///
141 /// # Arguments
142 ///
143 /// * `display_name` - The name to display in output
144 ///
145 /// # Returns
146 ///
147 /// Self for builder pattern chaining
148 pub fn with_display_name(mut self, display_name: &str) -> Self {
149 self.display_name = display_name.to_string();
150 self
151 }
152
153 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
154 self.log_file = Some(path.to_string());
155 self
156 }
157
158 /// Set the terminal mode for this parser.
159 ///
160 /// # Arguments
161 ///
162 /// * `mode` - The terminal mode to use
163 ///
164 /// # Returns
165 ///
166 /// Self for builder pattern chaining
167 #[cfg(test)]
168 pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
169 *self.terminal_mode.borrow_mut() = mode;
170 self
171 }
172
173 /// Get a shared reference to the printer.
174 ///
175 /// This allows tests, monitoring, and other code to access the printer after parsing
176 /// to verify output content, check for duplicates, or capture output for analysis.
177 ///
178 /// # Returns
179 ///
180 /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
181 ///
182 /// # Example
183 ///
184 /// ```ignore
185 /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
186 /// use std::rc::Rc;
187 /// use std::cell::RefCell;
188 ///
189 /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
190 /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
191 ///
192 /// // Parse events...
193 ///
194 /// // Now access the printer to verify output
195 /// let printer_ref = parser.printer().borrow();
196 /// assert!(!printer_ref.has_duplicate_consecutive_lines());
197 /// ```
198 /// Get a clone of the printer used by this parser.
199 ///
200 /// This is primarily useful for testing and monitoring.
201 /// Only available with the `test-utils` feature.
202 #[cfg(feature = "test-utils")]
203 pub fn printer(&self) -> SharedPrinter {
204 Rc::clone(&self.printer)
205 }
206
207 /// Get streaming quality metrics from the current session.
208 ///
209 /// This provides insight into the deduplication and streaming quality of the
210 /// parsing session, including:
211 /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
212 /// - Number of large deltas (potential protocol violations)
213 /// - Total deltas processed
214 ///
215 /// Useful for testing, monitoring, and debugging streaming behavior.
216 /// Only available with the `test-utils` feature.
217 ///
218 /// # Returns
219 ///
220 /// A copy of the streaming quality metrics from the internal `StreamingSession`.
221 ///
222 /// # Example
223 ///
224 /// ```ignore
225 /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
226 /// use std::rc::Rc;
227 /// use std::cell::RefCell;
228 ///
229 /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
230 /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
231 ///
232 /// // Parse events...
233 ///
234 /// // Verify deduplication logic triggered
235 /// let metrics = parser.streaming_metrics();
236 /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
237 /// ```
238 #[cfg(feature = "test-utils")]
239 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
240 self.streaming_session
241 .borrow()
242 .get_streaming_quality_metrics()
243 }
244
245 /// Parse and display a single Claude JSON event
246 ///
247 /// Returns `Some(formatted_output)` for valid events, or None for:
248 /// - Malformed JSON (logged at debug level)
249 /// - Unknown event types
250 /// - Empty or whitespace-only output
251 pub fn parse_event(&self, line: &str) -> Option<String> {
252 let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
253 e
254 } else {
255 // Non-JSON line - could be raw text output from agent
256 // Pass through as-is if it looks like real output (not empty)
257 let trimmed = line.trim();
258 if !trimmed.is_empty() && !trimmed.starts_with('{') {
259 return Some(format!("{trimmed}\n"));
260 }
261 return None;
262 };
263 let c = &self.colors;
264 let prefix = &self.display_name;
265
266 let output = match event {
267 ClaudeEvent::System {
268 subtype,
269 session_id,
270 cwd,
271 } => self.format_system_event(subtype.as_ref(), session_id, cwd),
272 ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
273 ClaudeEvent::User { message } => self.format_user_event(message),
274 ClaudeEvent::Result {
275 subtype,
276 duration_ms,
277 total_cost_usd,
278 num_turns,
279 result,
280 error,
281 } => self.format_result_event(
282 subtype,
283 duration_ms,
284 total_cost_usd,
285 num_turns,
286 result,
287 error,
288 ),
289 ClaudeEvent::StreamEvent { event } => {
290 // Handle streaming events for delta/partial updates
291 self.parse_stream_event(event)
292 }
293 ClaudeEvent::Unknown => {
294 // Use the generic unknown event formatter for consistent handling
295 // In verbose mode, this will show the event type and key fields
296 // In normal mode, this returns empty string
297 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
298 }
299 };
300
301 if output.is_empty() {
302 None
303 } else {
304 Some(output)
305 }
306 }
307
308 /// Parse a streaming event for delta/partial updates
309 ///
310 /// Handles the nested events within `stream_event`:
311 /// - MessageStart/Stop: Manage session state
312 /// - `ContentBlockStart`: Initialize new content blocks
313 /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
314 /// - `ContentBlockStop`: Finalize content blocks
315 /// - `MessageDelta`: Process message metadata without output
316 /// - Error: Display appropriately
317 ///
318 /// Returns String for display content, empty String for control events.
319 fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
320 let mut session = self.streaming_session.borrow_mut();
321
322 match event {
323 StreamInnerEvent::MessageStart {
324 message: _,
325 message_id,
326 } => {
327 // Set message ID for tracking and clear session state on new message
328 session.set_current_message_id(message_id);
329 session.on_message_start();
330 String::new()
331 }
332 StreamInnerEvent::ContentBlockStart {
333 index: Some(index),
334 content_block: Some(block),
335 } => {
336 // Initialize a new content block at this index
337 session.on_content_block_start(index);
338 match &block {
339 ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
340 // Initial text in ContentBlockStart - treat as first delta
341 session.on_text_delta(index, t);
342 }
343 ContentBlock::ToolUse {
344 name: _,
345 input: Some(i),
346 } => {
347 // Initialize tool input accumulator
348 let input_str = if let serde_json::Value::String(s) = &i {
349 s.clone()
350 } else {
351 format_tool_input(i)
352 };
353 session.on_tool_input_delta(index, &input_str);
354 }
355 _ => {}
356 }
357 String::new()
358 }
359 StreamInnerEvent::ContentBlockStart {
360 index: Some(index),
361 content_block: None,
362 } => {
363 // Content block started but no initial content provided
364 session.on_content_block_start(index);
365 String::new()
366 }
367 StreamInnerEvent::ContentBlockStart { .. } => {
368 // Content block without index - ignore
369 String::new()
370 }
371 StreamInnerEvent::ContentBlockDelta {
372 index: Some(index),
373 delta: Some(delta),
374 } => self.handle_content_block_delta(&mut session, index, delta),
375 StreamInnerEvent::TextDelta { text: Some(text) } => {
376 self.handle_text_delta(&mut session, &text)
377 }
378 StreamInnerEvent::ContentBlockStop { .. } => {
379 // Content block completion event - no output needed
380 // This event marks the end of a content block but doesn't produce
381 // any displayable content. It's a control event for state management.
382 String::new()
383 }
384 StreamInnerEvent::MessageDelta { .. } => {
385 // Message delta event with usage/metadata - no output needed
386 // This event contains final message metadata (stop_reason, usage stats)
387 // but is used for tracking/monitoring purposes only, not display.
388 String::new()
389 }
390 StreamInnerEvent::ContentBlockDelta { .. }
391 | StreamInnerEvent::Ping
392 | StreamInnerEvent::TextDelta { text: None }
393 | StreamInnerEvent::Error { error: None } => String::new(),
394 StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
395 StreamInnerEvent::Error {
396 error: Some(err), ..
397 } => self.handle_error_event(err),
398 StreamInnerEvent::Unknown => self.handle_unknown_event(),
399 }
400 }
401
402 /// Format a system event
403 fn format_system_event(
404 &self,
405 subtype: Option<&String>,
406 session_id: Option<String>,
407 cwd: Option<String>,
408 ) -> String {
409 let c = &self.colors;
410 let prefix = &self.display_name;
411
412 if subtype.map(std::string::String::as_str) == Some("init") {
413 let sid = session_id.unwrap_or_else(|| "unknown".to_string());
414 let mut out = format!(
415 "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
416 c.dim(),
417 prefix,
418 c.reset(),
419 c.cyan(),
420 c.reset(),
421 c.dim(),
422 sid,
423 c.reset()
424 );
425 if let Some(cwd) = cwd {
426 let _ = writeln!(
427 out,
428 "{}[{}]{} {}Working dir: {}{}",
429 c.dim(),
430 prefix,
431 c.reset(),
432 c.dim(),
433 cwd,
434 c.reset()
435 );
436 }
437 out
438 } else {
439 format!(
440 "{}[{}]{} {}{}{}\n",
441 c.dim(),
442 prefix,
443 c.reset(),
444 c.cyan(),
445 subtype.map_or("system", |s| s.as_str()),
446 c.reset()
447 )
448 }
449 }
450
451 /// Extract text content from a message for hash-based deduplication.
452 fn extract_text_content_for_hash(
453 message: Option<&crate::json_parser::types::AssistantMessage>,
454 ) -> Option<String> {
455 message?.content.as_ref().map(|content| {
456 content
457 .iter()
458 .filter_map(|block| {
459 if let ContentBlock::Text { text } = block {
460 text.as_deref()
461 } else {
462 None
463 }
464 })
465 .collect::<Vec<_>>()
466 .join("")
467 })
468 }
469
470 /// Check if this assistant message is a duplicate of already-streamed content.
471 fn is_duplicate_assistant_message(
472 &self,
473 message: Option<&crate::json_parser::types::AssistantMessage>,
474 ) -> bool {
475 let session = self.streaming_session.borrow();
476
477 // Extract message_id from the assistant message
478 let assistant_msg_id = message.and_then(|m| m.id.as_ref());
479
480 // Check if this assistant event has a message_id that matches the current streaming message
481 // If it does, and we have streamed content, then this assistant event is a duplicate
482 // because the content was already streamed via deltas.
483 if let Some(ast_msg_id) = assistant_msg_id {
484 // Check if message was already marked as displayed (after message_stop)
485 if session.is_duplicate_final_message(ast_msg_id) {
486 return true;
487 }
488
489 // Check if the assistant message_id matches the current streaming message_id
490 if session.get_current_message_id() == Some(ast_msg_id) {
491 // Same message - check if we have streamed any content
492 // If yes, the assistant event is a duplicate
493 if session.has_any_streamed_content() {
494 return true;
495 }
496 }
497 }
498
499 // If no message_id match, fall back to hash-based deduplication
500 let text_content_for_hash = Self::extract_text_content_for_hash(message);
501 if let Some(ref text_content) = text_content_for_hash {
502 if !text_content.is_empty() {
503 return session.is_duplicate_by_hash(text_content);
504 }
505 }
506
507 // Fallback to coarse check
508 session.has_any_streamed_content()
509 }
510
511 /// Format a text content block for assistant output.
512 fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
513 let limit = self.verbosity.truncate_limit("text");
514 let preview = truncate_text(text, limit);
515 let _ = writeln!(
516 out,
517 "{}[{}]{} {}{}{}",
518 colors.dim(),
519 prefix,
520 colors.reset(),
521 colors.white(),
522 preview,
523 colors.reset()
524 );
525 }
526
527 /// Format a tool use content block for assistant output.
528 fn format_tool_use_block(
529 &self,
530 out: &mut String,
531 tool: Option<&String>,
532 input: Option<&serde_json::Value>,
533 prefix: &str,
534 colors: Colors,
535 ) {
536 let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
537 let _ = writeln!(
538 out,
539 "{}[{}]{} {}Tool{}: {}{}{}",
540 colors.dim(),
541 prefix,
542 colors.reset(),
543 colors.magenta(),
544 colors.reset(),
545 colors.bold(),
546 tool_name,
547 colors.reset(),
548 );
549
550 // Show tool input details at Normal and above (not just Verbose)
551 // Tool inputs provide crucial context for understanding agent actions
552 if self.verbosity.show_tool_input() {
553 if let Some(input_val) = input {
554 let input_str = format_tool_input(input_val);
555 let limit = self.verbosity.truncate_limit("tool_input");
556 let preview = truncate_text(&input_str, limit);
557 if !preview.is_empty() {
558 let _ = writeln!(
559 out,
560 "{}[{}]{} {} └─ {}{}",
561 colors.dim(),
562 prefix,
563 colors.reset(),
564 colors.dim(),
565 preview,
566 colors.reset()
567 );
568 }
569 }
570 }
571 }
572
573 /// Format a tool result content block for assistant output.
574 fn format_tool_result_block(
575 &self,
576 out: &mut String,
577 content: &serde_json::Value,
578 prefix: &str,
579 colors: Colors,
580 ) {
581 let content_str = match content {
582 serde_json::Value::String(s) => s.clone(),
583 other => other.to_string(),
584 };
585 let limit = self.verbosity.truncate_limit("tool_result");
586 let preview = truncate_text(&content_str, limit);
587 let _ = writeln!(
588 out,
589 "{}[{}]{} {}Result:{} {}",
590 colors.dim(),
591 prefix,
592 colors.reset(),
593 colors.dim(),
594 colors.reset(),
595 preview
596 );
597 }
598
599 /// Format all content blocks from an assistant message.
600 fn format_content_blocks(
601 &self,
602 out: &mut String,
603 content: &[ContentBlock],
604 prefix: &str,
605 colors: Colors,
606 ) {
607 for block in content {
608 match block {
609 ContentBlock::Text { text } => {
610 if let Some(text) = text {
611 self.format_text_block(out, text, prefix, colors);
612 }
613 }
614 ContentBlock::ToolUse { name, input } => {
615 self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
616 }
617 ContentBlock::ToolResult { content } => {
618 if let Some(content) = content {
619 self.format_tool_result_block(out, content, prefix, colors);
620 }
621 }
622 ContentBlock::Unknown => {}
623 }
624 }
625 }
626
627 /// Format an assistant event
628 fn format_assistant_event(
629 &self,
630 message: Option<crate::json_parser::types::AssistantMessage>,
631 ) -> String {
632 // CRITICAL FIX: When ANY content has been streamed via deltas,
633 // the Assistant event should NOT display it again.
634 // The Assistant event represents the "complete" message, but if we've
635 // already shown the streaming deltas, showing it again causes duplication.
636 if self.is_duplicate_assistant_message(message.as_ref()) {
637 return String::new();
638 }
639
640 let mut out = String::new();
641 if let Some(msg) = message {
642 if let Some(content) = msg.content {
643 self.format_content_blocks(&mut out, &content, &self.display_name, self.colors);
644 }
645 }
646 out
647 }
648
649 /// Format a user event
650 fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
651 let c = &self.colors;
652 let prefix = &self.display_name;
653
654 if let Some(msg) = message {
655 if let Some(content) = msg.content {
656 if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
657 let limit = self.verbosity.truncate_limit("user");
658 let preview = truncate_text(text, limit);
659 return format!(
660 "{}[{}]{} {}User{}: {}{}{}\n",
661 c.dim(),
662 prefix,
663 c.reset(),
664 c.blue(),
665 c.reset(),
666 c.dim(),
667 preview,
668 c.reset()
669 );
670 }
671 }
672 }
673 String::new()
674 }
675
676 /// Format a result event
677 fn format_result_event(
678 &self,
679 subtype: Option<String>,
680 duration_ms: Option<u64>,
681 total_cost_usd: Option<f64>,
682 num_turns: Option<u32>,
683 result: Option<String>,
684 error: Option<String>,
685 ) -> String {
686 let c = &self.colors;
687 let prefix = &self.display_name;
688
689 let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
690 let duration_m = duration_total_secs / 60;
691 let duration_s_rem = duration_total_secs % 60;
692 let cost = total_cost_usd.unwrap_or(0.0);
693 let turns = num_turns.unwrap_or(0);
694
695 let mut out = if subtype.as_deref() == Some("success") {
696 format!(
697 "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
698 c.dim(),
699 prefix,
700 c.reset(),
701 c.green(),
702 CHECK,
703 c.reset(),
704 c.dim(),
705 duration_m,
706 duration_s_rem,
707 turns,
708 cost,
709 c.reset()
710 )
711 } else {
712 let err = error.unwrap_or_else(|| "unknown error".to_string());
713 format!(
714 "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
715 c.dim(),
716 prefix,
717 c.reset(),
718 c.red(),
719 CROSS,
720 subtype.unwrap_or_else(|| "error".to_string()),
721 c.reset(),
722 err,
723 c.dim(),
724 duration_m,
725 duration_s_rem,
726 c.reset()
727 )
728 };
729
730 if let Some(result) = result {
731 let limit = self.verbosity.truncate_limit("result");
732 let preview = truncate_text(&result, limit);
733 let _ = writeln!(
734 out,
735 "\n{}Result summary:{}\n{}{}{}",
736 c.bold(),
737 c.reset(),
738 c.dim(),
739 preview,
740 c.reset()
741 );
742 }
743 out
744 }
745
746 /// Handle content block delta events
747 fn handle_content_block_delta(
748 &self,
749 session: &mut std::cell::RefMut<'_, StreamingSession>,
750 index: u64,
751 delta: ContentBlockDelta,
752 ) -> String {
753 let c = &self.colors;
754 let prefix = &self.display_name;
755
756 match delta {
757 ContentBlockDelta::TextDelta { text: Some(text) } => {
758 let index_str = index.to_string();
759
760 // Use StreamingSession to track state and determine prefix display
761 // Note: Snapshot-as-delta detection and extraction is handled internally
762 // by on_text_delta(), which also increments streaming_metrics counters.
763 let show_prefix = session.on_text_delta(index, &text);
764
765 // Get accumulated text for streaming display
766 let accumulated_text = session
767 .get_accumulated(ContentType::Text, &index_str)
768 .unwrap_or("");
769
770 // Sanitize the accumulated text to check if it's empty
771 // This is needed to skip rendering when the accumulated content is just whitespace
772 let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
773
774 // Skip rendering if the sanitized text is empty (e.g., only whitespace)
775 // This prevents rendering empty lines when the accumulated content is just whitespace
776 if sanitized_text.is_empty() {
777 return String::new();
778 }
779
780 // Check if this sanitized content has already been rendered
781 // This prevents duplicates when accumulated content differs only by whitespace
782 if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
783 {
784 return String::new();
785 }
786
787 // Use TextDeltaRenderer for consistent rendering
788 let terminal_mode = *self.terminal_mode.borrow();
789
790 // Use prefix trie to detect if new content extends previously rendered content
791 // If yes, we do an in-place update (carriage return + new content)
792 let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);
793
794 let output = if show_prefix && !has_prefix {
795 // First delta with no prefix match - use the renderer with prefix
796 TextDeltaRenderer::render_first_delta(
797 accumulated_text,
798 prefix,
799 *c,
800 terminal_mode,
801 )
802 } else {
803 // Either continuation OR prefix match - use renderer for in-place update
804 // This handles the case where "Hello" becomes "Hello World" - we REPLACE
805 TextDeltaRenderer::render_subsequent_delta(
806 accumulated_text,
807 prefix,
808 *c,
809 terminal_mode,
810 )
811 };
812
813 // Mark this sanitized content as rendered for future duplicate detection
814 // We use the sanitized text (not the rendered output) to avoid false positives
815 // when the same accumulated text is rendered with different terminal modes
816 session.mark_rendered(ContentType::Text, &index_str);
817 session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);
818
819 output
820 }
821 ContentBlockDelta::ThinkingDelta {
822 thinking: Some(text),
823 } => {
824 // Track thinking deltas
825 session.on_thinking_delta(index, &text);
826 // Display thinking with visual distinction
827 Self::formatter().format_thinking(text.as_str(), prefix, *c)
828 }
829 ContentBlockDelta::ToolUseDelta {
830 tool_use: Some(tool_delta),
831 } => {
832 // Handle tool input streaming
833 // Extract the tool input from the delta
834 let input_str =
835 tool_delta
836 .get("input")
837 .map_or_else(String::new, |input| match input {
838 serde_json::Value::String(s) => s.clone(),
839 other => format_tool_input(other),
840 });
841
842 if input_str.is_empty() {
843 String::new()
844 } else {
845 // Accumulate tool input
846 session.on_tool_input_delta(index, &input_str);
847
848 // Show partial tool input in real-time
849 let formatter = DeltaDisplayFormatter::new();
850 formatter.format_tool_input(&input_str, prefix, *c)
851 }
852 }
853 _ => String::new(),
854 }
855 }
856
857 /// Handle text delta events
858 fn handle_text_delta(
859 &self,
860 session: &mut std::cell::RefMut<'_, StreamingSession>,
861 text: &str,
862 ) -> String {
863 let c = &self.colors;
864 let prefix = &self.display_name;
865
866 // Standalone text delta (not part of content block)
867 // Use default index "0" for standalone text
868 let default_index = 0u64;
869 let default_index_str = "0";
870
871 // Use StreamingSession to track state and determine prefix display
872 // Note: Snapshot-as-delta detection and extraction is handled internally
873 // by on_text_delta(), which also increments streaming_metrics counters.
874 let show_prefix = session.on_text_delta(default_index, text);
875
876 // Get accumulated text for streaming display
877 let accumulated_text = session
878 .get_accumulated(ContentType::Text, default_index_str)
879 .unwrap_or("");
880
881 // Sanitize the accumulated text to check if it's empty
882 // This is needed to skip rendering when the accumulated content is just whitespace
883 let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
884
885 // Skip rendering if the sanitized text is empty (e.g., only whitespace)
886 // This prevents rendering empty lines when the accumulated content is just whitespace
887 if sanitized_text.is_empty() {
888 return String::new();
889 }
890
891 // Check if this sanitized content has already been rendered
892 // This prevents duplicates when accumulated content differs only by whitespace
893 if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
894 return String::new();
895 }
896
897 // Use TextDeltaRenderer for consistent rendering across all parsers
898 let terminal_mode = *self.terminal_mode.borrow();
899
900 // Use prefix trie to detect if new content extends previously rendered content
901 // If yes, we do an in-place update (carriage return + new content)
902 let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
903
904 let output = if show_prefix && !has_prefix {
905 // First delta with no prefix match - use the renderer with prefix
906 TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
907 } else {
908 // Either continuation OR prefix match - use renderer for in-place update
909 // This handles the case where "Hello" becomes "Hello World" - we REPLACE
910 TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
911 };
912
913 // Mark this sanitized content as rendered for future duplicate detection
914 // We use the sanitized text (not the rendered output) to avoid false positives
915 // when the same accumulated text is rendered with different terminal modes
916 session.mark_rendered(ContentType::Text, default_index_str);
917 session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
918
919 output
920 }
921
922 /// Handle message stop events
923 fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
924 let c = &self.colors;
925
926 // Message complete - add final newline if we were in a content block
927 // OR if any content was streamed (handles edge cases where block state
928 // may not have been set but content was still streamed)
929 let metrics = session.get_streaming_quality_metrics();
930 let was_in_block = session.on_message_stop();
931 let had_content = session.has_any_streamed_content();
932 if was_in_block || had_content {
933 // Use TextDeltaRenderer for completion - adds final newline
934 let terminal_mode = *self.terminal_mode.borrow();
935 let completion = format!(
936 "{}{}",
937 c.reset(),
938 TextDeltaRenderer::render_completion(terminal_mode)
939 );
940 // Show streaming quality metrics in debug mode or when flag is set
941 let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
942 && metrics.total_deltas > 0;
943 if show_metrics {
944 format!("{}\n{}", completion, metrics.format(*c))
945 } else {
946 completion
947 }
948 } else {
949 String::new()
950 }
951 }
952
953 /// Handle error events
954 fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
955 let c = &self.colors;
956 let prefix = &self.display_name;
957
958 let msg = err
959 .message
960 .unwrap_or_else(|| "Unknown streaming error".to_string());
961 format!(
962 "{}[{}]{} {}Error: {}{}\n",
963 c.dim(),
964 prefix,
965 c.reset(),
966 c.red(),
967 msg,
968 c.reset()
969 )
970 }
971
972 /// Handle unknown events
973 fn handle_unknown_event(&self) -> String {
974 let c = &self.colors;
975 let prefix = &self.display_name;
976
977 // Unknown stream event - in debug mode, log it
978 if self.verbosity.is_debug() {
979 format!(
980 "{}[{}]{} {}Unknown streaming event{}\n",
981 c.dim(),
982 prefix,
983 c.reset(),
984 c.dim(),
985 c.reset()
986 )
987 } else {
988 String::new()
989 }
990 }
991
992 /// Check if a Claude event is a control event (state management with no user output)
993 ///
994 /// Control events are valid JSON that represent state transitions rather than
995 /// user-facing content. They should be tracked separately from "ignored" events
996 /// to avoid false health warnings.
997 const fn is_control_event(event: &ClaudeEvent) -> bool {
998 match event {
999 // Stream events that are control events
1000 ClaudeEvent::StreamEvent { event } => matches!(
1001 event,
1002 StreamInnerEvent::MessageStart { .. }
1003 | StreamInnerEvent::ContentBlockStart { .. }
1004 | StreamInnerEvent::ContentBlockStop { .. }
1005 | StreamInnerEvent::MessageDelta { .. }
1006 | StreamInnerEvent::MessageStop
1007 | StreamInnerEvent::Ping
1008 ),
1009 _ => false,
1010 }
1011 }
1012
1013 /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1014 ///
1015 /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1016 /// tool input deltas) that are shown to the user in real-time. These should be
1017 /// tracked separately to avoid inflating "ignored" percentages.
1018 const fn is_partial_event(event: &ClaudeEvent) -> bool {
1019 match event {
1020 // Stream events that produce incremental content
1021 ClaudeEvent::StreamEvent { event } => matches!(
1022 event,
1023 StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1024 ),
1025 _ => false,
1026 }
1027 }
1028
1029 /// Get a shared delta display formatter
1030 const fn formatter() -> DeltaDisplayFormatter {
1031 DeltaDisplayFormatter::new()
1032 }
1033
1034 /// Parse a stream of Claude NDJSON events
1035 pub fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
1036 use super::incremental_parser::IncrementalNdjsonParser;
1037
1038 let c = &self.colors;
1039 let monitor = HealthMonitor::new("Claude");
1040 let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
1041 std::fs::OpenOptions::new()
1042 .create(true)
1043 .append(true)
1044 .open(log_path)
1045 .ok()
1046 .map(std::io::BufWriter::new)
1047 });
1048
1049 // Use incremental parser for true real-time streaming
1050 // This processes JSON as soon as it's complete, not waiting for newlines
1051 let mut incremental_parser = IncrementalNdjsonParser::new();
1052 let mut byte_buffer = Vec::new();
1053
1054 loop {
1055 // Read available bytes
1056 byte_buffer.clear();
1057 let chunk = reader.fill_buf()?;
1058 if chunk.is_empty() {
1059 break;
1060 }
1061
1062 // Process all bytes immediately
1063 byte_buffer.extend_from_slice(chunk);
1064 let consumed = chunk.len();
1065 reader.consume(consumed);
1066
1067 // Feed bytes to incremental parser
1068 let json_events = incremental_parser.feed(&byte_buffer);
1069
1070 // Process each complete JSON event immediately
1071 for line in json_events {
1072 let trimmed = line.trim();
1073 if trimmed.is_empty() {
1074 continue;
1075 }
1076
1077 // In debug mode, also show the raw JSON
1078 if self.verbosity.is_debug() {
1079 eprintln!(
1080 "{}[DEBUG]{} {}{}{}",
1081 c.dim(),
1082 c.reset(),
1083 c.dim(),
1084 &line,
1085 c.reset()
1086 );
1087 }
1088
1089 // Parse the event once - parse_event handles malformed JSON by returning None
1090 match self.parse_event(&line) {
1091 Some(output) => {
1092 // Check if this is a partial/delta event (streaming content)
1093 if trimmed.starts_with('{') {
1094 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1095 if Self::is_partial_event(&event) {
1096 monitor.record_partial_event();
1097 } else {
1098 monitor.record_parsed();
1099 }
1100 } else {
1101 monitor.record_parsed();
1102 }
1103 } else {
1104 monitor.record_parsed();
1105 }
1106 // Write output to printer
1107 let mut printer = self.printer.borrow_mut();
1108 write!(printer, "{output}")?;
1109 printer.flush()?;
1110 }
1111 None => {
1112 // Check if this was a control event (state management with no user output)
1113 // Control events are valid JSON that return empty output but aren't "ignored"
1114 if trimmed.starts_with('{') {
1115 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1116 if Self::is_control_event(&event) {
1117 monitor.record_control_event();
1118 } else {
1119 // Valid JSON but not a control event - track as unknown
1120 monitor.record_unknown_event();
1121 }
1122 } else {
1123 // Failed to deserialize - track as parse error
1124 monitor.record_parse_error();
1125 }
1126 } else {
1127 monitor.record_ignored();
1128 }
1129 }
1130 }
1131
1132 // Log raw JSON to file if configured
1133 if let Some(ref mut file) = log_writer {
1134 writeln!(file, "{line}")?;
1135 }
1136 }
1137 }
1138
1139 if let Some(ref mut file) = log_writer {
1140 file.flush()?;
1141 // Ensure data is written to disk before continuing
1142 // This prevents race conditions where extraction runs before OS commits writes
1143 let _ = file.get_mut().sync_all();
1144 }
1145 if let Some(warning) = monitor.check_and_warn(*c) {
1146 let mut printer = self.printer.borrow_mut();
1147 writeln!(printer, "{warning}")?;
1148 printer.flush()?;
1149 }
1150 Ok(())
1151 }
1152}
1153
#[cfg(test)]
mod tests {
    // The module is already gated by `#[cfg(test)]`, so the items inside need no
    // additional `cfg` attributes.
    use super::*;
    use crate::json_parser::printer::{SharedPrinter, TestPrinter};

    /// Smoke test: `printer()` can be called on a parser built with a test printer.
    #[test]
    fn test_printer_method_accessible() {
        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let parser =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));

        // Only accessibility is verified; the returned value is not inspected.
        let _printer_ref = parser.printer();
    }

    /// Smoke test: `streaming_metrics()` can be called on a parser built with a test printer.
    #[test]
    fn test_streaming_metrics_method_accessible() {
        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let parser =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));

        // Only accessibility is verified; the returned value is not inspected.
        let _metrics = parser.streaming_metrics();
    }
}