ralph_workflow/json_parser/
claude.rs

1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//!    creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r          (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r  (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n    (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46#[cfg(feature = "test-utils")]
47use super::health::StreamingQualityMetrics;
48use super::printer::SharedPrinter;
49use super::streaming_state::StreamingSession;
50use super::terminal::TerminalMode;
51use super::types::{
52    format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
53    ContentType, StreamInnerEvent,
54};
55
56/// Claude event parser
57///
58/// Note: This parser is designed for single-threaded use only.
59/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
60/// Do not share this parser across threads.
61pub struct ClaudeParser {
62    colors: Colors,
63    pub(crate) verbosity: Verbosity,
64    log_file: Option<String>,
65    display_name: String,
66    /// Unified streaming session tracker
67    /// Provides single source of truth for streaming state across all content types
68    streaming_session: Rc<RefCell<StreamingSession>>,
69    /// Terminal mode for output formatting
70    /// Detected at parse time and cached for performance
71    terminal_mode: RefCell<TerminalMode>,
72    /// Whether to show streaming quality metrics
73    show_streaming_metrics: bool,
74    /// Output printer for capturing or displaying output
75    printer: SharedPrinter,
76}
77
78impl ClaudeParser {
79    /// Create a new `ClaudeParser` with the given colors and verbosity.
80    ///
81    /// # Arguments
82    ///
83    /// * `colors` - Colors for terminal output
84    /// * `verbosity` - Verbosity level for output
85    ///
86    /// # Returns
87    ///
88    /// A new `ClaudeParser` instance
89    ///
90    /// # Example
91    ///
92    /// ```ignore
93    /// use ralph_workflow::json_parser::ClaudeParser;
94    /// use ralph_workflow::logger::Colors;
95    /// use ralph_workflow::config::Verbosity;
96    ///
97    /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
98    /// ```
99    pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
100        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
101    }
102
103    /// Create a new `ClaudeParser` with a custom printer.
104    ///
105    /// # Arguments
106    ///
107    /// * `colors` - Colors for terminal output
108    /// * `verbosity` - Verbosity level for output
109    /// * `printer` - Shared printer for output
110    ///
111    /// # Returns
112    ///
113    /// A new `ClaudeParser` instance
114    pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
115        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
116        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
117
118        // Use the printer's is_terminal method to validate it's connected correctly
119        // This is a sanity check that also satisfies the compiler that the method is used
120        let _printer_is_terminal = printer.borrow().is_terminal();
121
122        Self {
123            colors,
124            verbosity,
125            log_file: None,
126            display_name: "Claude".to_string(),
127            streaming_session: Rc::new(RefCell::new(streaming_session)),
128            terminal_mode: RefCell::new(TerminalMode::detect()),
129            show_streaming_metrics: false,
130            printer,
131        }
132    }
133
134    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
135        self.show_streaming_metrics = show;
136        self
137    }
138
139    /// Set the display name for this parser.
140    ///
141    /// # Arguments
142    ///
143    /// * `display_name` - The name to display in output
144    ///
145    /// # Returns
146    ///
147    /// Self for builder pattern chaining
148    pub fn with_display_name(mut self, display_name: &str) -> Self {
149        self.display_name = display_name.to_string();
150        self
151    }
152
153    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
154        self.log_file = Some(path.to_string());
155        self
156    }
157
158    /// Set the terminal mode for this parser.
159    ///
160    /// # Arguments
161    ///
162    /// * `mode` - The terminal mode to use
163    ///
164    /// # Returns
165    ///
166    /// Self for builder pattern chaining
167    #[cfg(any(test, feature = "test-utils"))]
168    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
169        *self.terminal_mode.borrow_mut() = mode;
170        self
171    }
172
173    /// Get a shared reference to the printer.
174    ///
175    /// This allows tests, monitoring, and other code to access the printer after parsing
176    /// to verify output content, check for duplicates, or capture output for analysis.
177    ///
178    /// # Returns
179    ///
180    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
181    ///
182    /// # Example
183    ///
184    /// ```ignore
185    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
186    /// use std::rc::Rc;
187    /// use std::cell::RefCell;
188    ///
189    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
190    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
191    ///
192    /// // Parse events...
193    ///
194    /// // Now access the printer to verify output
195    /// let printer_ref = parser.printer().borrow();
196    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
197    /// ```
198    /// Get a clone of the printer used by this parser.
199    ///
200    /// This is primarily useful for testing and monitoring.
201    /// Only available with the `test-utils` feature.
202    #[cfg(feature = "test-utils")]
203    pub fn printer(&self) -> SharedPrinter {
204        Rc::clone(&self.printer)
205    }
206
207    /// Get streaming quality metrics from the current session.
208    ///
209    /// This provides insight into the deduplication and streaming quality of the
210    /// parsing session, including:
211    /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
212    /// - Number of large deltas (potential protocol violations)
213    /// - Total deltas processed
214    ///
215    /// Useful for testing, monitoring, and debugging streaming behavior.
216    /// Only available with the `test-utils` feature.
217    ///
218    /// # Returns
219    ///
220    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
221    ///
222    /// # Example
223    ///
224    /// ```ignore
225    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
226    /// use std::rc::Rc;
227    /// use std::cell::RefCell;
228    ///
229    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
230    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
231    ///
232    /// // Parse events...
233    ///
234    /// // Verify deduplication logic triggered
235    /// let metrics = parser.streaming_metrics();
236    /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
237    /// ```
238    #[cfg(feature = "test-utils")]
239    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
240        self.streaming_session
241            .borrow()
242            .get_streaming_quality_metrics()
243    }
244
245    /// Parse and display a single Claude JSON event
246    ///
247    /// Returns `Some(formatted_output)` for valid events, or None for:
248    /// - Malformed JSON (logged at debug level)
249    /// - Unknown event types
250    /// - Empty or whitespace-only output
251    pub fn parse_event(&self, line: &str) -> Option<String> {
252        let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
253            e
254        } else {
255            // Non-JSON line - could be raw text output from agent
256            // Pass through as-is if it looks like real output (not empty)
257            let trimmed = line.trim();
258            if !trimmed.is_empty() && !trimmed.starts_with('{') {
259                return Some(format!("{trimmed}\n"));
260            }
261            return None;
262        };
263        let c = &self.colors;
264        let prefix = &self.display_name;
265
266        let output = match event {
267            ClaudeEvent::System {
268                subtype,
269                session_id,
270                cwd,
271            } => self.format_system_event(subtype.as_ref(), session_id, cwd),
272            ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
273            ClaudeEvent::User { message } => self.format_user_event(message),
274            ClaudeEvent::Result {
275                subtype,
276                duration_ms,
277                total_cost_usd,
278                num_turns,
279                result,
280                error,
281            } => self.format_result_event(
282                subtype,
283                duration_ms,
284                total_cost_usd,
285                num_turns,
286                result,
287                error,
288            ),
289            ClaudeEvent::StreamEvent { event } => {
290                // Handle streaming events for delta/partial updates
291                self.parse_stream_event(event)
292            }
293            ClaudeEvent::Unknown => {
294                // Use the generic unknown event formatter for consistent handling
295                // In verbose mode, this will show the event type and key fields
296                // In normal mode, this returns empty string
297                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
298            }
299        };
300
301        if output.is_empty() {
302            None
303        } else {
304            Some(output)
305        }
306    }
307
308    /// Parse a streaming event for delta/partial updates
309    ///
310    /// Handles the nested events within `stream_event`:
311    /// - MessageStart/Stop: Manage session state
312    /// - `ContentBlockStart`: Initialize new content blocks
313    /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
314    /// - `ContentBlockStop`: Finalize content blocks
315    /// - `MessageDelta`: Process message metadata without output
316    /// - Error: Display appropriately
317    ///
318    /// Returns String for display content, empty String for control events.
319    fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
320        let mut session = self.streaming_session.borrow_mut();
321
322        match event {
323            StreamInnerEvent::MessageStart {
324                message,
325                message_id,
326            } => {
327                // Extract message_id from either the top-level field or nested message.id
328                // The Claude API typically puts the ID in message.id, not at the top level
329                let effective_message_id =
330                    message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
331                // Set message ID for tracking and clear session state on new message
332                session.set_current_message_id(effective_message_id);
333                session.on_message_start();
334                String::new()
335            }
336            StreamInnerEvent::ContentBlockStart {
337                index: Some(index),
338                content_block: Some(block),
339            } => {
340                // Initialize a new content block at this index
341                session.on_content_block_start(index);
342                match &block {
343                    ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
344                        // Initial text in ContentBlockStart - treat as first delta
345                        session.on_text_delta(index, t);
346                    }
347                    ContentBlock::ToolUse {
348                        name: _,
349                        input: Some(i),
350                    } => {
351                        // Initialize tool input accumulator
352                        let input_str = if let serde_json::Value::String(s) = &i {
353                            s.clone()
354                        } else {
355                            format_tool_input(i)
356                        };
357                        session.on_tool_input_delta(index, &input_str);
358                    }
359                    _ => {}
360                }
361                String::new()
362            }
363            StreamInnerEvent::ContentBlockStart {
364                index: Some(index),
365                content_block: None,
366            } => {
367                // Content block started but no initial content provided
368                session.on_content_block_start(index);
369                String::new()
370            }
371            StreamInnerEvent::ContentBlockStart { .. } => {
372                // Content block without index - ignore
373                String::new()
374            }
375            StreamInnerEvent::ContentBlockDelta {
376                index: Some(index),
377                delta: Some(delta),
378            } => self.handle_content_block_delta(&mut session, index, delta),
379            StreamInnerEvent::TextDelta { text: Some(text) } => {
380                self.handle_text_delta(&mut session, &text)
381            }
382            StreamInnerEvent::ContentBlockStop { .. } => {
383                // Content block completion event - no output needed
384                // This event marks the end of a content block but doesn't produce
385                // any displayable content. It's a control event for state management.
386                String::new()
387            }
388            StreamInnerEvent::MessageDelta { .. } => {
389                // Message delta event with usage/metadata - no output needed
390                // This event contains final message metadata (stop_reason, usage stats)
391                // but is used for tracking/monitoring purposes only, not display.
392                String::new()
393            }
394            StreamInnerEvent::ContentBlockDelta { .. }
395            | StreamInnerEvent::Ping
396            | StreamInnerEvent::TextDelta { text: None }
397            | StreamInnerEvent::Error { error: None } => String::new(),
398            StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
399            StreamInnerEvent::Error {
400                error: Some(err), ..
401            } => self.handle_error_event(err),
402            StreamInnerEvent::Unknown => self.handle_unknown_event(),
403        }
404    }
405
406    /// Format a system event
407    fn format_system_event(
408        &self,
409        subtype: Option<&String>,
410        session_id: Option<String>,
411        cwd: Option<String>,
412    ) -> String {
413        let c = &self.colors;
414        let prefix = &self.display_name;
415
416        if subtype.map(std::string::String::as_str) == Some("init") {
417            let sid = session_id.unwrap_or_else(|| "unknown".to_string());
418            let mut out = format!(
419                "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
420                c.dim(),
421                prefix,
422                c.reset(),
423                c.cyan(),
424                c.reset(),
425                c.dim(),
426                sid,
427                c.reset()
428            );
429            if let Some(cwd) = cwd {
430                let _ = writeln!(
431                    out,
432                    "{}[{}]{} {}Working dir: {}{}",
433                    c.dim(),
434                    prefix,
435                    c.reset(),
436                    c.dim(),
437                    cwd,
438                    c.reset()
439                );
440            }
441            out
442        } else {
443            format!(
444                "{}[{}]{} {}{}{}\n",
445                c.dim(),
446                prefix,
447                c.reset(),
448                c.cyan(),
449                subtype.map_or("system", |s| s.as_str()),
450                c.reset()
451            )
452        }
453    }
454
455    /// Extract text content from a message for hash-based deduplication.
456    fn extract_text_content_for_hash(
457        message: Option<&crate::json_parser::types::AssistantMessage>,
458    ) -> Option<String> {
459        message?.content.as_ref().map(|content| {
460            content
461                .iter()
462                .filter_map(|block| {
463                    if let ContentBlock::Text { text } = block {
464                        text.as_deref()
465                    } else {
466                        None
467                    }
468                })
469                .collect::<Vec<_>>()
470                .join("")
471        })
472    }
473
474    /// Check if this assistant message is a duplicate of already-streamed content.
475    fn is_duplicate_assistant_message(
476        &self,
477        message: Option<&crate::json_parser::types::AssistantMessage>,
478    ) -> bool {
479        let session = self.streaming_session.borrow();
480
481        // Extract message_id from the assistant message
482        let assistant_msg_id = message.and_then(|m| m.id.as_ref());
483
484        // Check if this assistant event has a message_id that matches the current streaming message
485        // If it does, and we have streamed content, then this assistant event is a duplicate
486        // because the content was already streamed via deltas.
487        if let Some(ast_msg_id) = assistant_msg_id {
488            // Check if message was already marked as displayed (after message_stop)
489            if session.is_duplicate_final_message(ast_msg_id) {
490                return true;
491            }
492
493            // Check if the assistant message_id matches the current streaming message_id
494            if session.get_current_message_id() == Some(ast_msg_id) {
495                // Same message - check if we have streamed any content
496                // If yes, the assistant event is a duplicate
497                if session.has_any_streamed_content() {
498                    return true;
499                }
500            }
501        }
502
503        // If no message_id match, fall back to hash-based deduplication
504        let text_content_for_hash = Self::extract_text_content_for_hash(message);
505        if let Some(ref text_content) = text_content_for_hash {
506            if !text_content.is_empty() {
507                return session.is_duplicate_by_hash(text_content);
508            }
509        }
510
511        // Fallback to coarse check
512        session.has_any_streamed_content()
513    }
514
515    /// Format a text content block for assistant output.
516    fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
517        let limit = self.verbosity.truncate_limit("text");
518        let preview = truncate_text(text, limit);
519        let _ = writeln!(
520            out,
521            "{}[{}]{} {}{}{}",
522            colors.dim(),
523            prefix,
524            colors.reset(),
525            colors.white(),
526            preview,
527            colors.reset()
528        );
529    }
530
531    /// Format a tool use content block for assistant output.
532    fn format_tool_use_block(
533        &self,
534        out: &mut String,
535        tool: Option<&String>,
536        input: Option<&serde_json::Value>,
537        prefix: &str,
538        colors: Colors,
539    ) {
540        let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
541        let _ = writeln!(
542            out,
543            "{}[{}]{} {}Tool{}: {}{}{}",
544            colors.dim(),
545            prefix,
546            colors.reset(),
547            colors.magenta(),
548            colors.reset(),
549            colors.bold(),
550            tool_name,
551            colors.reset(),
552        );
553
554        // Show tool input details at Normal and above (not just Verbose)
555        // Tool inputs provide crucial context for understanding agent actions
556        if self.verbosity.show_tool_input() {
557            if let Some(input_val) = input {
558                let input_str = format_tool_input(input_val);
559                let limit = self.verbosity.truncate_limit("tool_input");
560                let preview = truncate_text(&input_str, limit);
561                if !preview.is_empty() {
562                    let _ = writeln!(
563                        out,
564                        "{}[{}]{} {}  └─ {}{}",
565                        colors.dim(),
566                        prefix,
567                        colors.reset(),
568                        colors.dim(),
569                        preview,
570                        colors.reset()
571                    );
572                }
573            }
574        }
575    }
576
577    /// Format a tool result content block for assistant output.
578    fn format_tool_result_block(
579        &self,
580        out: &mut String,
581        content: &serde_json::Value,
582        prefix: &str,
583        colors: Colors,
584    ) {
585        let content_str = match content {
586            serde_json::Value::String(s) => s.clone(),
587            other => other.to_string(),
588        };
589        let limit = self.verbosity.truncate_limit("tool_result");
590        let preview = truncate_text(&content_str, limit);
591        let _ = writeln!(
592            out,
593            "{}[{}]{} {}Result:{} {}",
594            colors.dim(),
595            prefix,
596            colors.reset(),
597            colors.dim(),
598            colors.reset(),
599            preview
600        );
601    }
602
603    /// Format all content blocks from an assistant message.
604    fn format_content_blocks(
605        &self,
606        out: &mut String,
607        content: &[ContentBlock],
608        prefix: &str,
609        colors: Colors,
610    ) {
611        for block in content {
612            match block {
613                ContentBlock::Text { text } => {
614                    if let Some(text) = text {
615                        self.format_text_block(out, text, prefix, colors);
616                    }
617                }
618                ContentBlock::ToolUse { name, input } => {
619                    self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
620                }
621                ContentBlock::ToolResult { content } => {
622                    if let Some(content) = content {
623                        self.format_tool_result_block(out, content, prefix, colors);
624                    }
625                }
626                ContentBlock::Unknown => {}
627            }
628        }
629    }
630
631    /// Format an assistant event
632    fn format_assistant_event(
633        &self,
634        message: Option<crate::json_parser::types::AssistantMessage>,
635    ) -> String {
636        // CRITICAL FIX: When ANY content has been streamed via deltas,
637        // the Assistant event should NOT display it again.
638        // The Assistant event represents the "complete" message, but if we've
639        // already shown the streaming deltas, showing it again causes duplication.
640        if self.is_duplicate_assistant_message(message.as_ref()) {
641            return String::new();
642        }
643
644        let mut out = String::new();
645        if let Some(ref msg) = message {
646            if let Some(ref content) = msg.content {
647                self.format_content_blocks(&mut out, content, &self.display_name, self.colors);
648
649                // If we successfully rendered content, mark the message as pre-rendered
650                // so that ALL subsequent streaming deltas for this message are suppressed.
651                // This handles the case where assistant event arrives BEFORE streaming starts.
652                if !out.is_empty() {
653                    if let Some(ref message_id) = msg.id {
654                        let mut session = self.streaming_session.borrow_mut();
655                        session.mark_message_pre_rendered(message_id);
656                    }
657                }
658            }
659        }
660        out
661    }
662
663    /// Format a user event
664    fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
665        let c = &self.colors;
666        let prefix = &self.display_name;
667
668        if let Some(msg) = message {
669            if let Some(content) = msg.content {
670                if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
671                    let limit = self.verbosity.truncate_limit("user");
672                    let preview = truncate_text(text, limit);
673                    return format!(
674                        "{}[{}]{} {}User{}: {}{}{}\n",
675                        c.dim(),
676                        prefix,
677                        c.reset(),
678                        c.blue(),
679                        c.reset(),
680                        c.dim(),
681                        preview,
682                        c.reset()
683                    );
684                }
685            }
686        }
687        String::new()
688    }
689
690    /// Format a result event
691    fn format_result_event(
692        &self,
693        subtype: Option<String>,
694        duration_ms: Option<u64>,
695        total_cost_usd: Option<f64>,
696        num_turns: Option<u32>,
697        result: Option<String>,
698        error: Option<String>,
699    ) -> String {
700        let c = &self.colors;
701        let prefix = &self.display_name;
702
703        let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
704        let duration_m = duration_total_secs / 60;
705        let duration_s_rem = duration_total_secs % 60;
706        let cost = total_cost_usd.unwrap_or(0.0);
707        let turns = num_turns.unwrap_or(0);
708
709        let mut out = if subtype.as_deref() == Some("success") {
710            format!(
711                "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
712                c.dim(),
713                prefix,
714                c.reset(),
715                c.green(),
716                CHECK,
717                c.reset(),
718                c.dim(),
719                duration_m,
720                duration_s_rem,
721                turns,
722                cost,
723                c.reset()
724            )
725        } else {
726            let err = error.unwrap_or_else(|| "unknown error".to_string());
727            format!(
728                "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
729                c.dim(),
730                prefix,
731                c.reset(),
732                c.red(),
733                CROSS,
734                subtype.unwrap_or_else(|| "error".to_string()),
735                c.reset(),
736                err,
737                c.dim(),
738                duration_m,
739                duration_s_rem,
740                c.reset()
741            )
742        };
743
744        if let Some(result) = result {
745            let limit = self.verbosity.truncate_limit("result");
746            let preview = truncate_text(&result, limit);
747            let _ = writeln!(
748                out,
749                "\n{}Result summary:{}\n{}{}{}",
750                c.bold(),
751                c.reset(),
752                c.dim(),
753                preview,
754                c.reset()
755            );
756        }
757        out
758    }
759
760    /// Handle content block delta events
761    fn handle_content_block_delta(
762        &self,
763        session: &mut std::cell::RefMut<'_, StreamingSession>,
764        index: u64,
765        delta: ContentBlockDelta,
766    ) -> String {
767        let c = &self.colors;
768        let prefix = &self.display_name;
769
770        match delta {
771            ContentBlockDelta::TextDelta { text: Some(text) } => {
772                let index_str = index.to_string();
773
774                // Track this delta with StreamingSession for state management.
775                //
776                // StreamingSession handles protocol/streaming quality concerns (including
777                // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
778                // whether a prefix should be displayed for this stream.
779                //
780                // The parser layer still applies additional deduplication:
781                // - Skip whitespace-only accumulated output
782                // - Hash-based deduplication after sanitization (whitespace-insensitive)
783                let show_prefix = session.on_text_delta(index, &text);
784
785                // Get accumulated text for streaming display
786                let accumulated_text = session
787                    .get_accumulated(ContentType::Text, &index_str)
788                    .unwrap_or("");
789
790                // Check if this message was pre-rendered from an assistant event.
791                // When an assistant event arrives BEFORE streaming deltas, we render it
792                // and mark the message_id as pre-rendered. ALL subsequent streaming deltas
793                // for this message should be suppressed to prevent duplication.
794                if let Some(message_id) = session.get_current_message_id() {
795                    if session.is_message_pre_rendered(message_id) {
796                        return String::new();
797                    }
798                }
799
800                // Sanitize the accumulated text to check if it's empty
801                // This is needed to skip rendering when the accumulated content is just whitespace
802                let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
803
804                // Skip rendering if the sanitized text is empty (e.g., only whitespace)
805                // This prevents rendering empty lines when the accumulated content is just whitespace
806                if sanitized_text.is_empty() {
807                    return String::new();
808                }
809
810                // Check if this sanitized content has already been rendered
811                // This prevents duplicates when accumulated content differs only by whitespace
812                if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
813                {
814                    return String::new();
815                }
816
817                // Use TextDeltaRenderer for consistent rendering
818                let terminal_mode = *self.terminal_mode.borrow();
819
820                // Use prefix trie to detect if new content extends previously rendered content
821                // If yes, we do an in-place update (carriage return + new content)
822                let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);
823
824                let output = if show_prefix && !has_prefix {
825                    // First delta with no prefix match - use the renderer with prefix
826                    TextDeltaRenderer::render_first_delta(
827                        accumulated_text,
828                        prefix,
829                        *c,
830                        terminal_mode,
831                    )
832                } else {
833                    // Either continuation OR prefix match - use renderer for in-place update
834                    // This handles the case where "Hello" becomes "Hello World" - we REPLACE
835                    TextDeltaRenderer::render_subsequent_delta(
836                        accumulated_text,
837                        prefix,
838                        *c,
839                        terminal_mode,
840                    )
841                };
842
843                // Mark this sanitized content as rendered for future duplicate detection
844                // We use the sanitized text (not the rendered output) to avoid false positives
845                // when the same accumulated text is rendered with different terminal modes
846                session.mark_rendered(ContentType::Text, &index_str);
847                session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);
848
849                output
850            }
851            ContentBlockDelta::ThinkingDelta {
852                thinking: Some(text),
853            } => {
854                // Track thinking deltas
855                session.on_thinking_delta(index, &text);
856                // Display thinking with visual distinction
857                Self::formatter().format_thinking(text.as_str(), prefix, *c)
858            }
859            ContentBlockDelta::ToolUseDelta {
860                tool_use: Some(tool_delta),
861            } => {
862                // Handle tool input streaming
863                // Extract the tool input from the delta
864                let input_str =
865                    tool_delta
866                        .get("input")
867                        .map_or_else(String::new, |input| match input {
868                            serde_json::Value::String(s) => s.clone(),
869                            other => format_tool_input(other),
870                        });
871
872                if input_str.is_empty() {
873                    String::new()
874                } else {
875                    // Accumulate tool input
876                    session.on_tool_input_delta(index, &input_str);
877
878                    // Show partial tool input in real-time
879                    let formatter = DeltaDisplayFormatter::new();
880                    formatter.format_tool_input(&input_str, prefix, *c)
881                }
882            }
883            _ => String::new(),
884        }
885    }
886
887    /// Handle text delta events
888    fn handle_text_delta(
889        &self,
890        session: &mut std::cell::RefMut<'_, StreamingSession>,
891        text: &str,
892    ) -> String {
893        let c = &self.colors;
894        let prefix = &self.display_name;
895
896        // Standalone text delta (not part of content block)
897        // Use default index "0" for standalone text
898        let default_index = 0u64;
899        let default_index_str = "0";
900
901        // Track this delta with StreamingSession for state management.
902        //
903        // StreamingSession handles protocol/streaming quality concerns (including
904        // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
905        // whether a prefix should be displayed for this stream.
906        //
907        // The parser layer still applies additional deduplication:
908        // - Skip whitespace-only accumulated output
909        // - Hash-based deduplication after sanitization (whitespace-insensitive)
910        let show_prefix = session.on_text_delta(default_index, text);
911
912        // Get accumulated text for streaming display
913        let accumulated_text = session
914            .get_accumulated(ContentType::Text, default_index_str)
915            .unwrap_or("");
916
917        // Sanitize the accumulated text to check if it's empty
918        // This is needed to skip rendering when the accumulated content is just whitespace
919        let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
920
921        // Skip rendering if the sanitized text is empty (e.g., only whitespace)
922        // This prevents rendering empty lines when the accumulated content is just whitespace
923        if sanitized_text.is_empty() {
924            return String::new();
925        }
926
927        // Check if this sanitized content has already been rendered
928        // This prevents duplicates when accumulated content differs only by whitespace
929        if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
930            return String::new();
931        }
932
933        // Use TextDeltaRenderer for consistent rendering across all parsers
934        let terminal_mode = *self.terminal_mode.borrow();
935
936        // Use prefix trie to detect if new content extends previously rendered content
937        // If yes, we do an in-place update (carriage return + new content)
938        let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
939
940        let output = if show_prefix && !has_prefix {
941            // First delta with no prefix match - use the renderer with prefix
942            TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
943        } else {
944            // Either continuation OR prefix match - use renderer for in-place update
945            // This handles the case where "Hello" becomes "Hello World" - we REPLACE
946            TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
947        };
948
949        // Mark this sanitized content as rendered for future duplicate detection
950        // We use the sanitized text (not the rendered output) to avoid false positives
951        // when the same accumulated text is rendered with different terminal modes
952        session.mark_rendered(ContentType::Text, default_index_str);
953        session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
954
955        output
956    }
957
958    /// Handle message stop events
959    fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
960        let c = &self.colors;
961
962        // Message complete - add final newline if we were in a content block
963        // OR if any content was streamed (handles edge cases where block state
964        // may not have been set but content was still streamed)
965        let metrics = session.get_streaming_quality_metrics();
966        let was_in_block = session.on_message_stop();
967        let had_content = session.has_any_streamed_content();
968        if was_in_block || had_content {
969            // Use TextDeltaRenderer for completion - adds final newline
970            let terminal_mode = *self.terminal_mode.borrow();
971            let completion = format!(
972                "{}{}",
973                c.reset(),
974                TextDeltaRenderer::render_completion(terminal_mode)
975            );
976            // Show streaming quality metrics in debug mode or when flag is set
977            let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
978                && metrics.total_deltas > 0;
979            if show_metrics {
980                format!("{}\n{}", completion, metrics.format(*c))
981            } else {
982                completion
983            }
984        } else {
985            String::new()
986        }
987    }
988
989    /// Handle error events
990    fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
991        let c = &self.colors;
992        let prefix = &self.display_name;
993
994        let msg = err
995            .message
996            .unwrap_or_else(|| "Unknown streaming error".to_string());
997        format!(
998            "{}[{}]{} {}Error: {}{}\n",
999            c.dim(),
1000            prefix,
1001            c.reset(),
1002            c.red(),
1003            msg,
1004            c.reset()
1005        )
1006    }
1007
1008    /// Handle unknown events
1009    fn handle_unknown_event(&self) -> String {
1010        let c = &self.colors;
1011        let prefix = &self.display_name;
1012
1013        // Unknown stream event - in debug mode, log it
1014        if self.verbosity.is_debug() {
1015            format!(
1016                "{}[{}]{} {}Unknown streaming event{}\n",
1017                c.dim(),
1018                prefix,
1019                c.reset(),
1020                c.dim(),
1021                c.reset()
1022            )
1023        } else {
1024            String::new()
1025        }
1026    }
1027
1028    /// Check if a Claude event is a control event (state management with no user output)
1029    ///
1030    /// Control events are valid JSON that represent state transitions rather than
1031    /// user-facing content. They should be tracked separately from "ignored" events
1032    /// to avoid false health warnings.
1033    const fn is_control_event(event: &ClaudeEvent) -> bool {
1034        match event {
1035            // Stream events that are control events
1036            ClaudeEvent::StreamEvent { event } => matches!(
1037                event,
1038                StreamInnerEvent::MessageStart { .. }
1039                    | StreamInnerEvent::ContentBlockStart { .. }
1040                    | StreamInnerEvent::ContentBlockStop { .. }
1041                    | StreamInnerEvent::MessageDelta { .. }
1042                    | StreamInnerEvent::MessageStop
1043                    | StreamInnerEvent::Ping
1044            ),
1045            _ => false,
1046        }
1047    }
1048
1049    /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1050    ///
1051    /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1052    /// tool input deltas) that are shown to the user in real-time. These should be
1053    /// tracked separately to avoid inflating "ignored" percentages.
1054    const fn is_partial_event(event: &ClaudeEvent) -> bool {
1055        match event {
1056            // Stream events that produce incremental content
1057            ClaudeEvent::StreamEvent { event } => matches!(
1058                event,
1059                StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1060            ),
1061            _ => false,
1062        }
1063    }
1064
1065    /// Get a shared delta display formatter
1066    const fn formatter() -> DeltaDisplayFormatter {
1067        DeltaDisplayFormatter::new()
1068    }
1069
1070    /// Parse a stream of Claude NDJSON events
1071    pub fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
1072        use super::incremental_parser::IncrementalNdjsonParser;
1073
1074        let c = &self.colors;
1075        let monitor = HealthMonitor::new("Claude");
1076        let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
1077            std::fs::OpenOptions::new()
1078                .create(true)
1079                .append(true)
1080                .open(log_path)
1081                .ok()
1082                .map(std::io::BufWriter::new)
1083        });
1084
1085        // Use incremental parser for true real-time streaming
1086        // This processes JSON as soon as it's complete, not waiting for newlines
1087        let mut incremental_parser = IncrementalNdjsonParser::new();
1088        let mut byte_buffer = Vec::new();
1089
1090        // Track whether we've seen a success result event for GLM/ccs-glm compatibility
1091        // Some agents (GLM via CCS) emit both a success result and an error_during_execution
1092        // result when they exit with code 1 despite producing valid output. We suppress
1093        // the spurious error event to avoid confusing duplicate output.
1094        let mut seen_success_result = false;
1095
1096        loop {
1097            // Read available bytes
1098            byte_buffer.clear();
1099            let chunk = reader.fill_buf()?;
1100            if chunk.is_empty() {
1101                break;
1102            }
1103
1104            // Process all bytes immediately
1105            byte_buffer.extend_from_slice(chunk);
1106            let consumed = chunk.len();
1107            reader.consume(consumed);
1108
1109            // Feed bytes to incremental parser
1110            let json_events = incremental_parser.feed(&byte_buffer);
1111
1112            // Process each complete JSON event immediately
1113            for line in json_events {
1114                let trimmed = line.trim();
1115                if trimmed.is_empty() {
1116                    continue;
1117                }
1118
1119                // Check for Result events to handle GLM/ccs-glm duplicate event bug
1120                // Some agents emit both success and error_during_execution results
1121                let should_skip_result = if trimmed.starts_with('{') {
1122                    // First, check if the JSON has an 'errors' field with actual error messages.
1123                    // This is important because Claude events can have either 'error' (string)
1124                    // or 'errors' (array of strings), and we need to check both.
1125                    let has_errors_with_content =
1126                        if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
1127                            // Check for 'errors' array with at least one non-empty string
1128                            json.get("errors")
1129                                .and_then(|v| v.as_array())
1130                                .is_some_and(|arr| {
1131                                    arr.iter()
1132                                        .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
1133                                })
1134                        } else {
1135                            false
1136                        };
1137
1138                    if let Ok(ClaudeEvent::Result {
1139                        subtype,
1140                        duration_ms,
1141                        error,
1142                        ..
1143                    }) = serde_json::from_str::<ClaudeEvent>(trimmed)
1144                    {
1145                        let is_error_result = subtype.as_deref() != Some("success");
1146
1147                        // Suppress spurious GLM error events based on these characteristics:
1148                        // 1. Error event (subtype != "success")
1149                        // 2. duration_ms is 0 or very small (< 100ms, indicating synthetic event)
1150                        // 3. error field is null or empty (no actual error message)
1151                        // 4. NO 'errors' field with actual error messages (this indicates a real error)
1152                        //
1153                        // These criteria identify the spurious error_during_execution events
1154                        // that GLM emits when exiting with code 1 despite producing valid output.
1155                        //
1156                        // We DON'T suppress if there's an 'errors' array with content, because
1157                        // that indicates a real error condition that the user should see.
1158                        let is_spurious_glm_error = is_error_result
1159                            && duration_ms.unwrap_or(0) < 100
1160                            && (error.is_none() || error.as_ref().is_some_and(|e| e.is_empty()))
1161                            && !has_errors_with_content;
1162
1163                        if is_spurious_glm_error && seen_success_result {
1164                            // Error after success - suppress (original fix)
1165                            true
1166                        } else if subtype.as_deref() == Some("success") {
1167                            seen_success_result = true;
1168                            false
1169                        } else if is_spurious_glm_error {
1170                            // Spurious error BEFORE success - still suppress based on characteristics
1171                            // This handles the reverse-order case where error arrives first
1172                            true
1173                        } else {
1174                            false
1175                        }
1176                    } else {
1177                        false
1178                    }
1179                } else {
1180                    false
1181                };
1182
1183                // In debug mode, also show the raw JSON
1184                if self.verbosity.is_debug() {
1185                    eprintln!(
1186                        "{}[DEBUG]{} {}{}{}",
1187                        c.dim(),
1188                        c.reset(),
1189                        c.dim(),
1190                        &line,
1191                        c.reset()
1192                    );
1193                }
1194
1195                // Skip suppressed result events but still log them
1196                if should_skip_result {
1197                    if let Some(ref mut file) = log_writer {
1198                        writeln!(file, "{line}")?;
1199                        file.get_mut().sync_all()?;
1200                    }
1201                    monitor.record_control_event();
1202                    continue;
1203                }
1204
1205                // Parse the event once - parse_event handles malformed JSON by returning None
1206                match self.parse_event(&line) {
1207                    Some(output) => {
1208                        // Check if this is a partial/delta event (streaming content)
1209                        if trimmed.starts_with('{') {
1210                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1211                                if Self::is_partial_event(&event) {
1212                                    monitor.record_partial_event();
1213                                } else {
1214                                    monitor.record_parsed();
1215                                }
1216                            } else {
1217                                monitor.record_parsed();
1218                            }
1219                        } else {
1220                            monitor.record_parsed();
1221                        }
1222                        // Write output to printer
1223                        let mut printer = self.printer.borrow_mut();
1224                        write!(printer, "{output}")?;
1225                        printer.flush()?;
1226                    }
1227                    None => {
1228                        // Check if this was a control event (state management with no user output)
1229                        // Control events are valid JSON that return empty output but aren't "ignored"
1230                        if trimmed.starts_with('{') {
1231                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1232                                if Self::is_control_event(&event) {
1233                                    monitor.record_control_event();
1234                                } else {
1235                                    // Valid JSON but not a control event - track as unknown
1236                                    monitor.record_unknown_event();
1237                                }
1238                            } else {
1239                                // Failed to deserialize - track as parse error
1240                                monitor.record_parse_error();
1241                            }
1242                        } else {
1243                            monitor.record_ignored();
1244                        }
1245                    }
1246                }
1247
1248                // Log raw JSON to file if configured
1249                if let Some(ref mut file) = log_writer {
1250                    writeln!(file, "{line}")?;
1251                }
1252            }
1253        }
1254
1255        if let Some(ref mut file) = log_writer {
1256            file.flush()?;
1257            // Ensure data is written to disk before continuing
1258            // This prevents race conditions where extraction runs before OS commits writes
1259            let _ = file.get_mut().sync_all();
1260        }
1261        if let Some(warning) = monitor.check_and_warn(*c) {
1262            let mut printer = self.printer.borrow_mut();
1263            writeln!(printer, "{warning}")?;
1264            printer.flush()?;
1265        }
1266        Ok(())
1267    }
1268}
1269
1270#[cfg(all(test, feature = "test-utils"))]
1271mod tests {
1272    use super::*;
1273    use crate::json_parser::printer::{SharedPrinter, TestPrinter};
1274
1275    #[test]
1276    fn test_printer_method_accessible() {
1277        // Test that the printer() method is accessible and returns a SharedPrinter
1278        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
1279        let parser =
1280            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));
1281
1282        // This test verifies the printer() method is accessible
1283        let _printer_ref = parser.printer();
1284    }
1285
1286    #[test]
1287    fn test_streaming_metrics_method_accessible() {
1288        // Test that the streaming_metrics() method is accessible
1289        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
1290        let parser =
1291            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));
1292
1293        // This test verifies the streaming_metrics() method is accessible
1294        let _metrics = parser.streaming_metrics();
1295    }
1296}