ralph_workflow/json_parser/
claude.rs

1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//!    creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r          (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r  (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n    (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46use super::health::StreamingQualityMetrics;
47use super::printer::SharedPrinter;
48use super::streaming_state::StreamingSession;
49use super::terminal::TerminalMode;
50use super::types::{
51    format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
52    ContentType, StreamInnerEvent,
53};
54
/// Claude event parser
///
/// Holds the formatting configuration and streaming state used to turn Claude CLI
/// JSON events into display text.
///
/// Note: This parser is designed for single-threaded use only.
/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
/// Do not share this parser across threads.
pub struct ClaudeParser {
    /// Color palette applied to formatted output
    colors: Colors,
    /// Verbosity level; consulted for truncation limits and optional detail
    pub(crate) verbosity: Verbosity,
    /// Optional log file path; `None` until set via `with_log_file`
    log_file: Option<String>,
    /// Name shown inside the `[...]` prefix of output lines (initially "Claude")
    display_name: String,
    /// Unified streaming session tracker
    /// Provides single source of truth for streaming state across all content types
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Terminal mode for output formatting
    /// Detected once when the parser is constructed (`TerminalMode::detect()`);
    /// the test-only `with_terminal_mode` builder overwrites it through the `RefCell`
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to show streaming quality metrics (initially `false`)
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
}
76
77impl ClaudeParser {
78    /// Create a new `ClaudeParser` with the given colors and verbosity.
79    ///
80    /// # Arguments
81    ///
82    /// * `colors` - Colors for terminal output
83    /// * `verbosity` - Verbosity level for output
84    ///
85    /// # Returns
86    ///
87    /// A new `ClaudeParser` instance
88    ///
89    /// # Example
90    ///
91    /// ```ignore
92    /// use ralph_workflow::json_parser::ClaudeParser;
93    /// use ralph_workflow::logger::Colors;
94    /// use ralph_workflow::config::Verbosity;
95    ///
96    /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
97    /// ```
98    pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
99        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
100    }
101
102    /// Create a new `ClaudeParser` with a custom printer.
103    ///
104    /// # Arguments
105    ///
106    /// * `colors` - Colors for terminal output
107    /// * `verbosity` - Verbosity level for output
108    /// * `printer` - Shared printer for output
109    ///
110    /// # Returns
111    ///
112    /// A new `ClaudeParser` instance
113    pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
114        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
115        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
116
117        // Use the printer's is_terminal method to validate it's connected correctly
118        // This is a sanity check that also satisfies the compiler that the method is used
119        let _printer_is_terminal = printer.borrow().is_terminal();
120
121        Self {
122            colors,
123            verbosity,
124            log_file: None,
125            display_name: "Claude".to_string(),
126            streaming_session: Rc::new(RefCell::new(streaming_session)),
127            terminal_mode: RefCell::new(TerminalMode::detect()),
128            show_streaming_metrics: false,
129            printer,
130        }
131    }
132
133    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
134        self.show_streaming_metrics = show;
135        self
136    }
137
138    /// Set the display name for this parser.
139    ///
140    /// # Arguments
141    ///
142    /// * `display_name` - The name to display in output
143    ///
144    /// # Returns
145    ///
146    /// Self for builder pattern chaining
147    pub fn with_display_name(mut self, display_name: &str) -> Self {
148        self.display_name = display_name.to_string();
149        self
150    }
151
152    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
153        self.log_file = Some(path.to_string());
154        self
155    }
156
157    /// Set the terminal mode for this parser.
158    ///
159    /// # Arguments
160    ///
161    /// * `mode` - The terminal mode to use
162    ///
163    /// # Returns
164    ///
165    /// Self for builder pattern chaining
166    #[cfg(test)]
167    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
168        *self.terminal_mode.borrow_mut() = mode;
169        self
170    }
171
172    /// Get a shared reference to the printer.
173    ///
174    /// This allows tests, monitoring, and other code to access the printer after parsing
175    /// to verify output content, check for duplicates, or capture output for analysis.
176    ///
177    /// # Returns
178    ///
179    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
180    ///
181    /// # Example
182    ///
183    /// ```ignore
184    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
185    /// use std::rc::Rc;
186    /// use std::cell::RefCell;
187    ///
188    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
189    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
190    ///
191    /// // Parse events...
192    ///
193    /// // Now access the printer to verify output
194    /// let printer_ref = parser.printer().borrow();
195    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
196    /// ```
197    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
198    pub fn printer(&self) -> SharedPrinter {
199        Rc::clone(&self.printer)
200    }
201
202    /// Get streaming quality metrics from the current session.
203    ///
204    /// This provides insight into the deduplication and streaming quality of the
205    /// parsing session, including:
206    /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
207    /// - Number of large deltas (potential protocol violations)
208    /// - Total deltas processed
209    ///
210    /// Useful for testing, monitoring, and debugging streaming behavior.
211    ///
212    /// # Returns
213    ///
214    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
215    ///
216    /// # Example
217    ///
218    /// ```ignore
219    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
220    /// use std::rc::Rc;
221    /// use std::cell::RefCell;
222    ///
223    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
224    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
225    ///
226    /// // Parse events...
227    ///
228    /// // Verify deduplication logic triggered
229    /// let metrics = parser.streaming_metrics();
230    /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
231    /// ```
232    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
233    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
234        self.streaming_session
235            .borrow()
236            .get_streaming_quality_metrics()
237    }
238
239    /// Parse and display a single Claude JSON event
240    ///
241    /// Returns `Some(formatted_output)` for valid events, or None for:
242    /// - Malformed JSON (logged at debug level)
243    /// - Unknown event types
244    /// - Empty or whitespace-only output
245    pub fn parse_event(&self, line: &str) -> Option<String> {
246        let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
247            e
248        } else {
249            // Non-JSON line - could be raw text output from agent
250            // Pass through as-is if it looks like real output (not empty)
251            let trimmed = line.trim();
252            if !trimmed.is_empty() && !trimmed.starts_with('{') {
253                return Some(format!("{trimmed}\n"));
254            }
255            return None;
256        };
257        let c = &self.colors;
258        let prefix = &self.display_name;
259
260        let output = match event {
261            ClaudeEvent::System {
262                subtype,
263                session_id,
264                cwd,
265            } => self.format_system_event(subtype.as_ref(), session_id, cwd),
266            ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
267            ClaudeEvent::User { message } => self.format_user_event(message),
268            ClaudeEvent::Result {
269                subtype,
270                duration_ms,
271                total_cost_usd,
272                num_turns,
273                result,
274                error,
275            } => self.format_result_event(
276                subtype,
277                duration_ms,
278                total_cost_usd,
279                num_turns,
280                result,
281                error,
282            ),
283            ClaudeEvent::StreamEvent { event } => {
284                // Handle streaming events for delta/partial updates
285                self.parse_stream_event(event)
286            }
287            ClaudeEvent::Unknown => {
288                // Use the generic unknown event formatter for consistent handling
289                // In verbose mode, this will show the event type and key fields
290                // In normal mode, this returns empty string
291                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
292            }
293        };
294
295        if output.is_empty() {
296            None
297        } else {
298            Some(output)
299        }
300    }
301
302    /// Parse a streaming event for delta/partial updates
303    ///
304    /// Handles the nested events within `stream_event`:
305    /// - MessageStart/Stop: Manage session state
306    /// - `ContentBlockStart`: Initialize new content blocks
307    /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
308    /// - `ContentBlockStop`: Finalize content blocks
309    /// - `MessageDelta`: Process message metadata without output
310    /// - Error: Display appropriately
311    ///
312    /// Returns String for display content, empty String for control events.
313    fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
314        let mut session = self.streaming_session.borrow_mut();
315
316        match event {
317            StreamInnerEvent::MessageStart {
318                message: _,
319                message_id,
320            } => {
321                // Set message ID for tracking and clear session state on new message
322                session.set_current_message_id(message_id);
323                session.on_message_start();
324                String::new()
325            }
326            StreamInnerEvent::ContentBlockStart {
327                index: Some(index),
328                content_block: Some(block),
329            } => {
330                // Initialize a new content block at this index
331                session.on_content_block_start(index);
332                match &block {
333                    ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
334                        // Initial text in ContentBlockStart - treat as first delta
335                        session.on_text_delta(index, t);
336                    }
337                    ContentBlock::ToolUse {
338                        name: _,
339                        input: Some(i),
340                    } => {
341                        // Initialize tool input accumulator
342                        let input_str = if let serde_json::Value::String(s) = &i {
343                            s.clone()
344                        } else {
345                            format_tool_input(i)
346                        };
347                        session.on_tool_input_delta(index, &input_str);
348                    }
349                    _ => {}
350                }
351                String::new()
352            }
353            StreamInnerEvent::ContentBlockStart {
354                index: Some(index),
355                content_block: None,
356            } => {
357                // Content block started but no initial content provided
358                session.on_content_block_start(index);
359                String::new()
360            }
361            StreamInnerEvent::ContentBlockStart { .. } => {
362                // Content block without index - ignore
363                String::new()
364            }
365            StreamInnerEvent::ContentBlockDelta {
366                index: Some(index),
367                delta: Some(delta),
368            } => self.handle_content_block_delta(&mut session, index, delta),
369            StreamInnerEvent::TextDelta { text: Some(text) } => {
370                self.handle_text_delta(&mut session, &text)
371            }
372            StreamInnerEvent::ContentBlockStop { .. } => {
373                // Content block completion event - no output needed
374                // This event marks the end of a content block but doesn't produce
375                // any displayable content. It's a control event for state management.
376                String::new()
377            }
378            StreamInnerEvent::MessageDelta { .. } => {
379                // Message delta event with usage/metadata - no output needed
380                // This event contains final message metadata (stop_reason, usage stats)
381                // but is used for tracking/monitoring purposes only, not display.
382                String::new()
383            }
384            StreamInnerEvent::ContentBlockDelta { .. }
385            | StreamInnerEvent::Ping
386            | StreamInnerEvent::TextDelta { text: None }
387            | StreamInnerEvent::Error { error: None } => String::new(),
388            StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
389            StreamInnerEvent::Error {
390                error: Some(err), ..
391            } => self.handle_error_event(err),
392            StreamInnerEvent::Unknown => self.handle_unknown_event(),
393        }
394    }
395
396    /// Format a system event
397    fn format_system_event(
398        &self,
399        subtype: Option<&String>,
400        session_id: Option<String>,
401        cwd: Option<String>,
402    ) -> String {
403        let c = &self.colors;
404        let prefix = &self.display_name;
405
406        if subtype.map(std::string::String::as_str) == Some("init") {
407            let sid = session_id.unwrap_or_else(|| "unknown".to_string());
408            let mut out = format!(
409                "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
410                c.dim(),
411                prefix,
412                c.reset(),
413                c.cyan(),
414                c.reset(),
415                c.dim(),
416                sid,
417                c.reset()
418            );
419            if let Some(cwd) = cwd {
420                let _ = writeln!(
421                    out,
422                    "{}[{}]{} {}Working dir: {}{}",
423                    c.dim(),
424                    prefix,
425                    c.reset(),
426                    c.dim(),
427                    cwd,
428                    c.reset()
429                );
430            }
431            out
432        } else {
433            format!(
434                "{}[{}]{} {}{}{}\n",
435                c.dim(),
436                prefix,
437                c.reset(),
438                c.cyan(),
439                subtype.map_or("system", |s| s.as_str()),
440                c.reset()
441            )
442        }
443    }
444
445    /// Extract text content from a message for hash-based deduplication.
446    fn extract_text_content_for_hash(
447        message: Option<&crate::json_parser::types::AssistantMessage>,
448    ) -> Option<String> {
449        message?.content.as_ref().map(|content| {
450            content
451                .iter()
452                .filter_map(|block| {
453                    if let ContentBlock::Text { text } = block {
454                        text.as_deref()
455                    } else {
456                        None
457                    }
458                })
459                .collect::<Vec<_>>()
460                .join("")
461        })
462    }
463
464    /// Check if this assistant message is a duplicate of already-streamed content.
465    fn is_duplicate_assistant_message(
466        &self,
467        message: Option<&crate::json_parser::types::AssistantMessage>,
468    ) -> bool {
469        let session = self.streaming_session.borrow();
470
471        // Extract message_id from the assistant message
472        let assistant_msg_id = message.and_then(|m| m.id.as_ref());
473
474        // Check if this assistant event has a message_id that matches the current streaming message
475        // If it does, and we have streamed content, then this assistant event is a duplicate
476        // because the content was already streamed via deltas.
477        if let Some(ast_msg_id) = assistant_msg_id {
478            // Check if message was already marked as displayed (after message_stop)
479            if session.is_duplicate_final_message(ast_msg_id) {
480                return true;
481            }
482
483            // Check if the assistant message_id matches the current streaming message_id
484            if session.get_current_message_id() == Some(ast_msg_id) {
485                // Same message - check if we have streamed any content
486                // If yes, the assistant event is a duplicate
487                if session.has_any_streamed_content() {
488                    return true;
489                }
490            }
491        }
492
493        // If no message_id match, fall back to hash-based deduplication
494        let text_content_for_hash = Self::extract_text_content_for_hash(message);
495        if let Some(ref text_content) = text_content_for_hash {
496            if !text_content.is_empty() {
497                return session.is_duplicate_by_hash(text_content);
498            }
499        }
500
501        // Fallback to coarse check
502        session.has_any_streamed_content()
503    }
504
505    /// Format a text content block for assistant output.
506    fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
507        let limit = self.verbosity.truncate_limit("text");
508        let preview = truncate_text(text, limit);
509        let _ = writeln!(
510            out,
511            "{}[{}]{} {}{}{}",
512            colors.dim(),
513            prefix,
514            colors.reset(),
515            colors.white(),
516            preview,
517            colors.reset()
518        );
519    }
520
521    /// Format a tool use content block for assistant output.
522    fn format_tool_use_block(
523        &self,
524        out: &mut String,
525        tool: Option<&String>,
526        input: Option<&serde_json::Value>,
527        prefix: &str,
528        colors: Colors,
529    ) {
530        let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
531        let _ = writeln!(
532            out,
533            "{}[{}]{} {}Tool{}: {}{}{}",
534            colors.dim(),
535            prefix,
536            colors.reset(),
537            colors.magenta(),
538            colors.reset(),
539            colors.bold(),
540            tool_name,
541            colors.reset(),
542        );
543
544        // Show tool input details at Normal and above (not just Verbose)
545        // Tool inputs provide crucial context for understanding agent actions
546        if self.verbosity.show_tool_input() {
547            if let Some(input_val) = input {
548                let input_str = format_tool_input(input_val);
549                let limit = self.verbosity.truncate_limit("tool_input");
550                let preview = truncate_text(&input_str, limit);
551                if !preview.is_empty() {
552                    let _ = writeln!(
553                        out,
554                        "{}[{}]{} {}  └─ {}{}",
555                        colors.dim(),
556                        prefix,
557                        colors.reset(),
558                        colors.dim(),
559                        preview,
560                        colors.reset()
561                    );
562                }
563            }
564        }
565    }
566
567    /// Format a tool result content block for assistant output.
568    fn format_tool_result_block(
569        &self,
570        out: &mut String,
571        content: &serde_json::Value,
572        prefix: &str,
573        colors: Colors,
574    ) {
575        let content_str = match content {
576            serde_json::Value::String(s) => s.clone(),
577            other => other.to_string(),
578        };
579        let limit = self.verbosity.truncate_limit("tool_result");
580        let preview = truncate_text(&content_str, limit);
581        let _ = writeln!(
582            out,
583            "{}[{}]{} {}Result:{} {}",
584            colors.dim(),
585            prefix,
586            colors.reset(),
587            colors.dim(),
588            colors.reset(),
589            preview
590        );
591    }
592
593    /// Format all content blocks from an assistant message.
594    fn format_content_blocks(
595        &self,
596        out: &mut String,
597        content: &[ContentBlock],
598        prefix: &str,
599        colors: Colors,
600    ) {
601        for block in content {
602            match block {
603                ContentBlock::Text { text } => {
604                    if let Some(text) = text {
605                        self.format_text_block(out, text, prefix, colors);
606                    }
607                }
608                ContentBlock::ToolUse { name, input } => {
609                    self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
610                }
611                ContentBlock::ToolResult { content } => {
612                    if let Some(content) = content {
613                        self.format_tool_result_block(out, content, prefix, colors);
614                    }
615                }
616                ContentBlock::Unknown => {}
617            }
618        }
619    }
620
621    /// Format an assistant event
622    fn format_assistant_event(
623        &self,
624        message: Option<crate::json_parser::types::AssistantMessage>,
625    ) -> String {
626        // CRITICAL FIX: When ANY content has been streamed via deltas,
627        // the Assistant event should NOT display it again.
628        // The Assistant event represents the "complete" message, but if we've
629        // already shown the streaming deltas, showing it again causes duplication.
630        if self.is_duplicate_assistant_message(message.as_ref()) {
631            return String::new();
632        }
633
634        let mut out = String::new();
635        if let Some(msg) = message {
636            if let Some(content) = msg.content {
637                self.format_content_blocks(&mut out, &content, &self.display_name, self.colors);
638            }
639        }
640        out
641    }
642
643    /// Format a user event
644    fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
645        let c = &self.colors;
646        let prefix = &self.display_name;
647
648        if let Some(msg) = message {
649            if let Some(content) = msg.content {
650                if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
651                    let limit = self.verbosity.truncate_limit("user");
652                    let preview = truncate_text(text, limit);
653                    return format!(
654                        "{}[{}]{} {}User{}: {}{}{}\n",
655                        c.dim(),
656                        prefix,
657                        c.reset(),
658                        c.blue(),
659                        c.reset(),
660                        c.dim(),
661                        preview,
662                        c.reset()
663                    );
664                }
665            }
666        }
667        String::new()
668    }
669
670    /// Format a result event
671    fn format_result_event(
672        &self,
673        subtype: Option<String>,
674        duration_ms: Option<u64>,
675        total_cost_usd: Option<f64>,
676        num_turns: Option<u32>,
677        result: Option<String>,
678        error: Option<String>,
679    ) -> String {
680        let c = &self.colors;
681        let prefix = &self.display_name;
682
683        let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
684        let duration_m = duration_total_secs / 60;
685        let duration_s_rem = duration_total_secs % 60;
686        let cost = total_cost_usd.unwrap_or(0.0);
687        let turns = num_turns.unwrap_or(0);
688
689        let mut out = if subtype.as_deref() == Some("success") {
690            format!(
691                "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
692                c.dim(),
693                prefix,
694                c.reset(),
695                c.green(),
696                CHECK,
697                c.reset(),
698                c.dim(),
699                duration_m,
700                duration_s_rem,
701                turns,
702                cost,
703                c.reset()
704            )
705        } else {
706            let err = error.unwrap_or_else(|| "unknown error".to_string());
707            format!(
708                "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
709                c.dim(),
710                prefix,
711                c.reset(),
712                c.red(),
713                CROSS,
714                subtype.unwrap_or_else(|| "error".to_string()),
715                c.reset(),
716                err,
717                c.dim(),
718                duration_m,
719                duration_s_rem,
720                c.reset()
721            )
722        };
723
724        if let Some(result) = result {
725            let limit = self.verbosity.truncate_limit("result");
726            let preview = truncate_text(&result, limit);
727            let _ = writeln!(
728                out,
729                "\n{}Result summary:{}\n{}{}{}",
730                c.bold(),
731                c.reset(),
732                c.dim(),
733                preview,
734                c.reset()
735            );
736        }
737        out
738    }
739
740    /// Handle content block delta events
741    fn handle_content_block_delta(
742        &self,
743        session: &mut std::cell::RefMut<'_, StreamingSession>,
744        index: u64,
745        delta: ContentBlockDelta,
746    ) -> String {
747        let c = &self.colors;
748        let prefix = &self.display_name;
749
750        match delta {
751            ContentBlockDelta::TextDelta { text: Some(text) } => {
752                // Check for snapshot-as-delta bug (GLM sending full accumulated content)
753                // If detected, extract only the delta portion
754                let index_str = index.to_string();
755                let text_to_process = if session.is_likely_snapshot(&text, &index_str) {
756                    // Snapshot detected - extract delta (auto-corrects GLM/CCS quirk)
757                    session
758                        .get_delta_from_snapshot(&text, &index_str)
759                        .unwrap_or(&text)
760                } else {
761                    // Genuine delta - use as-is
762                    &text
763                };
764
765                // Check for empty delta after snapshot extraction
766                if text_to_process.is_empty() {
767                    return String::new();
768                }
769
770                // Use StreamingSession to track state and determine prefix display
771                let show_prefix = session.on_text_delta(index, text_to_process);
772
773                // Get accumulated text for streaming display
774                let accumulated_text = session
775                    .get_accumulated(ContentType::Text, &index_str)
776                    .unwrap_or("");
777
778                // Sanitize the accumulated text to check if it's empty
779                // This is needed to skip rendering when the accumulated content is just whitespace
780                let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
781
782                // Skip rendering if the sanitized text is empty (e.g., only whitespace)
783                // This prevents rendering empty lines when the accumulated content is just whitespace
784                if sanitized_text.is_empty() {
785                    return String::new();
786                }
787
788                // Check if this sanitized content has already been rendered
789                // This prevents duplicates when accumulated content differs only by whitespace
790                if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
791                {
792                    return String::new();
793                }
794
795                // Use TextDeltaRenderer for consistent rendering
796                let terminal_mode = *self.terminal_mode.borrow();
797
798                // Use prefix trie to detect if new content extends previously rendered content
799                // If yes, we do an in-place update (carriage return + new content)
800                let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);
801
802                let output = if show_prefix && !has_prefix {
803                    // First delta with no prefix match - use the renderer with prefix
804                    TextDeltaRenderer::render_first_delta(
805                        accumulated_text,
806                        prefix,
807                        *c,
808                        terminal_mode,
809                    )
810                } else {
811                    // Either continuation OR prefix match - use renderer for in-place update
812                    // This handles the case where "Hello" becomes "Hello World" - we REPLACE
813                    TextDeltaRenderer::render_subsequent_delta(
814                        accumulated_text,
815                        prefix,
816                        *c,
817                        terminal_mode,
818                    )
819                };
820
821                // Mark this sanitized content as rendered for future duplicate detection
822                // We use the sanitized text (not the rendered output) to avoid false positives
823                // when the same accumulated text is rendered with different terminal modes
824                session.mark_rendered(ContentType::Text, &index_str);
825                session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);
826
827                output
828            }
829            ContentBlockDelta::ThinkingDelta {
830                thinking: Some(text),
831            } => {
832                // Track thinking deltas
833                session.on_thinking_delta(index, &text);
834                // Display thinking with visual distinction
835                Self::formatter().format_thinking(text.as_str(), prefix, *c)
836            }
837            ContentBlockDelta::ToolUseDelta {
838                tool_use: Some(tool_delta),
839            } => {
840                // Handle tool input streaming
841                // Extract the tool input from the delta
842                let input_str =
843                    tool_delta
844                        .get("input")
845                        .map_or_else(String::new, |input| match input {
846                            serde_json::Value::String(s) => s.clone(),
847                            other => format_tool_input(other),
848                        });
849
850                if input_str.is_empty() {
851                    String::new()
852                } else {
853                    // Accumulate tool input
854                    session.on_tool_input_delta(index, &input_str);
855
856                    // Show partial tool input in real-time
857                    let formatter = DeltaDisplayFormatter::new();
858                    formatter.format_tool_input(&input_str, prefix, *c)
859                }
860            }
861            _ => String::new(),
862        }
863    }
864
865    /// Handle text delta events
866    fn handle_text_delta(
867        &self,
868        session: &mut std::cell::RefMut<'_, StreamingSession>,
869        text: &str,
870    ) -> String {
871        let c = &self.colors;
872        let prefix = &self.display_name;
873
874        // Standalone text delta (not part of content block)
875        // Use default index "0" for standalone text
876        let default_index = 0u64;
877        let default_index_str = "0";
878
879        // Check for snapshot-as-delta bug
880        let text_to_process = if session.is_likely_snapshot(text, default_index_str) {
881            // Snapshot detected - extract delta (auto-corrects GLM/CCS quirk)
882            session
883                .get_delta_from_snapshot(text, default_index_str)
884                .unwrap_or(text)
885        } else {
886            text
887        };
888
889        // Check for empty delta after snapshot extraction
890        if text_to_process.is_empty() {
891            return String::new();
892        }
893
894        let show_prefix = session.on_text_delta(default_index, text_to_process);
895
896        // Get accumulated text for streaming display
897        let accumulated_text = session
898            .get_accumulated(ContentType::Text, default_index_str)
899            .unwrap_or("");
900
901        // Sanitize the accumulated text to check if it's empty
902        // This is needed to skip rendering when the accumulated content is just whitespace
903        let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
904
905        // Skip rendering if the sanitized text is empty (e.g., only whitespace)
906        // This prevents rendering empty lines when the accumulated content is just whitespace
907        if sanitized_text.is_empty() {
908            return String::new();
909        }
910
911        // Check if this sanitized content has already been rendered
912        // This prevents duplicates when accumulated content differs only by whitespace
913        if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
914            return String::new();
915        }
916
917        // Use TextDeltaRenderer for consistent rendering across all parsers
918        let terminal_mode = *self.terminal_mode.borrow();
919
920        // Use prefix trie to detect if new content extends previously rendered content
921        // If yes, we do an in-place update (carriage return + new content)
922        let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
923
924        let output = if show_prefix && !has_prefix {
925            // First delta with no prefix match - use the renderer with prefix
926            TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
927        } else {
928            // Either continuation OR prefix match - use renderer for in-place update
929            // This handles the case where "Hello" becomes "Hello World" - we REPLACE
930            TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
931        };
932
933        // Mark this sanitized content as rendered for future duplicate detection
934        // We use the sanitized text (not the rendered output) to avoid false positives
935        // when the same accumulated text is rendered with different terminal modes
936        session.mark_rendered(ContentType::Text, default_index_str);
937        session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
938
939        output
940    }
941
942    /// Handle message stop events
943    fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
944        let c = &self.colors;
945
946        // Message complete - add final newline if we were in a content block
947        // OR if any content was streamed (handles edge cases where block state
948        // may not have been set but content was still streamed)
949        let metrics = session.get_streaming_quality_metrics();
950        let was_in_block = session.on_message_stop();
951        let had_content = session.has_any_streamed_content();
952        if was_in_block || had_content {
953            // Use TextDeltaRenderer for completion - adds final newline
954            let terminal_mode = *self.terminal_mode.borrow();
955            let completion = format!(
956                "{}{}",
957                c.reset(),
958                TextDeltaRenderer::render_completion(terminal_mode)
959            );
960            // Show streaming quality metrics in debug mode or when flag is set
961            let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
962                && metrics.total_deltas > 0;
963            if show_metrics {
964                format!("{}\n{}", completion, metrics.format(*c))
965            } else {
966                completion
967            }
968        } else {
969            String::new()
970        }
971    }
972
973    /// Handle error events
974    fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
975        let c = &self.colors;
976        let prefix = &self.display_name;
977
978        let msg = err
979            .message
980            .unwrap_or_else(|| "Unknown streaming error".to_string());
981        format!(
982            "{}[{}]{} {}Error: {}{}\n",
983            c.dim(),
984            prefix,
985            c.reset(),
986            c.red(),
987            msg,
988            c.reset()
989        )
990    }
991
992    /// Handle unknown events
993    fn handle_unknown_event(&self) -> String {
994        let c = &self.colors;
995        let prefix = &self.display_name;
996
997        // Unknown stream event - in debug mode, log it
998        if self.verbosity.is_debug() {
999            format!(
1000                "{}[{}]{} {}Unknown streaming event{}\n",
1001                c.dim(),
1002                prefix,
1003                c.reset(),
1004                c.dim(),
1005                c.reset()
1006            )
1007        } else {
1008            String::new()
1009        }
1010    }
1011
1012    /// Check if a Claude event is a control event (state management with no user output)
1013    ///
1014    /// Control events are valid JSON that represent state transitions rather than
1015    /// user-facing content. They should be tracked separately from "ignored" events
1016    /// to avoid false health warnings.
1017    const fn is_control_event(event: &ClaudeEvent) -> bool {
1018        match event {
1019            // Stream events that are control events
1020            ClaudeEvent::StreamEvent { event } => matches!(
1021                event,
1022                StreamInnerEvent::MessageStart { .. }
1023                    | StreamInnerEvent::ContentBlockStart { .. }
1024                    | StreamInnerEvent::ContentBlockStop { .. }
1025                    | StreamInnerEvent::MessageDelta { .. }
1026                    | StreamInnerEvent::MessageStop
1027                    | StreamInnerEvent::Ping
1028            ),
1029            _ => false,
1030        }
1031    }
1032
1033    /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1034    ///
1035    /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1036    /// tool input deltas) that are shown to the user in real-time. These should be
1037    /// tracked separately to avoid inflating "ignored" percentages.
1038    const fn is_partial_event(event: &ClaudeEvent) -> bool {
1039        match event {
1040            // Stream events that produce incremental content
1041            ClaudeEvent::StreamEvent { event } => matches!(
1042                event,
1043                StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1044            ),
1045            _ => false,
1046        }
1047    }
1048
1049    /// Get a shared delta display formatter
1050    const fn formatter() -> DeltaDisplayFormatter {
1051        DeltaDisplayFormatter::new()
1052    }
1053
1054    /// Parse a stream of Claude NDJSON events
1055    pub fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
1056        use super::incremental_parser::IncrementalNdjsonParser;
1057
1058        let c = &self.colors;
1059        let monitor = HealthMonitor::new("Claude");
1060        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
1061            std::fs::OpenOptions::new()
1062                .create(true)
1063                .append(true)
1064                .open(log_path)
1065                .ok()
1066                .map(std::io::BufWriter::new)
1067        });
1068
1069        // Use incremental parser for true real-time streaming
1070        // This processes JSON as soon as it's complete, not waiting for newlines
1071        let mut incremental_parser = IncrementalNdjsonParser::new();
1072        let mut byte_buffer = Vec::new();
1073
1074        loop {
1075            // Read available bytes
1076            byte_buffer.clear();
1077            let chunk = reader.fill_buf()?;
1078            if chunk.is_empty() {
1079                break;
1080            }
1081
1082            // Process all bytes immediately
1083            byte_buffer.extend_from_slice(chunk);
1084            let consumed = chunk.len();
1085            reader.consume(consumed);
1086
1087            // Feed bytes to incremental parser
1088            let json_events = incremental_parser.feed(&byte_buffer);
1089
1090            // Process each complete JSON event immediately
1091            for line in json_events {
1092                let trimmed = line.trim();
1093                if trimmed.is_empty() {
1094                    continue;
1095                }
1096
1097                // In debug mode, also show the raw JSON
1098                if self.verbosity.is_debug() {
1099                    eprintln!(
1100                        "{}[DEBUG]{} {}{}{}",
1101                        c.dim(),
1102                        c.reset(),
1103                        c.dim(),
1104                        &line,
1105                        c.reset()
1106                    );
1107                }
1108
1109                // Parse the event once - parse_event handles malformed JSON by returning None
1110                match self.parse_event(&line) {
1111                    Some(output) => {
1112                        // Check if this is a partial/delta event (streaming content)
1113                        if trimmed.starts_with('{') {
1114                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1115                                if Self::is_partial_event(&event) {
1116                                    monitor.record_partial_event();
1117                                } else {
1118                                    monitor.record_parsed();
1119                                }
1120                            } else {
1121                                monitor.record_parsed();
1122                            }
1123                        } else {
1124                            monitor.record_parsed();
1125                        }
1126                        // Write output to printer
1127                        let mut printer = self.printer.borrow_mut();
1128                        write!(printer, "{output}")?;
1129                        printer.flush()?;
1130                    }
1131                    None => {
1132                        // Check if this was a control event (state management with no user output)
1133                        // Control events are valid JSON that return empty output but aren't "ignored"
1134                        if trimmed.starts_with('{') {
1135                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1136                                if Self::is_control_event(&event) {
1137                                    monitor.record_control_event();
1138                                } else {
1139                                    // Valid JSON but not a control event - track as unknown
1140                                    monitor.record_unknown_event();
1141                                }
1142                            } else {
1143                                // Failed to deserialize - track as parse error
1144                                monitor.record_parse_error();
1145                            }
1146                        } else {
1147                            monitor.record_ignored();
1148                        }
1149                    }
1150                }
1151
1152                // Log raw JSON to file if configured
1153                if let Some(ref mut file) = log_writer {
1154                    writeln!(file, "{line}")?;
1155                }
1156            }
1157        }
1158
1159        if let Some(ref mut file) = log_writer {
1160            file.flush()?;
1161        }
1162        if let Some(warning) = monitor.check_and_warn(*c) {
1163            let mut printer = self.printer.borrow_mut();
1164            writeln!(printer, "{warning}")?;
1165            printer.flush()?;
1166        }
1167        Ok(())
1168    }
1169}
1170
#[cfg(test)]
mod tests {
    // The module-level `#[cfg(test)]` already gates everything in here, so the
    // individual items do not repeat the attribute.
    use super::*;
    use crate::json_parser::printer::{SharedPrinter, TestPrinter};

    /// Smoke test: a parser built with an injected printer exposes `printer()`.
    ///
    /// Only checks that the call compiles and runs; the returned value is not inspected.
    #[test]
    fn test_printer_method_accessible() {
        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let parser =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));

        let _printer_ref = parser.printer();
    }

    /// Smoke test: a parser built with an injected printer exposes `streaming_metrics()`.
    ///
    /// Only checks that the call compiles and runs; the returned value is not inspected.
    #[test]
    fn test_streaming_metrics_method_accessible() {
        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let parser =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));

        let _metrics = parser.streaming_metrics();
    }
}