Skip to main content

ralph_workflow/json_parser/
claude.rs

1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//!    creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r          (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r  (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n    (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46#[cfg(feature = "test-utils")]
47use super::health::StreamingQualityMetrics;
48use super::printer::SharedPrinter;
49use super::streaming_state::StreamingSession;
50use super::terminal::TerminalMode;
51use super::types::{
52    format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
53    ContentType, StreamInnerEvent,
54};
55
/// Claude event parser
///
/// Note: This parser is designed for single-threaded use only.
/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
/// Do not share this parser across threads.
pub struct ClaudeParser {
    /// Color palette used when formatting output lines
    colors: Colors,
    /// Verbosity level; consulted for truncation limits and for whether tool input
    /// and unknown events are shown
    pub(crate) verbosity: Verbosity,
    /// Path to the log file, set via `with_log_file` (`None` until then)
    log_path: Option<std::path::PathBuf>,
    /// Name shown in the bracketed prefix of each output line (defaults to "Claude")
    display_name: String,
    /// Unified streaming session tracker
    /// Provides single source of truth for streaming state across all content types
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Terminal mode for output formatting
    /// Initialized via `TerminalMode::detect()` when the parser is constructed
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to show streaming quality metrics
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
}
78
79impl ClaudeParser {
80    /// Create a new `ClaudeParser` with the given colors and verbosity.
81    ///
82    /// # Arguments
83    ///
84    /// * `colors` - Colors for terminal output
85    /// * `verbosity` - Verbosity level for output
86    ///
87    /// # Returns
88    ///
89    /// A new `ClaudeParser` instance
90    ///
91    /// # Example
92    ///
93    /// ```ignore
94    /// use ralph_workflow::json_parser::ClaudeParser;
95    /// use ralph_workflow::logger::Colors;
96    /// use ralph_workflow::config::Verbosity;
97    ///
98    /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
99    /// ```
100    pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
101        Self::with_printer(colors, verbosity, super::printer::shared_stdout())
102    }
103
104    /// Create a new `ClaudeParser` with a custom printer.
105    ///
106    /// # Arguments
107    ///
108    /// * `colors` - Colors for terminal output
109    /// * `verbosity` - Verbosity level for output
110    /// * `printer` - Shared printer for output
111    ///
112    /// # Returns
113    ///
114    /// A new `ClaudeParser` instance
115    pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
116        let verbose_warnings = matches!(verbosity, Verbosity::Debug);
117        let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
118
119        // Use the printer's is_terminal method to validate it's connected correctly
120        // This is a sanity check that also satisfies the compiler that the method is used
121        let _printer_is_terminal = printer.borrow().is_terminal();
122
123        Self {
124            colors,
125            verbosity,
126            log_path: None,
127            display_name: "Claude".to_string(),
128            streaming_session: Rc::new(RefCell::new(streaming_session)),
129            terminal_mode: RefCell::new(TerminalMode::detect()),
130            show_streaming_metrics: false,
131            printer,
132        }
133    }
134
    /// Set whether streaming quality metrics should be shown.
    ///
    /// Only stores the flag on the parser; the code that consumes it is outside
    /// this method.
    ///
    /// # Arguments
    ///
    /// * `show` - `true` to show streaming quality metrics
    ///
    /// # Returns
    ///
    /// Self for builder pattern chaining
    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
        self.show_streaming_metrics = show;
        self
    }
139
140    /// Set the display name for this parser.
141    ///
142    /// # Arguments
143    ///
144    /// * `display_name` - The name to display in output
145    ///
146    /// # Returns
147    ///
148    /// Self for builder pattern chaining
149    pub fn with_display_name(mut self, display_name: &str) -> Self {
150        self.display_name = display_name.to_string();
151        self
152    }
153
154    pub(crate) fn with_log_file(mut self, path: &str) -> Self {
155        self.log_path = Some(std::path::PathBuf::from(path));
156        self
157    }
158
159    /// Set the terminal mode for this parser.
160    ///
161    /// # Arguments
162    ///
163    /// * `mode` - The terminal mode to use
164    ///
165    /// # Returns
166    ///
167    /// Self for builder pattern chaining
168    #[cfg(any(test, feature = "test-utils"))]
169    pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
170        *self.terminal_mode.borrow_mut() = mode;
171        self
172    }
173
174    /// Get a shared reference to the printer.
175    ///
176    /// This allows tests, monitoring, and other code to access the printer after parsing
177    /// to verify output content, check for duplicates, or capture output for analysis.
178    ///
179    /// # Returns
180    ///
181    /// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
182    ///
183    /// # Example
184    ///
185    /// ```ignore
186    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
187    /// use std::rc::Rc;
188    /// use std::cell::RefCell;
189    ///
190    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
191    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
192    ///
193    /// // Parse events...
194    ///
195    /// // Now access the printer to verify output
196    /// let printer_ref = parser.printer().borrow();
197    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
198    /// ```
199    /// Get a clone of the printer used by this parser.
200    ///
201    /// This is primarily useful for testing and monitoring.
202    /// Only available with the `test-utils` feature.
203    #[cfg(feature = "test-utils")]
204    pub fn printer(&self) -> SharedPrinter {
205        Rc::clone(&self.printer)
206    }
207
208    /// Get streaming quality metrics from the current session.
209    ///
210    /// This provides insight into the deduplication and streaming quality of the
211    /// parsing session, including:
212    /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
213    /// - Number of large deltas (potential protocol violations)
214    /// - Total deltas processed
215    ///
216    /// Useful for testing, monitoring, and debugging streaming behavior.
217    /// Only available with the `test-utils` feature.
218    ///
219    /// # Returns
220    ///
221    /// A copy of the streaming quality metrics from the internal `StreamingSession`.
222    ///
223    /// # Example
224    ///
225    /// ```ignore
226    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
227    /// use std::rc::Rc;
228    /// use std::cell::RefCell;
229    ///
230    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
231    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
232    ///
233    /// // Parse events...
234    ///
235    /// // Verify deduplication logic triggered
236    /// let metrics = parser.streaming_metrics();
237    /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
238    /// ```
239    #[cfg(feature = "test-utils")]
240    pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
241        self.streaming_session
242            .borrow()
243            .get_streaming_quality_metrics()
244    }
245
246    /// Parse and display a single Claude JSON event
247    ///
248    /// Returns `Some(formatted_output)` for valid events, or None for:
249    /// - Malformed JSON (logged at debug level)
250    /// - Unknown event types
251    /// - Empty or whitespace-only output
252    pub fn parse_event(&self, line: &str) -> Option<String> {
253        let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
254            e
255        } else {
256            // Non-JSON line - could be raw text output from agent
257            // Pass through as-is if it looks like real output (not empty)
258            let trimmed = line.trim();
259            if !trimmed.is_empty() && !trimmed.starts_with('{') {
260                return Some(format!("{trimmed}\n"));
261            }
262            return None;
263        };
264        let c = &self.colors;
265        let prefix = &self.display_name;
266
267        let output = match event {
268            ClaudeEvent::System {
269                subtype,
270                session_id,
271                cwd,
272            } => self.format_system_event(subtype.as_ref(), session_id, cwd),
273            ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
274            ClaudeEvent::User { message } => self.format_user_event(message),
275            ClaudeEvent::Result {
276                subtype,
277                duration_ms,
278                total_cost_usd,
279                num_turns,
280                result,
281                error,
282            } => self.format_result_event(
283                subtype,
284                duration_ms,
285                total_cost_usd,
286                num_turns,
287                result,
288                error,
289            ),
290            ClaudeEvent::StreamEvent { event } => {
291                // Handle streaming events for delta/partial updates
292                self.parse_stream_event(event)
293            }
294            ClaudeEvent::Unknown => {
295                // Use the generic unknown event formatter for consistent handling
296                // In verbose mode, this will show the event type and key fields
297                // In normal mode, this returns empty string
298                format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
299            }
300        };
301
302        if output.is_empty() {
303            None
304        } else {
305            Some(output)
306        }
307    }
308
309    /// Parse a streaming event for delta/partial updates
310    ///
311    /// Handles the nested events within `stream_event`:
312    /// - MessageStart/Stop: Manage session state
313    /// - `ContentBlockStart`: Initialize new content blocks
314    /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
315    /// - `ContentBlockStop`: Finalize content blocks
316    /// - `MessageDelta`: Process message metadata without output
317    /// - Error: Display appropriately
318    ///
319    /// Returns String for display content, empty String for control events.
320    fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
321        let mut session = self.streaming_session.borrow_mut();
322
323        match event {
324            StreamInnerEvent::MessageStart {
325                message,
326                message_id,
327            } => {
328                // Extract message_id from either the top-level field or nested message.id
329                // The Claude API typically puts the ID in message.id, not at the top level
330                let effective_message_id =
331                    message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
332                // Set message ID for tracking and clear session state on new message
333                session.set_current_message_id(effective_message_id);
334                session.on_message_start();
335                String::new()
336            }
337            StreamInnerEvent::ContentBlockStart {
338                index: Some(index),
339                content_block: Some(block),
340            } => {
341                // Initialize a new content block at this index
342                session.on_content_block_start(index);
343                match &block {
344                    ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
345                        // Initial text in ContentBlockStart - treat as first delta
346                        session.on_text_delta(index, t);
347                    }
348                    ContentBlock::ToolUse { name, input } => {
349                        // Track tool name for GLM/CCS deduplication.
350                        // IMPORTANT: Track the tool name when provided, even when input is None.
351                        // GLM may send ContentBlockStart with name but no input, then send input via delta.
352                        // We only store when we have a name to avoid overwriting a previous tool name with None.
353                        if let Some(n) = name {
354                            session.set_tool_name(index, Some(n.clone()));
355                        }
356
357                        // Initialize tool input accumulator only if input is present
358                        if let Some(i) = input {
359                            let input_str = if let serde_json::Value::String(s) = &i {
360                                s.clone()
361                            } else {
362                                format_tool_input(i)
363                            };
364                            session.on_tool_input_delta(index, &input_str);
365                        }
366                    }
367                    _ => {}
368                }
369                String::new()
370            }
371            StreamInnerEvent::ContentBlockStart {
372                index: Some(index),
373                content_block: None,
374            } => {
375                // Content block started but no initial content provided
376                session.on_content_block_start(index);
377                String::new()
378            }
379            StreamInnerEvent::ContentBlockStart { .. } => {
380                // Content block without index - ignore
381                String::new()
382            }
383            StreamInnerEvent::ContentBlockDelta {
384                index: Some(index),
385                delta: Some(delta),
386            } => self.handle_content_block_delta(&mut session, index, delta),
387            StreamInnerEvent::TextDelta { text: Some(text) } => {
388                self.handle_text_delta(&mut session, &text)
389            }
390            StreamInnerEvent::ContentBlockStop { .. } => {
391                // Content block completion event - no output needed
392                // This event marks the end of a content block but doesn't produce
393                // any displayable content. It's a control event for state management.
394                String::new()
395            }
396            StreamInnerEvent::MessageDelta { .. } => {
397                // Message delta event with usage/metadata - no output needed
398                // This event contains final message metadata (stop_reason, usage stats)
399                // but is used for tracking/monitoring purposes only, not display.
400                String::new()
401            }
402            StreamInnerEvent::ContentBlockDelta { .. }
403            | StreamInnerEvent::Ping
404            | StreamInnerEvent::TextDelta { text: None }
405            | StreamInnerEvent::Error { error: None } => String::new(),
406            StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
407            StreamInnerEvent::Error {
408                error: Some(err), ..
409            } => self.handle_error_event(err),
410            StreamInnerEvent::Unknown => self.handle_unknown_event(),
411        }
412    }
413
414    /// Format a system event
415    fn format_system_event(
416        &self,
417        subtype: Option<&String>,
418        session_id: Option<String>,
419        cwd: Option<String>,
420    ) -> String {
421        let c = &self.colors;
422        let prefix = &self.display_name;
423
424        if subtype.map(std::string::String::as_str) == Some("init") {
425            let sid = session_id.unwrap_or_else(|| "unknown".to_string());
426            let mut out = format!(
427                "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
428                c.dim(),
429                prefix,
430                c.reset(),
431                c.cyan(),
432                c.reset(),
433                c.dim(),
434                sid,
435                c.reset()
436            );
437            if let Some(cwd) = cwd {
438                let _ = writeln!(
439                    out,
440                    "{}[{}]{} {}Working dir: {}{}",
441                    c.dim(),
442                    prefix,
443                    c.reset(),
444                    c.dim(),
445                    cwd,
446                    c.reset()
447                );
448            }
449            out
450        } else {
451            format!(
452                "{}[{}]{} {}{}{}\n",
453                c.dim(),
454                prefix,
455                c.reset(),
456                c.cyan(),
457                subtype.map_or("system", |s| s.as_str()),
458                c.reset()
459            )
460        }
461    }
462
463    /// Extract content from assistant message for hash-based deduplication.
464    ///
465    /// This includes both text and tool_use blocks, normalized for comparison.
466    /// Tool_use blocks are serialized in a deterministic way (name + sorted input JSON)
467    /// to ensure semantically identical tool calls produce the same hash.
468    ///
469    /// # Returns
470    /// A tuple of (normalized_content, tool_names_by_index) where:
471    /// - normalized_content: The concatenated normalized content (text + tool_use markers)
472    /// - tool_names_by_index: Map from content block index to tool name (for tool_use blocks)
473    fn extract_text_content_for_hash(
474        message: Option<&crate::json_parser::types::AssistantMessage>,
475    ) -> Option<(String, std::collections::HashMap<usize, String>)> {
476        message?.content.as_ref().map(|content| {
477            let mut normalized_parts = Vec::new();
478            let mut tool_names = std::collections::HashMap::new();
479
480            for (index, block) in content.iter().enumerate() {
481                match block {
482                    ContentBlock::Text { text } => {
483                        if let Some(text) = text.as_deref() {
484                            normalized_parts.push(text.to_string());
485                        }
486                    }
487                    ContentBlock::ToolUse { name, input } => {
488                        // Track tool name by index for hash-based deduplication
489                        if let Some(name_str) = name.as_deref() {
490                            tool_names.insert(index, name_str.to_string());
491                        }
492
493                        // Normalize tool_use for hash comparison:
494                        // Format: "TOOL_USE:{name}:{sorted_json_input}"
495                        // Sorting JSON keys ensures identical inputs produce same hash
496                        let normalized = format!(
497                            "TOOL_USE:{}:{}",
498                            name.as_deref().unwrap_or(""),
499                            input
500                                .as_ref()
501                                .map(|v| {
502                                    // Sort JSON keys for deterministic serialization
503                                    if v.is_object() {
504                                        serde_json::to_string(v).ok()
505                                    } else if v.is_string() {
506                                        v.as_str().map(|s| s.to_string())
507                                    } else {
508                                        serde_json::to_string(v).ok()
509                                    }
510                                    .unwrap_or_default()
511                                })
512                                .unwrap_or_default()
513                        );
514                        normalized_parts.push(normalized);
515                    }
516                    _ => {}
517                }
518            }
519
520            (normalized_parts.join(""), tool_names)
521        })
522    }
523
524    /// Check if this assistant message is a duplicate of already-streamed content.
525    fn is_duplicate_assistant_message(
526        &self,
527        message: Option<&crate::json_parser::types::AssistantMessage>,
528    ) -> bool {
529        use std::collections::hash_map::DefaultHasher;
530        use std::hash::{Hash, Hasher};
531
532        let session = self.streaming_session.borrow();
533
534        // Extract message_id from the assistant message
535        let assistant_msg_id = message.and_then(|m| m.id.as_ref());
536
537        // Check if this assistant event has a message_id that matches the current streaming message
538        // If it does, and we have streamed content, then this assistant event is a duplicate
539        // because the content was already streamed via deltas.
540        if let Some(ast_msg_id) = assistant_msg_id {
541            // Check if message was already marked as displayed (after message_stop)
542            if session.is_duplicate_final_message(ast_msg_id) {
543                return true;
544            }
545
546            // Check if the assistant message_id matches the current streaming message_id
547            if session.get_current_message_id() == Some(ast_msg_id) {
548                // Same message - check if we have streamed any content
549                // If yes, the assistant event is a duplicate
550                if session.has_any_streamed_content() {
551                    return true;
552                }
553            }
554        }
555
556        // Check if this exact assistant content has been rendered before
557        // This handles the case where GLM sends multiple assistant events during
558        // streaming with the same content but different message_ids
559        let content_for_hash = Self::extract_text_content_for_hash(message);
560        if let Some((ref text_content, _)) = content_for_hash {
561            if !text_content.is_empty() {
562                // Compute hash of the assistant event content
563                let mut hasher = DefaultHasher::new();
564                text_content.hash(&mut hasher);
565                let content_hash = hasher.finish();
566
567                // Check if this content was already rendered
568                if session.is_assistant_content_rendered(content_hash) {
569                    return true;
570                }
571            }
572        }
573
574        // If no message_id match, fall back to hash-based deduplication
575        // extract_text_content_for_hash now returns (content, tool_names_by_index)
576        if let Some((ref text_content, ref tool_names)) = content_for_hash {
577            if !text_content.is_empty() {
578                return session.is_duplicate_by_hash(text_content, Some(tool_names));
579            }
580        }
581
582        // Fallback to coarse check
583        session.has_any_streamed_content()
584    }
585
586    /// Format a text content block for assistant output.
587    fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
588        let limit = self.verbosity.truncate_limit("text");
589        let preview = truncate_text(text, limit);
590        let _ = writeln!(
591            out,
592            "{}[{}]{} {}{}{}",
593            colors.dim(),
594            prefix,
595            colors.reset(),
596            colors.white(),
597            preview,
598            colors.reset()
599        );
600    }
601
602    /// Format a tool use content block for assistant output.
603    fn format_tool_use_block(
604        &self,
605        out: &mut String,
606        tool: Option<&String>,
607        input: Option<&serde_json::Value>,
608        prefix: &str,
609        colors: Colors,
610    ) {
611        let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
612        let _ = writeln!(
613            out,
614            "{}[{}]{} {}Tool{}: {}{}{}",
615            colors.dim(),
616            prefix,
617            colors.reset(),
618            colors.magenta(),
619            colors.reset(),
620            colors.bold(),
621            tool_name,
622            colors.reset(),
623        );
624
625        // Show tool input details at Normal and above (not just Verbose)
626        // Tool inputs provide crucial context for understanding agent actions
627        if self.verbosity.show_tool_input() {
628            if let Some(input_val) = input {
629                let input_str = format_tool_input(input_val);
630                let limit = self.verbosity.truncate_limit("tool_input");
631                let preview = truncate_text(&input_str, limit);
632                if !preview.is_empty() {
633                    let _ = writeln!(
634                        out,
635                        "{}[{}]{} {}  └─ {}{}",
636                        colors.dim(),
637                        prefix,
638                        colors.reset(),
639                        colors.dim(),
640                        preview,
641                        colors.reset()
642                    );
643                }
644            }
645        }
646    }
647
648    /// Format a tool result content block for assistant output.
649    fn format_tool_result_block(
650        &self,
651        out: &mut String,
652        content: &serde_json::Value,
653        prefix: &str,
654        colors: Colors,
655    ) {
656        let content_str = match content {
657            serde_json::Value::String(s) => s.clone(),
658            other => other.to_string(),
659        };
660        let limit = self.verbosity.truncate_limit("tool_result");
661        let preview = truncate_text(&content_str, limit);
662        let _ = writeln!(
663            out,
664            "{}[{}]{} {}Result:{} {}",
665            colors.dim(),
666            prefix,
667            colors.reset(),
668            colors.dim(),
669            colors.reset(),
670            preview
671        );
672    }
673
674    /// Format all content blocks from an assistant message.
675    fn format_content_blocks(
676        &self,
677        out: &mut String,
678        content: &[ContentBlock],
679        prefix: &str,
680        colors: Colors,
681    ) {
682        for block in content {
683            match block {
684                ContentBlock::Text { text } => {
685                    if let Some(text) = text {
686                        self.format_text_block(out, text, prefix, colors);
687                    }
688                }
689                ContentBlock::ToolUse { name, input } => {
690                    self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
691                }
692                ContentBlock::ToolResult { content } => {
693                    if let Some(content) = content {
694                        self.format_tool_result_block(out, content, prefix, colors);
695                    }
696                }
697                ContentBlock::Unknown => {}
698            }
699        }
700    }
701
702    /// Format an assistant event
703    fn format_assistant_event(
704        &self,
705        message: Option<crate::json_parser::types::AssistantMessage>,
706    ) -> String {
707        use std::collections::hash_map::DefaultHasher;
708        use std::hash::{Hash, Hasher};
709
710        // CRITICAL FIX: When ANY content has been streamed via deltas,
711        // the Assistant event should NOT display it again.
712        // The Assistant event represents the "complete" message, but if we've
713        // already shown the streaming deltas, showing it again causes duplication.
714        if self.is_duplicate_assistant_message(message.as_ref()) {
715            return String::new();
716        }
717
718        let mut out = String::new();
719        if let Some(ref msg) = message {
720            if let Some(ref content) = msg.content {
721                self.format_content_blocks(&mut out, content, &self.display_name, self.colors);
722
723                // If we successfully rendered content, mark it as rendered
724                if !out.is_empty() {
725                    let mut session = self.streaming_session.borrow_mut();
726
727                    // Mark the message as pre-rendered so that ALL subsequent streaming
728                    // deltas for this message are suppressed.
729                    // This handles the case where assistant event arrives BEFORE streaming starts.
730                    if let Some(ref message_id) = msg.id {
731                        session.mark_message_pre_rendered(message_id);
732                    }
733
734                    // Mark the assistant content as rendered by hash to prevent duplicate
735                    // assistant events with the same content but different message_ids
736                    if let Some((ref text_content, _)) =
737                        Self::extract_text_content_for_hash(message.as_ref())
738                    {
739                        if !text_content.is_empty() {
740                            let mut hasher = DefaultHasher::new();
741                            text_content.hash(&mut hasher);
742                            let content_hash = hasher.finish();
743                            session.mark_assistant_content_rendered(content_hash);
744                        }
745                    }
746                }
747            }
748        }
749        out
750    }
751
752    /// Format a user event
753    fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
754        let c = &self.colors;
755        let prefix = &self.display_name;
756
757        if let Some(msg) = message {
758            if let Some(content) = msg.content {
759                if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
760                    let limit = self.verbosity.truncate_limit("user");
761                    let preview = truncate_text(text, limit);
762                    return format!(
763                        "{}[{}]{} {}User{}: {}{}{}\n",
764                        c.dim(),
765                        prefix,
766                        c.reset(),
767                        c.blue(),
768                        c.reset(),
769                        c.dim(),
770                        preview,
771                        c.reset()
772                    );
773                }
774            }
775        }
776        String::new()
777    }
778
779    /// Format a result event
780    fn format_result_event(
781        &self,
782        subtype: Option<String>,
783        duration_ms: Option<u64>,
784        total_cost_usd: Option<f64>,
785        num_turns: Option<u32>,
786        result: Option<String>,
787        error: Option<String>,
788    ) -> String {
789        let c = &self.colors;
790        let prefix = &self.display_name;
791
792        let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
793        let duration_m = duration_total_secs / 60;
794        let duration_s_rem = duration_total_secs % 60;
795        let cost = total_cost_usd.unwrap_or(0.0);
796        let turns = num_turns.unwrap_or(0);
797
798        let mut out = if subtype.as_deref() == Some("success") {
799            format!(
800                "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
801                c.dim(),
802                prefix,
803                c.reset(),
804                c.green(),
805                CHECK,
806                c.reset(),
807                c.dim(),
808                duration_m,
809                duration_s_rem,
810                turns,
811                cost,
812                c.reset()
813            )
814        } else {
815            let err = error.unwrap_or_else(|| "unknown error".to_string());
816            format!(
817                "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
818                c.dim(),
819                prefix,
820                c.reset(),
821                c.red(),
822                CROSS,
823                subtype.unwrap_or_else(|| "error".to_string()),
824                c.reset(),
825                err,
826                c.dim(),
827                duration_m,
828                duration_s_rem,
829                c.reset()
830            )
831        };
832
833        if let Some(result) = result {
834            let limit = self.verbosity.truncate_limit("result");
835            let preview = truncate_text(&result, limit);
836            let _ = writeln!(
837                out,
838                "\n{}Result summary:{}\n{}{}{}",
839                c.bold(),
840                c.reset(),
841                c.dim(),
842                preview,
843                c.reset()
844            );
845        }
846        out
847    }
848
849    /// Handle content block delta events
850    fn handle_content_block_delta(
851        &self,
852        session: &mut std::cell::RefMut<'_, StreamingSession>,
853        index: u64,
854        delta: ContentBlockDelta,
855    ) -> String {
856        let c = &self.colors;
857        let prefix = &self.display_name;
858
859        match delta {
860            ContentBlockDelta::TextDelta { text: Some(text) } => {
861                let index_str = index.to_string();
862
863                // Track this delta with StreamingSession for state management.
864                //
865                // StreamingSession handles protocol/streaming quality concerns (including
866                // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
867                // whether a prefix should be displayed for this stream.
868                //
869                // The parser layer still applies additional deduplication:
870                // - Skip whitespace-only accumulated output
871                // - Hash-based deduplication after sanitization (whitespace-insensitive)
872                let show_prefix = session.on_text_delta(index, &text);
873
874                // Get accumulated text for streaming display
875                let accumulated_text = session
876                    .get_accumulated(ContentType::Text, &index_str)
877                    .unwrap_or("");
878
879                // Check if this message was pre-rendered from an assistant event.
880                // When an assistant event arrives BEFORE streaming deltas, we render it
881                // and mark the message_id as pre-rendered. ALL subsequent streaming deltas
882                // for this message should be suppressed to prevent duplication.
883                if let Some(message_id) = session.get_current_message_id() {
884                    if session.is_message_pre_rendered(message_id) {
885                        return String::new();
886                    }
887                }
888
889                // Sanitize the accumulated text to check if it's empty
890                // This is needed to skip rendering when the accumulated content is just whitespace
891                let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
892
893                // Skip rendering if the sanitized text is empty (e.g., only whitespace)
894                // This prevents rendering empty lines when the accumulated content is just whitespace
895                if sanitized_text.is_empty() {
896                    return String::new();
897                }
898
899                // Check if this sanitized content has already been rendered
900                // This prevents duplicates when accumulated content differs only by whitespace
901                if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
902                {
903                    return String::new();
904                }
905
906                // Use TextDeltaRenderer for consistent rendering
907                let terminal_mode = *self.terminal_mode.borrow();
908
909                // Use prefix trie to detect if new content extends previously rendered content
910                // If yes, we do an in-place update (carriage return + new content)
911                let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);
912
913                let output = if show_prefix && !has_prefix {
914                    // First delta with no prefix match - use the renderer with prefix
915                    TextDeltaRenderer::render_first_delta(
916                        accumulated_text,
917                        prefix,
918                        *c,
919                        terminal_mode,
920                    )
921                } else {
922                    // Either continuation OR prefix match - use renderer for in-place update
923                    // This handles the case where "Hello" becomes "Hello World" - we REPLACE
924                    TextDeltaRenderer::render_subsequent_delta(
925                        accumulated_text,
926                        prefix,
927                        *c,
928                        terminal_mode,
929                    )
930                };
931
932                // Mark this sanitized content as rendered for future duplicate detection
933                // We use the sanitized text (not the rendered output) to avoid false positives
934                // when the same accumulated text is rendered with different terminal modes
935                session.mark_rendered(ContentType::Text, &index_str);
936                session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);
937
938                output
939            }
940            ContentBlockDelta::ThinkingDelta {
941                thinking: Some(text),
942            } => {
943                // Track thinking deltas
944                session.on_thinking_delta(index, &text);
945                // Display thinking with visual distinction
946                Self::formatter().format_thinking(text.as_str(), prefix, *c)
947            }
948            ContentBlockDelta::ToolUseDelta {
949                tool_use: Some(tool_delta),
950            } => {
951                // Track tool name for GLM/CCS deduplication (if available in delta)
952                if let Some(serde_json::Value::String(name)) = tool_delta.get("name") {
953                    session.set_tool_name(index, Some(name.to_string()));
954                }
955
956                // Handle tool input streaming
957                // Extract the tool input from the delta
958                let input_str =
959                    tool_delta
960                        .get("input")
961                        .map_or_else(String::new, |input| match input {
962                            serde_json::Value::String(s) => s.clone(),
963                            other => format_tool_input(other),
964                        });
965
966                if input_str.is_empty() {
967                    String::new()
968                } else {
969                    // Accumulate tool input
970                    session.on_tool_input_delta(index, &input_str);
971
972                    // Show partial tool input in real-time
973                    let formatter = DeltaDisplayFormatter::new();
974                    formatter.format_tool_input(&input_str, prefix, *c)
975                }
976            }
977            _ => String::new(),
978        }
979    }
980
981    /// Handle text delta events
982    fn handle_text_delta(
983        &self,
984        session: &mut std::cell::RefMut<'_, StreamingSession>,
985        text: &str,
986    ) -> String {
987        let c = &self.colors;
988        let prefix = &self.display_name;
989
990        // Standalone text delta (not part of content block)
991        // Use default index "0" for standalone text
992        let default_index = 0u64;
993        let default_index_str = "0";
994
995        // Track this delta with StreamingSession for state management.
996        //
997        // StreamingSession handles protocol/streaming quality concerns (including
998        // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
999        // whether a prefix should be displayed for this stream.
1000        //
1001        // The parser layer still applies additional deduplication:
1002        // - Skip whitespace-only accumulated output
1003        // - Hash-based deduplication after sanitization (whitespace-insensitive)
1004        let show_prefix = session.on_text_delta(default_index, text);
1005
1006        // Get accumulated text for streaming display
1007        let accumulated_text = session
1008            .get_accumulated(ContentType::Text, default_index_str)
1009            .unwrap_or("");
1010
1011        // Sanitize the accumulated text to check if it's empty
1012        // This is needed to skip rendering when the accumulated content is just whitespace
1013        let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
1014
1015        // Skip rendering if the sanitized text is empty (e.g., only whitespace)
1016        // This prevents rendering empty lines when the accumulated content is just whitespace
1017        if sanitized_text.is_empty() {
1018            return String::new();
1019        }
1020
1021        // Check if this sanitized content has already been rendered
1022        // This prevents duplicates when accumulated content differs only by whitespace
1023        if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
1024            return String::new();
1025        }
1026
1027        // Use TextDeltaRenderer for consistent rendering across all parsers
1028        let terminal_mode = *self.terminal_mode.borrow();
1029
1030        // Use prefix trie to detect if new content extends previously rendered content
1031        // If yes, we do an in-place update (carriage return + new content)
1032        let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
1033
1034        let output = if show_prefix && !has_prefix {
1035            // First delta with no prefix match - use the renderer with prefix
1036            TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
1037        } else {
1038            // Either continuation OR prefix match - use renderer for in-place update
1039            // This handles the case where "Hello" becomes "Hello World" - we REPLACE
1040            TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
1041        };
1042
1043        // Mark this sanitized content as rendered for future duplicate detection
1044        // We use the sanitized text (not the rendered output) to avoid false positives
1045        // when the same accumulated text is rendered with different terminal modes
1046        session.mark_rendered(ContentType::Text, default_index_str);
1047        session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
1048
1049        output
1050    }
1051
1052    /// Handle message stop events
1053    fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
1054        let c = &self.colors;
1055
1056        // Message complete - add final newline if we were in a content block
1057        // OR if any content was streamed (handles edge cases where block state
1058        // may not have been set but content was still streamed)
1059        let metrics = session.get_streaming_quality_metrics();
1060        let was_in_block = session.on_message_stop();
1061        let had_content = session.has_any_streamed_content();
1062        if was_in_block || had_content {
1063            // Use TextDeltaRenderer for completion - adds final newline
1064            let terminal_mode = *self.terminal_mode.borrow();
1065            let completion = format!(
1066                "{}{}",
1067                c.reset(),
1068                TextDeltaRenderer::render_completion(terminal_mode)
1069            );
1070            // Show streaming quality metrics in debug mode or when flag is set
1071            let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
1072                && metrics.total_deltas > 0;
1073            if show_metrics {
1074                format!("{}\n{}", completion, metrics.format(*c))
1075            } else {
1076                completion
1077            }
1078        } else {
1079            String::new()
1080        }
1081    }
1082
1083    /// Handle error events
1084    fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
1085        let c = &self.colors;
1086        let prefix = &self.display_name;
1087
1088        let msg = err
1089            .message
1090            .unwrap_or_else(|| "Unknown streaming error".to_string());
1091        format!(
1092            "{}[{}]{} {}Error: {}{}\n",
1093            c.dim(),
1094            prefix,
1095            c.reset(),
1096            c.red(),
1097            msg,
1098            c.reset()
1099        )
1100    }
1101
1102    /// Handle unknown events
1103    fn handle_unknown_event(&self) -> String {
1104        let c = &self.colors;
1105        let prefix = &self.display_name;
1106
1107        // Unknown stream event - in debug mode, log it
1108        if self.verbosity.is_debug() {
1109            format!(
1110                "{}[{}]{} {}Unknown streaming event{}\n",
1111                c.dim(),
1112                prefix,
1113                c.reset(),
1114                c.dim(),
1115                c.reset()
1116            )
1117        } else {
1118            String::new()
1119        }
1120    }
1121
1122    /// Check if a Claude event is a control event (state management with no user output)
1123    ///
1124    /// Control events are valid JSON that represent state transitions rather than
1125    /// user-facing content. They should be tracked separately from "ignored" events
1126    /// to avoid false health warnings.
1127    const fn is_control_event(event: &ClaudeEvent) -> bool {
1128        match event {
1129            // Stream events that are control events
1130            ClaudeEvent::StreamEvent { event } => matches!(
1131                event,
1132                StreamInnerEvent::MessageStart { .. }
1133                    | StreamInnerEvent::ContentBlockStart { .. }
1134                    | StreamInnerEvent::ContentBlockStop { .. }
1135                    | StreamInnerEvent::MessageDelta { .. }
1136                    | StreamInnerEvent::MessageStop
1137                    | StreamInnerEvent::Ping
1138            ),
1139            _ => false,
1140        }
1141    }
1142
1143    /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1144    ///
1145    /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1146    /// tool input deltas) that are shown to the user in real-time. These should be
1147    /// tracked separately to avoid inflating "ignored" percentages.
1148    const fn is_partial_event(event: &ClaudeEvent) -> bool {
1149        match event {
1150            // Stream events that produce incremental content
1151            ClaudeEvent::StreamEvent { event } => matches!(
1152                event,
1153                StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1154            ),
1155            _ => false,
1156        }
1157    }
1158
1159    /// Get a shared delta display formatter
1160    const fn formatter() -> DeltaDisplayFormatter {
1161        DeltaDisplayFormatter::new()
1162    }
1163
1164    /// Parse a stream of Claude NDJSON events
1165    pub fn parse_stream<R: BufRead>(
1166        &self,
1167        mut reader: R,
1168        workspace: &dyn crate::workspace::Workspace,
1169    ) -> io::Result<()> {
1170        use super::incremental_parser::IncrementalNdjsonParser;
1171
1172        let c = &self.colors;
1173        let monitor = HealthMonitor::new("Claude");
1174        // Accumulate log content in memory, write to workspace at the end
1175        let logging_enabled = self.log_path.is_some();
1176        let mut log_buffer: Vec<u8> = Vec::new();
1177
1178        // Use incremental parser for true real-time streaming
1179        // This processes JSON as soon as it's complete, not waiting for newlines
1180        let mut incremental_parser = IncrementalNdjsonParser::new();
1181        let mut byte_buffer = Vec::new();
1182
1183        // Track whether we've seen a success result event for GLM/ccs-glm compatibility
1184        // Some agents (GLM via CCS) emit both a success result and an error_during_execution
1185        // result when they exit with code 1 despite producing valid output. We suppress
1186        // the spurious error event to avoid confusing duplicate output.
1187        let mut seen_success_result = false;
1188
1189        loop {
1190            // Read available bytes
1191            byte_buffer.clear();
1192            let chunk = reader.fill_buf()?;
1193            if chunk.is_empty() {
1194                break;
1195            }
1196
1197            // Process all bytes immediately
1198            byte_buffer.extend_from_slice(chunk);
1199            let consumed = chunk.len();
1200            reader.consume(consumed);
1201
1202            // Feed bytes to incremental parser
1203            let json_events = incremental_parser.feed(&byte_buffer);
1204
1205            // Process each complete JSON event immediately
1206            for line in json_events {
1207                let trimmed = line.trim();
1208                if trimmed.is_empty() {
1209                    continue;
1210                }
1211
1212                // Check for Result events to handle GLM/ccs-glm duplicate event bug
1213                // Some agents emit both success and error_during_execution results
1214                let should_skip_result = if trimmed.starts_with('{') {
1215                    // First, check if the JSON has an 'errors' field with actual error messages.
1216                    // This is important because Claude events can have either 'error' (string)
1217                    // or 'errors' (array of strings), and we need to check both.
1218                    let has_errors_with_content =
1219                        if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
1220                            // Check for 'errors' array with at least one non-empty string
1221                            json.get("errors")
1222                                .and_then(|v| v.as_array())
1223                                .is_some_and(|arr| {
1224                                    arr.iter()
1225                                        .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
1226                                })
1227                        } else {
1228                            false
1229                        };
1230
1231                    if let Ok(ClaudeEvent::Result {
1232                        subtype,
1233                        duration_ms,
1234                        error,
1235                        ..
1236                    }) = serde_json::from_str::<ClaudeEvent>(trimmed)
1237                    {
1238                        let is_error_result = subtype.as_deref() != Some("success");
1239
1240                        // Suppress spurious GLM error events based on these characteristics:
1241                        // 1. Error event (subtype != "success")
1242                        // 2. duration_ms is 0 or very small (< 100ms, indicating synthetic event)
1243                        // 3. error field is null or empty (no actual error message)
1244                        // 4. NO 'errors' field with actual error messages (this indicates a real error)
1245                        //
1246                        // These criteria identify the spurious error_during_execution events
1247                        // that GLM emits when exiting with code 1 despite producing valid output.
1248                        //
1249                        // We DON'T suppress if there's an 'errors' array with content, because
1250                        // that indicates a real error condition that the user should see.
1251                        let is_spurious_glm_error = is_error_result
1252                            && duration_ms.unwrap_or(0) < 100
1253                            && (error.is_none() || error.as_ref().is_some_and(|e| e.is_empty()))
1254                            && !has_errors_with_content;
1255
1256                        if is_spurious_glm_error && seen_success_result {
1257                            // Error after success - suppress (original fix)
1258                            true
1259                        } else if subtype.as_deref() == Some("success") {
1260                            seen_success_result = true;
1261                            false
1262                        } else if is_spurious_glm_error {
1263                            // Spurious error BEFORE success - still suppress based on characteristics
1264                            // This handles the reverse-order case where error arrives first
1265                            true
1266                        } else {
1267                            false
1268                        }
1269                    } else {
1270                        false
1271                    }
1272                } else {
1273                    false
1274                };
1275
1276                // In debug mode, also show the raw JSON
1277                if self.verbosity.is_debug() {
1278                    eprintln!(
1279                        "{}[DEBUG]{} {}{}{}",
1280                        c.dim(),
1281                        c.reset(),
1282                        c.dim(),
1283                        &line,
1284                        c.reset()
1285                    );
1286                }
1287
1288                // Skip suppressed result events but still log them
1289                if should_skip_result {
1290                    if logging_enabled {
1291                        writeln!(log_buffer, "{line}")?;
1292                    }
1293                    monitor.record_control_event();
1294                    continue;
1295                }
1296
1297                // Parse the event once - parse_event handles malformed JSON by returning None
1298                match self.parse_event(&line) {
1299                    Some(output) => {
1300                        // Check if this is a partial/delta event (streaming content)
1301                        if trimmed.starts_with('{') {
1302                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1303                                if Self::is_partial_event(&event) {
1304                                    monitor.record_partial_event();
1305                                } else {
1306                                    monitor.record_parsed();
1307                                }
1308                            } else {
1309                                monitor.record_parsed();
1310                            }
1311                        } else {
1312                            monitor.record_parsed();
1313                        }
1314                        // Write output to printer
1315                        let mut printer = self.printer.borrow_mut();
1316                        write!(printer, "{output}")?;
1317                        printer.flush()?;
1318                    }
1319                    None => {
1320                        // Check if this was a control event (state management with no user output)
1321                        // Control events are valid JSON that return empty output but aren't "ignored"
1322                        if trimmed.starts_with('{') {
1323                            if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1324                                if Self::is_control_event(&event) {
1325                                    monitor.record_control_event();
1326                                } else {
1327                                    // Valid JSON but not a control event - track as unknown
1328                                    monitor.record_unknown_event();
1329                                }
1330                            } else {
1331                                // Failed to deserialize - track as parse error
1332                                monitor.record_parse_error();
1333                            }
1334                        } else {
1335                            monitor.record_ignored();
1336                        }
1337                    }
1338                }
1339
1340                // Log raw JSON to buffer if configured
1341                if logging_enabled {
1342                    writeln!(log_buffer, "{line}")?;
1343                }
1344            }
1345        }
1346
1347        // Write accumulated log content to workspace
1348        if let Some(log_path) = &self.log_path {
1349            workspace.append_bytes(log_path, &log_buffer)?;
1350        }
1351        if let Some(warning) = monitor.check_and_warn(*c) {
1352            let mut printer = self.printer.borrow_mut();
1353            writeln!(printer, "{warning}")?;
1354            printer.flush()?;
1355        }
1356        Ok(())
1357    }
1358}
1359
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use crate::json_parser::printer::{SharedPrinter, TestPrinter};

    /// Build a parser at normal verbosity that writes to a `TestPrinter`.
    fn parser_with_test_printer() -> ClaudeParser {
        let sink: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, sink)
    }

    /// Smoke test: `printer()` can be called on a parser built with a custom printer.
    #[test]
    fn test_printer_method_accessible() {
        let parser = parser_with_test_printer();

        let _printer_ref = parser.printer();
    }

    /// Smoke test: `streaming_metrics()` can be called on a freshly built parser.
    #[test]
    fn test_streaming_metrics_method_accessible() {
        let parser = parser_with_test_printer();

        let _metrics = parser.streaming_metrics();
    }
}