ralph_workflow/json_parser/
claude.rs

1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//!    creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r          (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r  (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n    (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46#[cfg(feature = "test-utils")]
47use super::health::StreamingQualityMetrics;
48use super::printer::SharedPrinter;
49use super::streaming_state::StreamingSession;
50use super::terminal::TerminalMode;
51use super::types::{
52    format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
53    ContentType, StreamInnerEvent,
54};
55
/// Claude event parser
///
/// Holds the formatting configuration, the streaming session state and the
/// output printer used while parsing Claude CLI events.
///
/// Note: This parser is designed for single-threaded use only.
/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
/// Do not share this parser across threads.
pub struct ClaudeParser {
    /// Color palette applied to formatted output
    colors: Colors,
    /// Verbosity level; consulted for truncation limits and optional detail
    pub(crate) verbosity: Verbosity,
    /// Optional log file path (`None` until set via `with_log_file`)
    log_file: Option<String>,
    /// Name shown inside the `[...]` prefix of output lines (defaults to "Claude")
    display_name: String,
    /// Unified streaming session tracker
    /// Provides single source of truth for streaming state across all content types
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Terminal mode for output formatting
    /// Initialized from `TerminalMode::detect()` when the parser is constructed
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to show streaming quality metrics
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
}
77
78impl ClaudeParser {
79    /// Create a new `ClaudeParser` with the given colors and verbosity.
80    ///
81    /// # Arguments
82    ///
83    /// * `colors` - Colors for terminal output
84    /// * `verbosity` - Verbosity level for output
85    ///
86    /// # Returns
87    ///
88    /// A new `ClaudeParser` instance
89    ///
90    /// # Example
91    ///
92    /// ```ignore
93    /// use ralph_workflow::json_parser::ClaudeParser;
94    /// use ralph_workflow::logger::Colors;
95    /// use ralph_workflow::config::Verbosity;
96    ///
97    /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
98    /// ```
99    pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
100        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
101    }
102
103    /// Create a new `ClaudeParser` with a custom printer.
104    ///
105    /// # Arguments
106    ///
107    /// * `colors` - Colors for terminal output
108    /// * `verbosity` - Verbosity level for output
109    /// * `printer` - Shared printer for output
110    ///
111    /// # Returns
112    ///
113    /// A new `ClaudeParser` instance
114    pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
115        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
116        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
117
118        // Use the printer's is_terminal method to validate it's connected correctly
119        // This is a sanity check that also satisfies the compiler that the method is used
120        let _printer_is_terminal = printer.borrow().is_terminal();
121
122        Self {
123            colors,
124            verbosity,
125            log_file: None,
126            display_name: "Claude".to_string(),
127            streaming_session: Rc::new(RefCell::new(streaming_session)),
128            terminal_mode: RefCell::new(TerminalMode::detect()),
129            show_streaming_metrics: false,
130            printer,
131        }
132    }
133
    /// Enable or disable the display of streaming quality metrics.
    ///
    /// Stores `show` in the `show_streaming_metrics` field and returns the
    /// parser so the call can be chained builder-style.
    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
        self.show_streaming_metrics = show;
        self
    }
138
139    /// Set the display name for this parser.
140    ///
141    /// # Arguments
142    ///
143    /// * `display_name` - The name to display in output
144    ///
145    /// # Returns
146    ///
147    /// Self for builder pattern chaining
148    pub fn with_display_name(mut self, display_name: &str) -> Self {
149        self.display_name = display_name.to_string();
150        self
151    }
152
153    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
154        self.log_file = Some(path.to_string());
155        self
156    }
157
158    /// Set the terminal mode for this parser.
159    ///
160    /// # Arguments
161    ///
162    /// * `mode` - The terminal mode to use
163    ///
164    /// # Returns
165    ///
166    /// Self for builder pattern chaining
167    #[cfg(any(test, feature = "test-utils"))]
168    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
169        *self.terminal_mode.borrow_mut() = mode;
170        self
171    }
172
    /// Get a clone of the shared printer used by this parser.
    ///
    /// This allows tests, monitoring, and other code to access the printer after parsing
    /// to verify output content, check for duplicates, or capture output for analysis.
    /// Only available with the `test-utils` feature.
    ///
    /// # Returns
    ///
    /// A new handle to the same shared printer (`SharedPrinter`); cloning the
    /// handle does not copy the underlying printer.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
    /// use std::rc::Rc;
    /// use std::cell::RefCell;
    ///
    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
    ///
    /// // Parse events...
    ///
    /// // Now access the printer to verify output.
    /// // Keep the handle in a local so the borrow does not outlive a temporary.
    /// let shared = parser.printer();
    /// let printer_ref = shared.borrow();
    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
    /// ```
    #[cfg(feature = "test-utils")]
    pub fn printer(&self) -> SharedPrinter {
        Rc::clone(&self.printer)
    }
206
207    /// Get streaming quality metrics from the current session.
208    ///
209    /// This provides insight into the deduplication and streaming quality of the
210    /// parsing session, including:
211    /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
212    /// - Number of large deltas (potential protocol violations)
213    /// - Total deltas processed
214    ///
215    /// Useful for testing, monitoring, and debugging streaming behavior.
216    /// Only available with the `test-utils` feature.
217    ///
218    /// # Returns
219    ///
220    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
221    ///
222    /// # Example
223    ///
224    /// ```ignore
225    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
226    /// use std::rc::Rc;
227    /// use std::cell::RefCell;
228    ///
229    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
230    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
231    ///
232    /// // Parse events...
233    ///
234    /// // Verify deduplication logic triggered
235    /// let metrics = parser.streaming_metrics();
236    /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
237    /// ```
238    #[cfg(feature = "test-utils")]
239    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
240        self.streaming_session
241            .borrow()
242            .get_streaming_quality_metrics()
243    }
244
245    /// Parse and display a single Claude JSON event
246    ///
247    /// Returns `Some(formatted_output)` for valid events, or None for:
248    /// - Malformed JSON (logged at debug level)
249    /// - Unknown event types
250    /// - Empty or whitespace-only output
251    pub fn parse_event(&self, line: &str) -> Option<String> {
252        let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
253            e
254        } else {
255            // Non-JSON line - could be raw text output from agent
256            // Pass through as-is if it looks like real output (not empty)
257            let trimmed = line.trim();
258            if !trimmed.is_empty() && !trimmed.starts_with('{') {
259                return Some(format!("{trimmed}\n"));
260            }
261            return None;
262        };
263        let c = &self.colors;
264        let prefix = &self.display_name;
265
266        let output = match event {
267            ClaudeEvent::System {
268                subtype,
269                session_id,
270                cwd,
271            } => self.format_system_event(subtype.as_ref(), session_id, cwd),
272            ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
273            ClaudeEvent::User { message } => self.format_user_event(message),
274            ClaudeEvent::Result {
275                subtype,
276                duration_ms,
277                total_cost_usd,
278                num_turns,
279                result,
280                error,
281            } => self.format_result_event(
282                subtype,
283                duration_ms,
284                total_cost_usd,
285                num_turns,
286                result,
287                error,
288            ),
289            ClaudeEvent::StreamEvent { event } => {
290                // Handle streaming events for delta/partial updates
291                self.parse_stream_event(event)
292            }
293            ClaudeEvent::Unknown => {
294                // Use the generic unknown event formatter for consistent handling
295                // In verbose mode, this will show the event type and key fields
296                // In normal mode, this returns empty string
297                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
298            }
299        };
300
301        if output.is_empty() {
302            None
303        } else {
304            Some(output)
305        }
306    }
307
308    /// Parse a streaming event for delta/partial updates
309    ///
310    /// Handles the nested events within `stream_event`:
311    /// - MessageStart/Stop: Manage session state
312    /// - `ContentBlockStart`: Initialize new content blocks
313    /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
314    /// - `ContentBlockStop`: Finalize content blocks
315    /// - `MessageDelta`: Process message metadata without output
316    /// - Error: Display appropriately
317    ///
318    /// Returns String for display content, empty String for control events.
319    fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
320        let mut session = self.streaming_session.borrow_mut();
321
322        match event {
323            StreamInnerEvent::MessageStart {
324                message,
325                message_id,
326            } => {
327                // Extract message_id from either the top-level field or nested message.id
328                // The Claude API typically puts the ID in message.id, not at the top level
329                let effective_message_id =
330                    message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
331                // Set message ID for tracking and clear session state on new message
332                session.set_current_message_id(effective_message_id);
333                session.on_message_start();
334                String::new()
335            }
336            StreamInnerEvent::ContentBlockStart {
337                index: Some(index),
338                content_block: Some(block),
339            } => {
340                // Initialize a new content block at this index
341                session.on_content_block_start(index);
342                match &block {
343                    ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
344                        // Initial text in ContentBlockStart - treat as first delta
345                        session.on_text_delta(index, t);
346                    }
347                    ContentBlock::ToolUse { name, input } => {
348                        // Track tool name for GLM/CCS deduplication.
349                        // IMPORTANT: Track the tool name when provided, even when input is None.
350                        // GLM may send ContentBlockStart with name but no input, then send input via delta.
351                        // We only store when we have a name to avoid overwriting a previous tool name with None.
352                        if let Some(n) = name {
353                            session.set_tool_name(index, Some(n.clone()));
354                        }
355
356                        // Initialize tool input accumulator only if input is present
357                        if let Some(i) = input {
358                            let input_str = if let serde_json::Value::String(s) = &i {
359                                s.clone()
360                            } else {
361                                format_tool_input(i)
362                            };
363                            session.on_tool_input_delta(index, &input_str);
364                        }
365                    }
366                    _ => {}
367                }
368                String::new()
369            }
370            StreamInnerEvent::ContentBlockStart {
371                index: Some(index),
372                content_block: None,
373            } => {
374                // Content block started but no initial content provided
375                session.on_content_block_start(index);
376                String::new()
377            }
378            StreamInnerEvent::ContentBlockStart { .. } => {
379                // Content block without index - ignore
380                String::new()
381            }
382            StreamInnerEvent::ContentBlockDelta {
383                index: Some(index),
384                delta: Some(delta),
385            } => self.handle_content_block_delta(&mut session, index, delta),
386            StreamInnerEvent::TextDelta { text: Some(text) } => {
387                self.handle_text_delta(&mut session, &text)
388            }
389            StreamInnerEvent::ContentBlockStop { .. } => {
390                // Content block completion event - no output needed
391                // This event marks the end of a content block but doesn't produce
392                // any displayable content. It's a control event for state management.
393                String::new()
394            }
395            StreamInnerEvent::MessageDelta { .. } => {
396                // Message delta event with usage/metadata - no output needed
397                // This event contains final message metadata (stop_reason, usage stats)
398                // but is used for tracking/monitoring purposes only, not display.
399                String::new()
400            }
401            StreamInnerEvent::ContentBlockDelta { .. }
402            | StreamInnerEvent::Ping
403            | StreamInnerEvent::TextDelta { text: None }
404            | StreamInnerEvent::Error { error: None } => String::new(),
405            StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
406            StreamInnerEvent::Error {
407                error: Some(err), ..
408            } => self.handle_error_event(err),
409            StreamInnerEvent::Unknown => self.handle_unknown_event(),
410        }
411    }
412
413    /// Format a system event
414    fn format_system_event(
415        &self,
416        subtype: Option<&String>,
417        session_id: Option<String>,
418        cwd: Option<String>,
419    ) -> String {
420        let c = &self.colors;
421        let prefix = &self.display_name;
422
423        if subtype.map(std::string::String::as_str) == Some("init") {
424            let sid = session_id.unwrap_or_else(|| "unknown".to_string());
425            let mut out = format!(
426                "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
427                c.dim(),
428                prefix,
429                c.reset(),
430                c.cyan(),
431                c.reset(),
432                c.dim(),
433                sid,
434                c.reset()
435            );
436            if let Some(cwd) = cwd {
437                let _ = writeln!(
438                    out,
439                    "{}[{}]{} {}Working dir: {}{}",
440                    c.dim(),
441                    prefix,
442                    c.reset(),
443                    c.dim(),
444                    cwd,
445                    c.reset()
446                );
447            }
448            out
449        } else {
450            format!(
451                "{}[{}]{} {}{}{}\n",
452                c.dim(),
453                prefix,
454                c.reset(),
455                c.cyan(),
456                subtype.map_or("system", |s| s.as_str()),
457                c.reset()
458            )
459        }
460    }
461
462    /// Extract content from assistant message for hash-based deduplication.
463    ///
464    /// This includes both text and tool_use blocks, normalized for comparison.
465    /// Tool_use blocks are serialized in a deterministic way (name + sorted input JSON)
466    /// to ensure semantically identical tool calls produce the same hash.
467    ///
468    /// # Returns
469    /// A tuple of (normalized_content, tool_names_by_index) where:
470    /// - normalized_content: The concatenated normalized content (text + tool_use markers)
471    /// - tool_names_by_index: Map from content block index to tool name (for tool_use blocks)
472    fn extract_text_content_for_hash(
473        message: Option<&crate::json_parser::types::AssistantMessage>,
474    ) -> Option<(String, std::collections::HashMap<usize, String>)> {
475        message?.content.as_ref().map(|content| {
476            let mut normalized_parts = Vec::new();
477            let mut tool_names = std::collections::HashMap::new();
478
479            for (index, block) in content.iter().enumerate() {
480                match block {
481                    ContentBlock::Text { text } => {
482                        if let Some(text) = text.as_deref() {
483                            normalized_parts.push(text.to_string());
484                        }
485                    }
486                    ContentBlock::ToolUse { name, input } => {
487                        // Track tool name by index for hash-based deduplication
488                        if let Some(name_str) = name.as_deref() {
489                            tool_names.insert(index, name_str.to_string());
490                        }
491
492                        // Normalize tool_use for hash comparison:
493                        // Format: "TOOL_USE:{name}:{sorted_json_input}"
494                        // Sorting JSON keys ensures identical inputs produce same hash
495                        let normalized = format!(
496                            "TOOL_USE:{}:{}",
497                            name.as_deref().unwrap_or(""),
498                            input
499                                .as_ref()
500                                .map(|v| {
501                                    // Sort JSON keys for deterministic serialization
502                                    if v.is_object() {
503                                        serde_json::to_string(v).ok()
504                                    } else if v.is_string() {
505                                        v.as_str().map(|s| s.to_string())
506                                    } else {
507                                        serde_json::to_string(v).ok()
508                                    }
509                                    .unwrap_or_default()
510                                })
511                                .unwrap_or_default()
512                        );
513                        normalized_parts.push(normalized);
514                    }
515                    _ => {}
516                }
517            }
518
519            (normalized_parts.join(""), tool_names)
520        })
521    }
522
523    /// Check if this assistant message is a duplicate of already-streamed content.
524    fn is_duplicate_assistant_message(
525        &self,
526        message: Option<&crate::json_parser::types::AssistantMessage>,
527    ) -> bool {
528        use std::collections::hash_map::DefaultHasher;
529        use std::hash::{Hash, Hasher};
530
531        let session = self.streaming_session.borrow();
532
533        // Extract message_id from the assistant message
534        let assistant_msg_id = message.and_then(|m| m.id.as_ref());
535
536        // Check if this assistant event has a message_id that matches the current streaming message
537        // If it does, and we have streamed content, then this assistant event is a duplicate
538        // because the content was already streamed via deltas.
539        if let Some(ast_msg_id) = assistant_msg_id {
540            // Check if message was already marked as displayed (after message_stop)
541            if session.is_duplicate_final_message(ast_msg_id) {
542                return true;
543            }
544
545            // Check if the assistant message_id matches the current streaming message_id
546            if session.get_current_message_id() == Some(ast_msg_id) {
547                // Same message - check if we have streamed any content
548                // If yes, the assistant event is a duplicate
549                if session.has_any_streamed_content() {
550                    return true;
551                }
552            }
553        }
554
555        // Check if this exact assistant content has been rendered before
556        // This handles the case where GLM sends multiple assistant events during
557        // streaming with the same content but different message_ids
558        let content_for_hash = Self::extract_text_content_for_hash(message);
559        if let Some((ref text_content, _)) = content_for_hash {
560            if !text_content.is_empty() {
561                // Compute hash of the assistant event content
562                let mut hasher = DefaultHasher::new();
563                text_content.hash(&mut hasher);
564                let content_hash = hasher.finish();
565
566                // Check if this content was already rendered
567                if session.is_assistant_content_rendered(content_hash) {
568                    return true;
569                }
570            }
571        }
572
573        // If no message_id match, fall back to hash-based deduplication
574        // extract_text_content_for_hash now returns (content, tool_names_by_index)
575        if let Some((ref text_content, ref tool_names)) = content_for_hash {
576            if !text_content.is_empty() {
577                return session.is_duplicate_by_hash(text_content, Some(tool_names));
578            }
579        }
580
581        // Fallback to coarse check
582        session.has_any_streamed_content()
583    }
584
585    /// Format a text content block for assistant output.
586    fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
587        let limit = self.verbosity.truncate_limit("text");
588        let preview = truncate_text(text, limit);
589        let _ = writeln!(
590            out,
591            "{}[{}]{} {}{}{}",
592            colors.dim(),
593            prefix,
594            colors.reset(),
595            colors.white(),
596            preview,
597            colors.reset()
598        );
599    }
600
601    /// Format a tool use content block for assistant output.
602    fn format_tool_use_block(
603        &self,
604        out: &mut String,
605        tool: Option<&String>,
606        input: Option<&serde_json::Value>,
607        prefix: &str,
608        colors: Colors,
609    ) {
610        let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
611        let _ = writeln!(
612            out,
613            "{}[{}]{} {}Tool{}: {}{}{}",
614            colors.dim(),
615            prefix,
616            colors.reset(),
617            colors.magenta(),
618            colors.reset(),
619            colors.bold(),
620            tool_name,
621            colors.reset(),
622        );
623
624        // Show tool input details at Normal and above (not just Verbose)
625        // Tool inputs provide crucial context for understanding agent actions
626        if self.verbosity.show_tool_input() {
627            if let Some(input_val) = input {
628                let input_str = format_tool_input(input_val);
629                let limit = self.verbosity.truncate_limit("tool_input");
630                let preview = truncate_text(&input_str, limit);
631                if !preview.is_empty() {
632                    let _ = writeln!(
633                        out,
634                        "{}[{}]{} {}  └─ {}{}",
635                        colors.dim(),
636                        prefix,
637                        colors.reset(),
638                        colors.dim(),
639                        preview,
640                        colors.reset()
641                    );
642                }
643            }
644        }
645    }
646
647    /// Format a tool result content block for assistant output.
648    fn format_tool_result_block(
649        &self,
650        out: &mut String,
651        content: &serde_json::Value,
652        prefix: &str,
653        colors: Colors,
654    ) {
655        let content_str = match content {
656            serde_json::Value::String(s) => s.clone(),
657            other => other.to_string(),
658        };
659        let limit = self.verbosity.truncate_limit("tool_result");
660        let preview = truncate_text(&content_str, limit);
661        let _ = writeln!(
662            out,
663            "{}[{}]{} {}Result:{} {}",
664            colors.dim(),
665            prefix,
666            colors.reset(),
667            colors.dim(),
668            colors.reset(),
669            preview
670        );
671    }
672
673    /// Format all content blocks from an assistant message.
674    fn format_content_blocks(
675        &self,
676        out: &mut String,
677        content: &[ContentBlock],
678        prefix: &str,
679        colors: Colors,
680    ) {
681        for block in content {
682            match block {
683                ContentBlock::Text { text } => {
684                    if let Some(text) = text {
685                        self.format_text_block(out, text, prefix, colors);
686                    }
687                }
688                ContentBlock::ToolUse { name, input } => {
689                    self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
690                }
691                ContentBlock::ToolResult { content } => {
692                    if let Some(content) = content {
693                        self.format_tool_result_block(out, content, prefix, colors);
694                    }
695                }
696                ContentBlock::Unknown => {}
697            }
698        }
699    }
700
701    /// Format an assistant event
702    fn format_assistant_event(
703        &self,
704        message: Option<crate::json_parser::types::AssistantMessage>,
705    ) -> String {
706        use std::collections::hash_map::DefaultHasher;
707        use std::hash::{Hash, Hasher};
708
709        // CRITICAL FIX: When ANY content has been streamed via deltas,
710        // the Assistant event should NOT display it again.
711        // The Assistant event represents the "complete" message, but if we've
712        // already shown the streaming deltas, showing it again causes duplication.
713        if self.is_duplicate_assistant_message(message.as_ref()) {
714            return String::new();
715        }
716
717        let mut out = String::new();
718        if let Some(ref msg) = message {
719            if let Some(ref content) = msg.content {
720                self.format_content_blocks(&mut out, content, &self.display_name, self.colors);
721
722                // If we successfully rendered content, mark it as rendered
723                if !out.is_empty() {
724                    let mut session = self.streaming_session.borrow_mut();
725
726                    // Mark the message as pre-rendered so that ALL subsequent streaming
727                    // deltas for this message are suppressed.
728                    // This handles the case where assistant event arrives BEFORE streaming starts.
729                    if let Some(ref message_id) = msg.id {
730                        session.mark_message_pre_rendered(message_id);
731                    }
732
733                    // Mark the assistant content as rendered by hash to prevent duplicate
734                    // assistant events with the same content but different message_ids
735                    if let Some((ref text_content, _)) =
736                        Self::extract_text_content_for_hash(message.as_ref())
737                    {
738                        if !text_content.is_empty() {
739                            let mut hasher = DefaultHasher::new();
740                            text_content.hash(&mut hasher);
741                            let content_hash = hasher.finish();
742                            session.mark_assistant_content_rendered(content_hash);
743                        }
744                    }
745                }
746            }
747        }
748        out
749    }
750
751    /// Format a user event
752    fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
753        let c = &self.colors;
754        let prefix = &self.display_name;
755
756        if let Some(msg) = message {
757            if let Some(content) = msg.content {
758                if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
759                    let limit = self.verbosity.truncate_limit("user");
760                    let preview = truncate_text(text, limit);
761                    return format!(
762                        "{}[{}]{} {}User{}: {}{}{}\n",
763                        c.dim(),
764                        prefix,
765                        c.reset(),
766                        c.blue(),
767                        c.reset(),
768                        c.dim(),
769                        preview,
770                        c.reset()
771                    );
772                }
773            }
774        }
775        String::new()
776    }
777
778    /// Format a result event
779    fn format_result_event(
780        &self,
781        subtype: Option<String>,
782        duration_ms: Option<u64>,
783        total_cost_usd: Option<f64>,
784        num_turns: Option<u32>,
785        result: Option<String>,
786        error: Option<String>,
787    ) -> String {
788        let c = &self.colors;
789        let prefix = &self.display_name;
790
791        let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
792        let duration_m = duration_total_secs / 60;
793        let duration_s_rem = duration_total_secs % 60;
794        let cost = total_cost_usd.unwrap_or(0.0);
795        let turns = num_turns.unwrap_or(0);
796
797        let mut out = if subtype.as_deref() == Some("success") {
798            format!(
799                "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
800                c.dim(),
801                prefix,
802                c.reset(),
803                c.green(),
804                CHECK,
805                c.reset(),
806                c.dim(),
807                duration_m,
808                duration_s_rem,
809                turns,
810                cost,
811                c.reset()
812            )
813        } else {
814            let err = error.unwrap_or_else(|| "unknown error".to_string());
815            format!(
816                "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
817                c.dim(),
818                prefix,
819                c.reset(),
820                c.red(),
821                CROSS,
822                subtype.unwrap_or_else(|| "error".to_string()),
823                c.reset(),
824                err,
825                c.dim(),
826                duration_m,
827                duration_s_rem,
828                c.reset()
829            )
830        };
831
832        if let Some(result) = result {
833            let limit = self.verbosity.truncate_limit("result");
834            let preview = truncate_text(&result, limit);
835            let _ = writeln!(
836                out,
837                "\n{}Result summary:{}\n{}{}{}",
838                c.bold(),
839                c.reset(),
840                c.dim(),
841                preview,
842                c.reset()
843            );
844        }
845        out
846    }
847
848    /// Handle content block delta events
849    fn handle_content_block_delta(
850        &self,
851        session: &mut std::cell::RefMut<'_, StreamingSession>,
852        index: u64,
853        delta: ContentBlockDelta,
854    ) -> String {
855        let c = &self.colors;
856        let prefix = &self.display_name;
857
858        match delta {
859            ContentBlockDelta::TextDelta { text: Some(text) } => {
860                let index_str = index.to_string();
861
862                // Track this delta with StreamingSession for state management.
863                //
864                // StreamingSession handles protocol/streaming quality concerns (including
865                // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
866                // whether a prefix should be displayed for this stream.
867                //
868                // The parser layer still applies additional deduplication:
869                // - Skip whitespace-only accumulated output
870                // - Hash-based deduplication after sanitization (whitespace-insensitive)
871                let show_prefix = session.on_text_delta(index, &text);
872
873                // Get accumulated text for streaming display
874                let accumulated_text = session
875                    .get_accumulated(ContentType::Text, &index_str)
876                    .unwrap_or("");
877
878                // Check if this message was pre-rendered from an assistant event.
879                // When an assistant event arrives BEFORE streaming deltas, we render it
880                // and mark the message_id as pre-rendered. ALL subsequent streaming deltas
881                // for this message should be suppressed to prevent duplication.
882                if let Some(message_id) = session.get_current_message_id() {
883                    if session.is_message_pre_rendered(message_id) {
884                        return String::new();
885                    }
886                }
887
888                // Sanitize the accumulated text to check if it's empty
889                // This is needed to skip rendering when the accumulated content is just whitespace
890                let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
891
892                // Skip rendering if the sanitized text is empty (e.g., only whitespace)
893                // This prevents rendering empty lines when the accumulated content is just whitespace
894                if sanitized_text.is_empty() {
895                    return String::new();
896                }
897
898                // Check if this sanitized content has already been rendered
899                // This prevents duplicates when accumulated content differs only by whitespace
900                if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
901                {
902                    return String::new();
903                }
904
905                // Use TextDeltaRenderer for consistent rendering
906                let terminal_mode = *self.terminal_mode.borrow();
907
908                // Use prefix trie to detect if new content extends previously rendered content
909                // If yes, we do an in-place update (carriage return + new content)
910                let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);
911
912                let output = if show_prefix && !has_prefix {
913                    // First delta with no prefix match - use the renderer with prefix
914                    TextDeltaRenderer::render_first_delta(
915                        accumulated_text,
916                        prefix,
917                        *c,
918                        terminal_mode,
919                    )
920                } else {
921                    // Either continuation OR prefix match - use renderer for in-place update
922                    // This handles the case where "Hello" becomes "Hello World" - we REPLACE
923                    TextDeltaRenderer::render_subsequent_delta(
924                        accumulated_text,
925                        prefix,
926                        *c,
927                        terminal_mode,
928                    )
929                };
930
931                // Mark this sanitized content as rendered for future duplicate detection
932                // We use the sanitized text (not the rendered output) to avoid false positives
933                // when the same accumulated text is rendered with different terminal modes
934                session.mark_rendered(ContentType::Text, &index_str);
935                session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);
936
937                output
938            }
939            ContentBlockDelta::ThinkingDelta {
940                thinking: Some(text),
941            } => {
942                // Track thinking deltas
943                session.on_thinking_delta(index, &text);
944                // Display thinking with visual distinction
945                Self::formatter().format_thinking(text.as_str(), prefix, *c)
946            }
947            ContentBlockDelta::ToolUseDelta {
948                tool_use: Some(tool_delta),
949            } => {
950                // Track tool name for GLM/CCS deduplication (if available in delta)
951                if let Some(serde_json::Value::String(name)) = tool_delta.get("name") {
952                    session.set_tool_name(index, Some(name.to_string()));
953                }
954
955                // Handle tool input streaming
956                // Extract the tool input from the delta
957                let input_str =
958                    tool_delta
959                        .get("input")
960                        .map_or_else(String::new, |input| match input {
961                            serde_json::Value::String(s) => s.clone(),
962                            other => format_tool_input(other),
963                        });
964
965                if input_str.is_empty() {
966                    String::new()
967                } else {
968                    // Accumulate tool input
969                    session.on_tool_input_delta(index, &input_str);
970
971                    // Show partial tool input in real-time
972                    let formatter = DeltaDisplayFormatter::new();
973                    formatter.format_tool_input(&input_str, prefix, *c)
974                }
975            }
976            _ => String::new(),
977        }
978    }
979
980    /// Handle text delta events
981    fn handle_text_delta(
982        &self,
983        session: &mut std::cell::RefMut<'_, StreamingSession>,
984        text: &str,
985    ) -> String {
986        let c = &self.colors;
987        let prefix = &self.display_name;
988
989        // Standalone text delta (not part of content block)
990        // Use default index "0" for standalone text
991        let default_index = 0u64;
992        let default_index_str = "0";
993
994        // Track this delta with StreamingSession for state management.
995        //
996        // StreamingSession handles protocol/streaming quality concerns (including
997        // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
998        // whether a prefix should be displayed for this stream.
999        //
1000        // The parser layer still applies additional deduplication:
1001        // - Skip whitespace-only accumulated output
1002        // - Hash-based deduplication after sanitization (whitespace-insensitive)
1003        let show_prefix = session.on_text_delta(default_index, text);
1004
1005        // Get accumulated text for streaming display
1006        let accumulated_text = session
1007            .get_accumulated(ContentType::Text, default_index_str)
1008            .unwrap_or("");
1009
1010        // Sanitize the accumulated text to check if it's empty
1011        // This is needed to skip rendering when the accumulated content is just whitespace
1012        let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
1013
1014        // Skip rendering if the sanitized text is empty (e.g., only whitespace)
1015        // This prevents rendering empty lines when the accumulated content is just whitespace
1016        if sanitized_text.is_empty() {
1017            return String::new();
1018        }
1019
1020        // Check if this sanitized content has already been rendered
1021        // This prevents duplicates when accumulated content differs only by whitespace
1022        if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
1023            return String::new();
1024        }
1025
1026        // Use TextDeltaRenderer for consistent rendering across all parsers
1027        let terminal_mode = *self.terminal_mode.borrow();
1028
1029        // Use prefix trie to detect if new content extends previously rendered content
1030        // If yes, we do an in-place update (carriage return + new content)
1031        let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
1032
1033        let output = if show_prefix && !has_prefix {
1034            // First delta with no prefix match - use the renderer with prefix
1035            TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
1036        } else {
1037            // Either continuation OR prefix match - use renderer for in-place update
1038            // This handles the case where "Hello" becomes "Hello World" - we REPLACE
1039            TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
1040        };
1041
1042        // Mark this sanitized content as rendered for future duplicate detection
1043        // We use the sanitized text (not the rendered output) to avoid false positives
1044        // when the same accumulated text is rendered with different terminal modes
1045        session.mark_rendered(ContentType::Text, default_index_str);
1046        session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
1047
1048        output
1049    }
1050
1051    /// Handle message stop events
1052    fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
1053        let c = &self.colors;
1054
1055        // Message complete - add final newline if we were in a content block
1056        // OR if any content was streamed (handles edge cases where block state
1057        // may not have been set but content was still streamed)
1058        let metrics = session.get_streaming_quality_metrics();
1059        let was_in_block = session.on_message_stop();
1060        let had_content = session.has_any_streamed_content();
1061        if was_in_block || had_content {
1062            // Use TextDeltaRenderer for completion - adds final newline
1063            let terminal_mode = *self.terminal_mode.borrow();
1064            let completion = format!(
1065                "{}{}",
1066                c.reset(),
1067                TextDeltaRenderer::render_completion(terminal_mode)
1068            );
1069            // Show streaming quality metrics in debug mode or when flag is set
1070            let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
1071                && metrics.total_deltas > 0;
1072            if show_metrics {
1073                format!("{}\n{}", completion, metrics.format(*c))
1074            } else {
1075                completion
1076            }
1077        } else {
1078            String::new()
1079        }
1080    }
1081
1082    /// Handle error events
1083    fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
1084        let c = &self.colors;
1085        let prefix = &self.display_name;
1086
1087        let msg = err
1088            .message
1089            .unwrap_or_else(|| "Unknown streaming error".to_string());
1090        format!(
1091            "{}[{}]{} {}Error: {}{}\n",
1092            c.dim(),
1093            prefix,
1094            c.reset(),
1095            c.red(),
1096            msg,
1097            c.reset()
1098        )
1099    }
1100
1101    /// Handle unknown events
1102    fn handle_unknown_event(&self) -> String {
1103        let c = &self.colors;
1104        let prefix = &self.display_name;
1105
1106        // Unknown stream event - in debug mode, log it
1107        if self.verbosity.is_debug() {
1108            format!(
1109                "{}[{}]{} {}Unknown streaming event{}\n",
1110                c.dim(),
1111                prefix,
1112                c.reset(),
1113                c.dim(),
1114                c.reset()
1115            )
1116        } else {
1117            String::new()
1118        }
1119    }
1120
1121    /// Check if a Claude event is a control event (state management with no user output)
1122    ///
1123    /// Control events are valid JSON that represent state transitions rather than
1124    /// user-facing content. They should be tracked separately from "ignored" events
1125    /// to avoid false health warnings.
1126    const fn is_control_event(event: &ClaudeEvent) -> bool {
1127        match event {
1128            // Stream events that are control events
1129            ClaudeEvent::StreamEvent { event } => matches!(
1130                event,
1131                StreamInnerEvent::MessageStart { .. }
1132                    | StreamInnerEvent::ContentBlockStart { .. }
1133                    | StreamInnerEvent::ContentBlockStop { .. }
1134                    | StreamInnerEvent::MessageDelta { .. }
1135                    | StreamInnerEvent::MessageStop
1136                    | StreamInnerEvent::Ping
1137            ),
1138            _ => false,
1139        }
1140    }
1141
1142    /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1143    ///
1144    /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1145    /// tool input deltas) that are shown to the user in real-time. These should be
1146    /// tracked separately to avoid inflating "ignored" percentages.
1147    const fn is_partial_event(event: &ClaudeEvent) -> bool {
1148        match event {
1149            // Stream events that produce incremental content
1150            ClaudeEvent::StreamEvent { event } => matches!(
1151                event,
1152                StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1153            ),
1154            _ => false,
1155        }
1156    }
1157
1158    /// Get a shared delta display formatter
1159    const fn formatter() -> DeltaDisplayFormatter {
1160        DeltaDisplayFormatter::new()
1161    }
1162
1163    /// Parse a stream of Claude NDJSON events
1164    pub fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
1165        use super::incremental_parser::IncrementalNdjsonParser;
1166
1167        let c = &self.colors;
1168        let monitor = HealthMonitor::new("Claude");
1169        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
1170            std::fs::OpenOptions::new()
1171                .create(true)
1172                .append(true)
1173                .open(log_path)
1174                .ok()
1175                .map(std::io::BufWriter::new)
1176        });
1177
1178        // Use incremental parser for true real-time streaming
1179        // This processes JSON as soon as it's complete, not waiting for newlines
1180        let mut incremental_parser = IncrementalNdjsonParser::new();
1181        let mut byte_buffer = Vec::new();
1182
1183        // Track whether we've seen a success result event for GLM/ccs-glm compatibility
1184        // Some agents (GLM via CCS) emit both a success result and an error_during_execution
1185        // result when they exit with code 1 despite producing valid output. We suppress
1186        // the spurious error event to avoid confusing duplicate output.
1187        let mut seen_success_result = false;
1188
1189        loop {
1190            // Read available bytes
1191            byte_buffer.clear();
1192            let chunk = reader.fill_buf()?;
1193            if chunk.is_empty() {
1194                break;
1195            }
1196
1197            // Process all bytes immediately
1198            byte_buffer.extend_from_slice(chunk);
1199            let consumed = chunk.len();
1200            reader.consume(consumed);
1201
1202            // Feed bytes to incremental parser
1203            let json_events = incremental_parser.feed(&byte_buffer);
1204
1205            // Process each complete JSON event immediately
1206            for line in json_events {
1207                let trimmed = line.trim();
1208                if trimmed.is_empty() {
1209                    continue;
1210                }
1211
1212                // Check for Result events to handle GLM/ccs-glm duplicate event bug
1213                // Some agents emit both success and error_during_execution results
1214                let should_skip_result = if trimmed.starts_with('{') {
1215                    // First, check if the JSON has an 'errors' field with actual error messages.
1216                    // This is important because Claude events can have either 'error' (string)
1217                    // or 'errors' (array of strings), and we need to check both.
1218                    let has_errors_with_content =
1219                        if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
1220                            // Check for 'errors' array with at least one non-empty string
1221                            json.get("errors")
1222                                .and_then(|v| v.as_array())
1223                                .is_some_and(|arr| {
1224                                    arr.iter()
1225                                        .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
1226                                })
1227                        } else {
1228                            false
1229                        };
1230
1231                    if let Ok(ClaudeEvent::Result {
1232                        subtype,
1233                        duration_ms,
1234                        error,
1235                        ..
1236                    }) = serde_json::from_str::<ClaudeEvent>(trimmed)
1237                    {
1238                        let is_error_result = subtype.as_deref() != Some("success");
1239
1240                        // Suppress spurious GLM error events based on these characteristics:
1241                        // 1. Error event (subtype != "success")
1242                        // 2. duration_ms is 0 or very small (< 100ms, indicating synthetic event)
1243                        // 3. error field is null or empty (no actual error message)
1244                        // 4. NO 'errors' field with actual error messages (this indicates a real error)
1245                        //
1246                        // These criteria identify the spurious error_during_execution events
1247                        // that GLM emits when exiting with code 1 despite producing valid output.
1248                        //
1249                        // We DON'T suppress if there's an 'errors' array with content, because
1250                        // that indicates a real error condition that the user should see.
1251                        let is_spurious_glm_error = is_error_result
1252                            && duration_ms.unwrap_or(0) < 100
1253                            && (error.is_none() || error.as_ref().is_some_and(|e| e.is_empty()))
1254                            && !has_errors_with_content;
1255
1256                        if is_spurious_glm_error && seen_success_result {
1257                            // Error after success - suppress (original fix)
1258                            true
1259                        } else if subtype.as_deref() == Some("success") {
1260                            seen_success_result = true;
1261                            false
1262                        } else if is_spurious_glm_error {
1263                            // Spurious error BEFORE success - still suppress based on characteristics
1264                            // This handles the reverse-order case where error arrives first
1265                            true
1266                        } else {
1267                            false
1268                        }
1269                    } else {
1270                        false
1271                    }
1272                } else {
1273                    false
1274                };
1275
1276                // In debug mode, also show the raw JSON
1277                if self.verbosity.is_debug() {
1278                    eprintln!(
1279                        "{}[DEBUG]{} {}{}{}",
1280                        c.dim(),
1281                        c.reset(),
1282                        c.dim(),
1283                        &line,
1284                        c.reset()
1285                    );
1286                }
1287
1288                // Skip suppressed result events but still log them
1289                if should_skip_result {
1290                    if let Some(ref mut file) = log_writer {
1291                        writeln!(file, "{line}")?;
1292                        file.get_mut().sync_all()?;
1293                    }
1294                    monitor.record_control_event();
1295                    continue;
1296                }
1297
1298                // Parse the event once - parse_event handles malformed JSON by returning None
1299                match self.parse_event(&line) {
1300                    Some(output) => {
1301                        // Check if this is a partial/delta event (streaming content)
1302                        if trimmed.starts_with('{') {
1303                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1304                                if Self::is_partial_event(&event) {
1305                                    monitor.record_partial_event();
1306                                } else {
1307                                    monitor.record_parsed();
1308                                }
1309                            } else {
1310                                monitor.record_parsed();
1311                            }
1312                        } else {
1313                            monitor.record_parsed();
1314                        }
1315                        // Write output to printer
1316                        let mut printer = self.printer.borrow_mut();
1317                        write!(printer, "{output}")?;
1318                        printer.flush()?;
1319                    }
1320                    None => {
1321                        // Check if this was a control event (state management with no user output)
1322                        // Control events are valid JSON that return empty output but aren't "ignored"
1323                        if trimmed.starts_with('{') {
1324                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1325                                if Self::is_control_event(&event) {
1326                                    monitor.record_control_event();
1327                                } else {
1328                                    // Valid JSON but not a control event - track as unknown
1329                                    monitor.record_unknown_event();
1330                                }
1331                            } else {
1332                                // Failed to deserialize - track as parse error
1333                                monitor.record_parse_error();
1334                            }
1335                        } else {
1336                            monitor.record_ignored();
1337                        }
1338                    }
1339                }
1340
1341                // Log raw JSON to file if configured
1342                if let Some(ref mut file) = log_writer {
1343                    writeln!(file, "{line}")?;
1344                }
1345            }
1346        }
1347
1348        if let Some(ref mut file) = log_writer {
1349            file.flush()?;
1350            // Ensure data is written to disk before continuing
1351            // This prevents race conditions where extraction runs before OS commits writes
1352            let _ = file.get_mut().sync_all();
1353        }
1354        if let Some(warning) = monitor.check_and_warn(*c) {
1355            let mut printer = self.printer.borrow_mut();
1356            writeln!(printer, "{warning}")?;
1357            printer.flush()?;
1358        }
1359        Ok(())
1360    }
1361}
1362
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use crate::json_parser::printer::{SharedPrinter, TestPrinter};

    /// Smoke test: `printer()` can be called on a parser built with an injected printer.
    #[test]
    fn test_printer_method_accessible() {
        let shared: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let claude =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&shared));

        // Only accessibility is checked; the returned handle is intentionally unused.
        let _printer_ref = claude.printer();
    }

    /// Smoke test: `streaming_metrics()` can be called on a freshly built parser.
    #[test]
    fn test_streaming_metrics_method_accessible() {
        let shared: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let claude =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&shared));

        // Only accessibility is checked; the returned metrics are intentionally unused.
        let _metrics = claude.streaming_metrics();
    }
}