ralph_workflow/json_parser/
claude.rs

1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//!    creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r          (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r  (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n    (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46#[cfg(feature = "test-utils")]
47use super::health::StreamingQualityMetrics;
48use super::printer::SharedPrinter;
49use super::streaming_state::StreamingSession;
50use super::terminal::TerminalMode;
51use super::types::{
52    format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
53    ContentType, StreamInnerEvent,
54};
55
56/// Claude event parser
57///
58/// Note: This parser is designed for single-threaded use only.
59/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
60/// Do not share this parser across threads.
61pub struct ClaudeParser {
62    colors: Colors,
63    pub(crate) verbosity: Verbosity,
64    log_file: Option<String>,
65    display_name: String,
66    /// Unified streaming session tracker
67    /// Provides single source of truth for streaming state across all content types
68    streaming_session: Rc<RefCell<StreamingSession>>,
69    /// Terminal mode for output formatting
70    /// Detected at parse time and cached for performance
71    terminal_mode: RefCell<TerminalMode>,
72    /// Whether to show streaming quality metrics
73    show_streaming_metrics: bool,
74    /// Output printer for capturing or displaying output
75    printer: SharedPrinter,
76}
77
78impl ClaudeParser {
79    /// Create a new `ClaudeParser` with the given colors and verbosity.
80    ///
81    /// # Arguments
82    ///
83    /// * `colors` - Colors for terminal output
84    /// * `verbosity` - Verbosity level for output
85    ///
86    /// # Returns
87    ///
88    /// A new `ClaudeParser` instance
89    ///
90    /// # Example
91    ///
92    /// ```ignore
93    /// use ralph_workflow::json_parser::ClaudeParser;
94    /// use ralph_workflow::logger::Colors;
95    /// use ralph_workflow::config::Verbosity;
96    ///
97    /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
98    /// ```
99    pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
100        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
101    }
102
103    /// Create a new `ClaudeParser` with a custom printer.
104    ///
105    /// # Arguments
106    ///
107    /// * `colors` - Colors for terminal output
108    /// * `verbosity` - Verbosity level for output
109    /// * `printer` - Shared printer for output
110    ///
111    /// # Returns
112    ///
113    /// A new `ClaudeParser` instance
114    pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
115        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
116        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
117
118        // Use the printer's is_terminal method to validate it's connected correctly
119        // This is a sanity check that also satisfies the compiler that the method is used
120        let _printer_is_terminal = printer.borrow().is_terminal();
121
122        Self {
123            colors,
124            verbosity,
125            log_file: None,
126            display_name: "Claude".to_string(),
127            streaming_session: Rc::new(RefCell::new(streaming_session)),
128            terminal_mode: RefCell::new(TerminalMode::detect()),
129            show_streaming_metrics: false,
130            printer,
131        }
132    }
133
134    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
135        self.show_streaming_metrics = show;
136        self
137    }
138
139    /// Set the display name for this parser.
140    ///
141    /// # Arguments
142    ///
143    /// * `display_name` - The name to display in output
144    ///
145    /// # Returns
146    ///
147    /// Self for builder pattern chaining
148    pub fn with_display_name(mut self, display_name: &str) -> Self {
149        self.display_name = display_name.to_string();
150        self
151    }
152
153    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
154        self.log_file = Some(path.to_string());
155        self
156    }
157
158    /// Set the terminal mode for this parser.
159    ///
160    /// # Arguments
161    ///
162    /// * `mode` - The terminal mode to use
163    ///
164    /// # Returns
165    ///
166    /// Self for builder pattern chaining
167    #[cfg(test)]
168    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
169        *self.terminal_mode.borrow_mut() = mode;
170        self
171    }
172
173    /// Get a shared reference to the printer.
174    ///
175    /// This allows tests, monitoring, and other code to access the printer after parsing
176    /// to verify output content, check for duplicates, or capture output for analysis.
177    ///
178    /// # Returns
179    ///
180    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
181    ///
182    /// # Example
183    ///
184    /// ```ignore
185    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
186    /// use std::rc::Rc;
187    /// use std::cell::RefCell;
188    ///
189    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
190    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
191    ///
192    /// // Parse events...
193    ///
194    /// // Now access the printer to verify output
195    /// let printer_ref = parser.printer().borrow();
196    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
197    /// ```
198    /// Get a clone of the printer used by this parser.
199    ///
200    /// This is primarily useful for testing and monitoring.
201    /// Only available with the `test-utils` feature.
202    #[cfg(feature = "test-utils")]
203    pub fn printer(&self) -> SharedPrinter {
204        Rc::clone(&self.printer)
205    }
206
207    /// Get streaming quality metrics from the current session.
208    ///
209    /// This provides insight into the deduplication and streaming quality of the
210    /// parsing session, including:
211    /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
212    /// - Number of large deltas (potential protocol violations)
213    /// - Total deltas processed
214    ///
215    /// Useful for testing, monitoring, and debugging streaming behavior.
216    /// Only available with the `test-utils` feature.
217    ///
218    /// # Returns
219    ///
220    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
221    ///
222    /// # Example
223    ///
224    /// ```ignore
225    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
226    /// use std::rc::Rc;
227    /// use std::cell::RefCell;
228    ///
229    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
230    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
231    ///
232    /// // Parse events...
233    ///
234    /// // Verify deduplication logic triggered
235    /// let metrics = parser.streaming_metrics();
236    /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
237    /// ```
238    #[cfg(feature = "test-utils")]
239    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
240        self.streaming_session
241            .borrow()
242            .get_streaming_quality_metrics()
243    }
244
245    /// Parse and display a single Claude JSON event
246    ///
247    /// Returns `Some(formatted_output)` for valid events, or None for:
248    /// - Malformed JSON (logged at debug level)
249    /// - Unknown event types
250    /// - Empty or whitespace-only output
251    pub fn parse_event(&self, line: &str) -> Option<String> {
252        let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
253            e
254        } else {
255            // Non-JSON line - could be raw text output from agent
256            // Pass through as-is if it looks like real output (not empty)
257            let trimmed = line.trim();
258            if !trimmed.is_empty() && !trimmed.starts_with('{') {
259                return Some(format!("{trimmed}\n"));
260            }
261            return None;
262        };
263        let c = &self.colors;
264        let prefix = &self.display_name;
265
266        let output = match event {
267            ClaudeEvent::System {
268                subtype,
269                session_id,
270                cwd,
271            } => self.format_system_event(subtype.as_ref(), session_id, cwd),
272            ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
273            ClaudeEvent::User { message } => self.format_user_event(message),
274            ClaudeEvent::Result {
275                subtype,
276                duration_ms,
277                total_cost_usd,
278                num_turns,
279                result,
280                error,
281            } => self.format_result_event(
282                subtype,
283                duration_ms,
284                total_cost_usd,
285                num_turns,
286                result,
287                error,
288            ),
289            ClaudeEvent::StreamEvent { event } => {
290                // Handle streaming events for delta/partial updates
291                self.parse_stream_event(event)
292            }
293            ClaudeEvent::Unknown => {
294                // Use the generic unknown event formatter for consistent handling
295                // In verbose mode, this will show the event type and key fields
296                // In normal mode, this returns empty string
297                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
298            }
299        };
300
301        if output.is_empty() {
302            None
303        } else {
304            Some(output)
305        }
306    }
307
308    /// Parse a streaming event for delta/partial updates
309    ///
310    /// Handles the nested events within `stream_event`:
311    /// - MessageStart/Stop: Manage session state
312    /// - `ContentBlockStart`: Initialize new content blocks
313    /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
314    /// - `ContentBlockStop`: Finalize content blocks
315    /// - `MessageDelta`: Process message metadata without output
316    /// - Error: Display appropriately
317    ///
318    /// Returns String for display content, empty String for control events.
319    fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
320        let mut session = self.streaming_session.borrow_mut();
321
322        match event {
323            StreamInnerEvent::MessageStart {
324                message: _,
325                message_id,
326            } => {
327                // Set message ID for tracking and clear session state on new message
328                session.set_current_message_id(message_id);
329                session.on_message_start();
330                String::new()
331            }
332            StreamInnerEvent::ContentBlockStart {
333                index: Some(index),
334                content_block: Some(block),
335            } => {
336                // Initialize a new content block at this index
337                session.on_content_block_start(index);
338                match &block {
339                    ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
340                        // Initial text in ContentBlockStart - treat as first delta
341                        session.on_text_delta(index, t);
342                    }
343                    ContentBlock::ToolUse {
344                        name: _,
345                        input: Some(i),
346                    } => {
347                        // Initialize tool input accumulator
348                        let input_str = if let serde_json::Value::String(s) = &i {
349                            s.clone()
350                        } else {
351                            format_tool_input(i)
352                        };
353                        session.on_tool_input_delta(index, &input_str);
354                    }
355                    _ => {}
356                }
357                String::new()
358            }
359            StreamInnerEvent::ContentBlockStart {
360                index: Some(index),
361                content_block: None,
362            } => {
363                // Content block started but no initial content provided
364                session.on_content_block_start(index);
365                String::new()
366            }
367            StreamInnerEvent::ContentBlockStart { .. } => {
368                // Content block without index - ignore
369                String::new()
370            }
371            StreamInnerEvent::ContentBlockDelta {
372                index: Some(index),
373                delta: Some(delta),
374            } => self.handle_content_block_delta(&mut session, index, delta),
375            StreamInnerEvent::TextDelta { text: Some(text) } => {
376                self.handle_text_delta(&mut session, &text)
377            }
378            StreamInnerEvent::ContentBlockStop { .. } => {
379                // Content block completion event - no output needed
380                // This event marks the end of a content block but doesn't produce
381                // any displayable content. It's a control event for state management.
382                String::new()
383            }
384            StreamInnerEvent::MessageDelta { .. } => {
385                // Message delta event with usage/metadata - no output needed
386                // This event contains final message metadata (stop_reason, usage stats)
387                // but is used for tracking/monitoring purposes only, not display.
388                String::new()
389            }
390            StreamInnerEvent::ContentBlockDelta { .. }
391            | StreamInnerEvent::Ping
392            | StreamInnerEvent::TextDelta { text: None }
393            | StreamInnerEvent::Error { error: None } => String::new(),
394            StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
395            StreamInnerEvent::Error {
396                error: Some(err), ..
397            } => self.handle_error_event(err),
398            StreamInnerEvent::Unknown => self.handle_unknown_event(),
399        }
400    }
401
402    /// Format a system event
403    fn format_system_event(
404        &self,
405        subtype: Option<&String>,
406        session_id: Option<String>,
407        cwd: Option<String>,
408    ) -> String {
409        let c = &self.colors;
410        let prefix = &self.display_name;
411
412        if subtype.map(std::string::String::as_str) == Some("init") {
413            let sid = session_id.unwrap_or_else(|| "unknown".to_string());
414            let mut out = format!(
415                "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
416                c.dim(),
417                prefix,
418                c.reset(),
419                c.cyan(),
420                c.reset(),
421                c.dim(),
422                sid,
423                c.reset()
424            );
425            if let Some(cwd) = cwd {
426                let _ = writeln!(
427                    out,
428                    "{}[{}]{} {}Working dir: {}{}",
429                    c.dim(),
430                    prefix,
431                    c.reset(),
432                    c.dim(),
433                    cwd,
434                    c.reset()
435                );
436            }
437            out
438        } else {
439            format!(
440                "{}[{}]{} {}{}{}\n",
441                c.dim(),
442                prefix,
443                c.reset(),
444                c.cyan(),
445                subtype.map_or("system", |s| s.as_str()),
446                c.reset()
447            )
448        }
449    }
450
451    /// Extract text content from a message for hash-based deduplication.
452    fn extract_text_content_for_hash(
453        message: Option<&crate::json_parser::types::AssistantMessage>,
454    ) -> Option<String> {
455        message?.content.as_ref().map(|content| {
456            content
457                .iter()
458                .filter_map(|block| {
459                    if let ContentBlock::Text { text } = block {
460                        text.as_deref()
461                    } else {
462                        None
463                    }
464                })
465                .collect::<Vec<_>>()
466                .join("")
467        })
468    }
469
470    /// Check if this assistant message is a duplicate of already-streamed content.
471    fn is_duplicate_assistant_message(
472        &self,
473        message: Option<&crate::json_parser::types::AssistantMessage>,
474    ) -> bool {
475        let session = self.streaming_session.borrow();
476
477        // Extract message_id from the assistant message
478        let assistant_msg_id = message.and_then(|m| m.id.as_ref());
479
480        // Check if this assistant event has a message_id that matches the current streaming message
481        // If it does, and we have streamed content, then this assistant event is a duplicate
482        // because the content was already streamed via deltas.
483        if let Some(ast_msg_id) = assistant_msg_id {
484            // Check if message was already marked as displayed (after message_stop)
485            if session.is_duplicate_final_message(ast_msg_id) {
486                return true;
487            }
488
489            // Check if the assistant message_id matches the current streaming message_id
490            if session.get_current_message_id() == Some(ast_msg_id) {
491                // Same message - check if we have streamed any content
492                // If yes, the assistant event is a duplicate
493                if session.has_any_streamed_content() {
494                    return true;
495                }
496            }
497        }
498
499        // If no message_id match, fall back to hash-based deduplication
500        let text_content_for_hash = Self::extract_text_content_for_hash(message);
501        if let Some(ref text_content) = text_content_for_hash {
502            if !text_content.is_empty() {
503                return session.is_duplicate_by_hash(text_content);
504            }
505        }
506
507        // Fallback to coarse check
508        session.has_any_streamed_content()
509    }
510
511    /// Format a text content block for assistant output.
512    fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
513        let limit = self.verbosity.truncate_limit("text");
514        let preview = truncate_text(text, limit);
515        let _ = writeln!(
516            out,
517            "{}[{}]{} {}{}{}",
518            colors.dim(),
519            prefix,
520            colors.reset(),
521            colors.white(),
522            preview,
523            colors.reset()
524        );
525    }
526
527    /// Format a tool use content block for assistant output.
528    fn format_tool_use_block(
529        &self,
530        out: &mut String,
531        tool: Option<&String>,
532        input: Option<&serde_json::Value>,
533        prefix: &str,
534        colors: Colors,
535    ) {
536        let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
537        let _ = writeln!(
538            out,
539            "{}[{}]{} {}Tool{}: {}{}{}",
540            colors.dim(),
541            prefix,
542            colors.reset(),
543            colors.magenta(),
544            colors.reset(),
545            colors.bold(),
546            tool_name,
547            colors.reset(),
548        );
549
550        // Show tool input details at Normal and above (not just Verbose)
551        // Tool inputs provide crucial context for understanding agent actions
552        if self.verbosity.show_tool_input() {
553            if let Some(input_val) = input {
554                let input_str = format_tool_input(input_val);
555                let limit = self.verbosity.truncate_limit("tool_input");
556                let preview = truncate_text(&input_str, limit);
557                if !preview.is_empty() {
558                    let _ = writeln!(
559                        out,
560                        "{}[{}]{} {}  └─ {}{}",
561                        colors.dim(),
562                        prefix,
563                        colors.reset(),
564                        colors.dim(),
565                        preview,
566                        colors.reset()
567                    );
568                }
569            }
570        }
571    }
572
573    /// Format a tool result content block for assistant output.
574    fn format_tool_result_block(
575        &self,
576        out: &mut String,
577        content: &serde_json::Value,
578        prefix: &str,
579        colors: Colors,
580    ) {
581        let content_str = match content {
582            serde_json::Value::String(s) => s.clone(),
583            other => other.to_string(),
584        };
585        let limit = self.verbosity.truncate_limit("tool_result");
586        let preview = truncate_text(&content_str, limit);
587        let _ = writeln!(
588            out,
589            "{}[{}]{} {}Result:{} {}",
590            colors.dim(),
591            prefix,
592            colors.reset(),
593            colors.dim(),
594            colors.reset(),
595            preview
596        );
597    }
598
599    /// Format all content blocks from an assistant message.
600    fn format_content_blocks(
601        &self,
602        out: &mut String,
603        content: &[ContentBlock],
604        prefix: &str,
605        colors: Colors,
606    ) {
607        for block in content {
608            match block {
609                ContentBlock::Text { text } => {
610                    if let Some(text) = text {
611                        self.format_text_block(out, text, prefix, colors);
612                    }
613                }
614                ContentBlock::ToolUse { name, input } => {
615                    self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
616                }
617                ContentBlock::ToolResult { content } => {
618                    if let Some(content) = content {
619                        self.format_tool_result_block(out, content, prefix, colors);
620                    }
621                }
622                ContentBlock::Unknown => {}
623            }
624        }
625    }
626
627    /// Format an assistant event
628    fn format_assistant_event(
629        &self,
630        message: Option<crate::json_parser::types::AssistantMessage>,
631    ) -> String {
632        // CRITICAL FIX: When ANY content has been streamed via deltas,
633        // the Assistant event should NOT display it again.
634        // The Assistant event represents the "complete" message, but if we've
635        // already shown the streaming deltas, showing it again causes duplication.
636        if self.is_duplicate_assistant_message(message.as_ref()) {
637            return String::new();
638        }
639
640        let mut out = String::new();
641        if let Some(msg) = message {
642            if let Some(content) = msg.content {
643                self.format_content_blocks(&mut out, &content, &self.display_name, self.colors);
644            }
645        }
646        out
647    }
648
649    /// Format a user event
650    fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
651        let c = &self.colors;
652        let prefix = &self.display_name;
653
654        if let Some(msg) = message {
655            if let Some(content) = msg.content {
656                if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
657                    let limit = self.verbosity.truncate_limit("user");
658                    let preview = truncate_text(text, limit);
659                    return format!(
660                        "{}[{}]{} {}User{}: {}{}{}\n",
661                        c.dim(),
662                        prefix,
663                        c.reset(),
664                        c.blue(),
665                        c.reset(),
666                        c.dim(),
667                        preview,
668                        c.reset()
669                    );
670                }
671            }
672        }
673        String::new()
674    }
675
676    /// Format a result event
677    fn format_result_event(
678        &self,
679        subtype: Option<String>,
680        duration_ms: Option<u64>,
681        total_cost_usd: Option<f64>,
682        num_turns: Option<u32>,
683        result: Option<String>,
684        error: Option<String>,
685    ) -> String {
686        let c = &self.colors;
687        let prefix = &self.display_name;
688
689        let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
690        let duration_m = duration_total_secs / 60;
691        let duration_s_rem = duration_total_secs % 60;
692        let cost = total_cost_usd.unwrap_or(0.0);
693        let turns = num_turns.unwrap_or(0);
694
695        let mut out = if subtype.as_deref() == Some("success") {
696            format!(
697                "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
698                c.dim(),
699                prefix,
700                c.reset(),
701                c.green(),
702                CHECK,
703                c.reset(),
704                c.dim(),
705                duration_m,
706                duration_s_rem,
707                turns,
708                cost,
709                c.reset()
710            )
711        } else {
712            let err = error.unwrap_or_else(|| "unknown error".to_string());
713            format!(
714                "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
715                c.dim(),
716                prefix,
717                c.reset(),
718                c.red(),
719                CROSS,
720                subtype.unwrap_or_else(|| "error".to_string()),
721                c.reset(),
722                err,
723                c.dim(),
724                duration_m,
725                duration_s_rem,
726                c.reset()
727            )
728        };
729
730        if let Some(result) = result {
731            let limit = self.verbosity.truncate_limit("result");
732            let preview = truncate_text(&result, limit);
733            let _ = writeln!(
734                out,
735                "\n{}Result summary:{}\n{}{}{}",
736                c.bold(),
737                c.reset(),
738                c.dim(),
739                preview,
740                c.reset()
741            );
742        }
743        out
744    }
745
746    /// Handle content block delta events
747    fn handle_content_block_delta(
748        &self,
749        session: &mut std::cell::RefMut<'_, StreamingSession>,
750        index: u64,
751        delta: ContentBlockDelta,
752    ) -> String {
753        let c = &self.colors;
754        let prefix = &self.display_name;
755
756        match delta {
757            ContentBlockDelta::TextDelta { text: Some(text) } => {
758                let index_str = index.to_string();
759
760                // Use StreamingSession to track state and determine prefix display
761                // Note: Snapshot-as-delta detection and extraction is handled internally
762                // by on_text_delta(), which also increments streaming_metrics counters.
763                let show_prefix = session.on_text_delta(index, &text);
764
765                // Get accumulated text for streaming display
766                let accumulated_text = session
767                    .get_accumulated(ContentType::Text, &index_str)
768                    .unwrap_or("");
769
770                // Sanitize the accumulated text to check if it's empty
771                // This is needed to skip rendering when the accumulated content is just whitespace
772                let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
773
774                // Skip rendering if the sanitized text is empty (e.g., only whitespace)
775                // This prevents rendering empty lines when the accumulated content is just whitespace
776                if sanitized_text.is_empty() {
777                    return String::new();
778                }
779
780                // Check if this sanitized content has already been rendered
781                // This prevents duplicates when accumulated content differs only by whitespace
782                if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
783                {
784                    return String::new();
785                }
786
787                // Use TextDeltaRenderer for consistent rendering
788                let terminal_mode = *self.terminal_mode.borrow();
789
790                // Use prefix trie to detect if new content extends previously rendered content
791                // If yes, we do an in-place update (carriage return + new content)
792                let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);
793
794                let output = if show_prefix && !has_prefix {
795                    // First delta with no prefix match - use the renderer with prefix
796                    TextDeltaRenderer::render_first_delta(
797                        accumulated_text,
798                        prefix,
799                        *c,
800                        terminal_mode,
801                    )
802                } else {
803                    // Either continuation OR prefix match - use renderer for in-place update
804                    // This handles the case where "Hello" becomes "Hello World" - we REPLACE
805                    TextDeltaRenderer::render_subsequent_delta(
806                        accumulated_text,
807                        prefix,
808                        *c,
809                        terminal_mode,
810                    )
811                };
812
813                // Mark this sanitized content as rendered for future duplicate detection
814                // We use the sanitized text (not the rendered output) to avoid false positives
815                // when the same accumulated text is rendered with different terminal modes
816                session.mark_rendered(ContentType::Text, &index_str);
817                session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);
818
819                output
820            }
821            ContentBlockDelta::ThinkingDelta {
822                thinking: Some(text),
823            } => {
824                // Track thinking deltas
825                session.on_thinking_delta(index, &text);
826                // Display thinking with visual distinction
827                Self::formatter().format_thinking(text.as_str(), prefix, *c)
828            }
829            ContentBlockDelta::ToolUseDelta {
830                tool_use: Some(tool_delta),
831            } => {
832                // Handle tool input streaming
833                // Extract the tool input from the delta
834                let input_str =
835                    tool_delta
836                        .get("input")
837                        .map_or_else(String::new, |input| match input {
838                            serde_json::Value::String(s) => s.clone(),
839                            other => format_tool_input(other),
840                        });
841
842                if input_str.is_empty() {
843                    String::new()
844                } else {
845                    // Accumulate tool input
846                    session.on_tool_input_delta(index, &input_str);
847
848                    // Show partial tool input in real-time
849                    let formatter = DeltaDisplayFormatter::new();
850                    formatter.format_tool_input(&input_str, prefix, *c)
851                }
852            }
853            _ => String::new(),
854        }
855    }
856
857    /// Handle text delta events
858    fn handle_text_delta(
859        &self,
860        session: &mut std::cell::RefMut<'_, StreamingSession>,
861        text: &str,
862    ) -> String {
863        let c = &self.colors;
864        let prefix = &self.display_name;
865
866        // Standalone text delta (not part of content block)
867        // Use default index "0" for standalone text
868        let default_index = 0u64;
869        let default_index_str = "0";
870
871        // Use StreamingSession to track state and determine prefix display
872        // Note: Snapshot-as-delta detection and extraction is handled internally
873        // by on_text_delta(), which also increments streaming_metrics counters.
874        let show_prefix = session.on_text_delta(default_index, text);
875
876        // Get accumulated text for streaming display
877        let accumulated_text = session
878            .get_accumulated(ContentType::Text, default_index_str)
879            .unwrap_or("");
880
881        // Sanitize the accumulated text to check if it's empty
882        // This is needed to skip rendering when the accumulated content is just whitespace
883        let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
884
885        // Skip rendering if the sanitized text is empty (e.g., only whitespace)
886        // This prevents rendering empty lines when the accumulated content is just whitespace
887        if sanitized_text.is_empty() {
888            return String::new();
889        }
890
891        // Check if this sanitized content has already been rendered
892        // This prevents duplicates when accumulated content differs only by whitespace
893        if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
894            return String::new();
895        }
896
897        // Use TextDeltaRenderer for consistent rendering across all parsers
898        let terminal_mode = *self.terminal_mode.borrow();
899
900        // Use prefix trie to detect if new content extends previously rendered content
901        // If yes, we do an in-place update (carriage return + new content)
902        let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
903
904        let output = if show_prefix && !has_prefix {
905            // First delta with no prefix match - use the renderer with prefix
906            TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
907        } else {
908            // Either continuation OR prefix match - use renderer for in-place update
909            // This handles the case where "Hello" becomes "Hello World" - we REPLACE
910            TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
911        };
912
913        // Mark this sanitized content as rendered for future duplicate detection
914        // We use the sanitized text (not the rendered output) to avoid false positives
915        // when the same accumulated text is rendered with different terminal modes
916        session.mark_rendered(ContentType::Text, default_index_str);
917        session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
918
919        output
920    }
921
922    /// Handle message stop events
923    fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
924        let c = &self.colors;
925
926        // Message complete - add final newline if we were in a content block
927        // OR if any content was streamed (handles edge cases where block state
928        // may not have been set but content was still streamed)
929        let metrics = session.get_streaming_quality_metrics();
930        let was_in_block = session.on_message_stop();
931        let had_content = session.has_any_streamed_content();
932        if was_in_block || had_content {
933            // Use TextDeltaRenderer for completion - adds final newline
934            let terminal_mode = *self.terminal_mode.borrow();
935            let completion = format!(
936                "{}{}",
937                c.reset(),
938                TextDeltaRenderer::render_completion(terminal_mode)
939            );
940            // Show streaming quality metrics in debug mode or when flag is set
941            let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
942                && metrics.total_deltas > 0;
943            if show_metrics {
944                format!("{}\n{}", completion, metrics.format(*c))
945            } else {
946                completion
947            }
948        } else {
949            String::new()
950        }
951    }
952
953    /// Handle error events
954    fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
955        let c = &self.colors;
956        let prefix = &self.display_name;
957
958        let msg = err
959            .message
960            .unwrap_or_else(|| "Unknown streaming error".to_string());
961        format!(
962            "{}[{}]{} {}Error: {}{}\n",
963            c.dim(),
964            prefix,
965            c.reset(),
966            c.red(),
967            msg,
968            c.reset()
969        )
970    }
971
972    /// Handle unknown events
973    fn handle_unknown_event(&self) -> String {
974        let c = &self.colors;
975        let prefix = &self.display_name;
976
977        // Unknown stream event - in debug mode, log it
978        if self.verbosity.is_debug() {
979            format!(
980                "{}[{}]{} {}Unknown streaming event{}\n",
981                c.dim(),
982                prefix,
983                c.reset(),
984                c.dim(),
985                c.reset()
986            )
987        } else {
988            String::new()
989        }
990    }
991
992    /// Check if a Claude event is a control event (state management with no user output)
993    ///
994    /// Control events are valid JSON that represent state transitions rather than
995    /// user-facing content. They should be tracked separately from "ignored" events
996    /// to avoid false health warnings.
997    const fn is_control_event(event: &ClaudeEvent) -> bool {
998        match event {
999            // Stream events that are control events
1000            ClaudeEvent::StreamEvent { event } => matches!(
1001                event,
1002                StreamInnerEvent::MessageStart { .. }
1003                    | StreamInnerEvent::ContentBlockStart { .. }
1004                    | StreamInnerEvent::ContentBlockStop { .. }
1005                    | StreamInnerEvent::MessageDelta { .. }
1006                    | StreamInnerEvent::MessageStop
1007                    | StreamInnerEvent::Ping
1008            ),
1009            _ => false,
1010        }
1011    }
1012
1013    /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1014    ///
1015    /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1016    /// tool input deltas) that are shown to the user in real-time. These should be
1017    /// tracked separately to avoid inflating "ignored" percentages.
1018    const fn is_partial_event(event: &ClaudeEvent) -> bool {
1019        match event {
1020            // Stream events that produce incremental content
1021            ClaudeEvent::StreamEvent { event } => matches!(
1022                event,
1023                StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1024            ),
1025            _ => false,
1026        }
1027    }
1028
    /// Build the delta display formatter used by this parser.
    ///
    /// Note that this constructs a new `DeltaDisplayFormatter` via
    /// `DeltaDisplayFormatter::new()` on every call; no instance is cached or shared.
    const fn formatter() -> DeltaDisplayFormatter {
        DeltaDisplayFormatter::new()
    }
1033
1034    /// Parse a stream of Claude NDJSON events
1035    pub fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
1036        use super::incremental_parser::IncrementalNdjsonParser;
1037
1038        let c = &self.colors;
1039        let monitor = HealthMonitor::new("Claude");
1040        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
1041            std::fs::OpenOptions::new()
1042                .create(true)
1043                .append(true)
1044                .open(log_path)
1045                .ok()
1046                .map(std::io::BufWriter::new)
1047        });
1048
1049        // Use incremental parser for true real-time streaming
1050        // This processes JSON as soon as it's complete, not waiting for newlines
1051        let mut incremental_parser = IncrementalNdjsonParser::new();
1052        let mut byte_buffer = Vec::new();
1053
1054        loop {
1055            // Read available bytes
1056            byte_buffer.clear();
1057            let chunk = reader.fill_buf()?;
1058            if chunk.is_empty() {
1059                break;
1060            }
1061
1062            // Process all bytes immediately
1063            byte_buffer.extend_from_slice(chunk);
1064            let consumed = chunk.len();
1065            reader.consume(consumed);
1066
1067            // Feed bytes to incremental parser
1068            let json_events = incremental_parser.feed(&byte_buffer);
1069
1070            // Process each complete JSON event immediately
1071            for line in json_events {
1072                let trimmed = line.trim();
1073                if trimmed.is_empty() {
1074                    continue;
1075                }
1076
1077                // In debug mode, also show the raw JSON
1078                if self.verbosity.is_debug() {
1079                    eprintln!(
1080                        "{}[DEBUG]{} {}{}{}",
1081                        c.dim(),
1082                        c.reset(),
1083                        c.dim(),
1084                        &line,
1085                        c.reset()
1086                    );
1087                }
1088
1089                // Parse the event once - parse_event handles malformed JSON by returning None
1090                match self.parse_event(&line) {
1091                    Some(output) => {
1092                        // Check if this is a partial/delta event (streaming content)
1093                        if trimmed.starts_with('{') {
1094                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1095                                if Self::is_partial_event(&event) {
1096                                    monitor.record_partial_event();
1097                                } else {
1098                                    monitor.record_parsed();
1099                                }
1100                            } else {
1101                                monitor.record_parsed();
1102                            }
1103                        } else {
1104                            monitor.record_parsed();
1105                        }
1106                        // Write output to printer
1107                        let mut printer = self.printer.borrow_mut();
1108                        write!(printer, "{output}")?;
1109                        printer.flush()?;
1110                    }
1111                    None => {
1112                        // Check if this was a control event (state management with no user output)
1113                        // Control events are valid JSON that return empty output but aren't "ignored"
1114                        if trimmed.starts_with('{') {
1115                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1116                                if Self::is_control_event(&event) {
1117                                    monitor.record_control_event();
1118                                } else {
1119                                    // Valid JSON but not a control event - track as unknown
1120                                    monitor.record_unknown_event();
1121                                }
1122                            } else {
1123                                // Failed to deserialize - track as parse error
1124                                monitor.record_parse_error();
1125                            }
1126                        } else {
1127                            monitor.record_ignored();
1128                        }
1129                    }
1130                }
1131
1132                // Log raw JSON to file if configured
1133                if let Some(ref mut file) = log_writer {
1134                    writeln!(file, "{line}")?;
1135                }
1136            }
1137        }
1138
1139        if let Some(ref mut file) = log_writer {
1140            file.flush()?;
1141            // Ensure data is written to disk before continuing
1142            // This prevents race conditions where extraction runs before OS commits writes
1143            let _ = file.get_mut().sync_all();
1144        }
1145        if let Some(warning) = monitor.check_and_warn(*c) {
1146            let mut printer = self.printer.borrow_mut();
1147            writeln!(printer, "{warning}")?;
1148            printer.flush()?;
1149        }
1150        Ok(())
1151    }
1152}
1153
// Smoke tests for accessor methods on `ClaudeParser`.
//
// The module itself is gated by `#[cfg(test)]`, so the items inside it need no
// additional `#[cfg(test)]` attributes.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::json_parser::printer::{SharedPrinter, TestPrinter};

    /// `printer()` can be called on a parser constructed with an injected test printer.
    #[test]
    fn test_printer_method_accessible() {
        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let parser =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));

        // Only accessibility is checked; the returned value is intentionally unused.
        let _printer_ref = parser.printer();
    }

    /// `streaming_metrics()` can be called on a parser constructed with an injected
    /// test printer.
    #[test]
    fn test_streaming_metrics_method_accessible() {
        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let parser =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));

        // Only accessibility is checked; the returned value is intentionally unused.
        let _metrics = parser.streaming_metrics();
    }
}