ralph_workflow/json_parser/claude.rs
1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//! creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46#[cfg(feature = "test-utils")]
47use super::health::StreamingQualityMetrics;
48use super::printer::SharedPrinter;
49use super::streaming_state::StreamingSession;
50use super::terminal::TerminalMode;
51use super::types::{
52 format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
53 ContentType, StreamInnerEvent,
54};
55
56/// Claude event parser
57///
58/// Note: This parser is designed for single-threaded use only.
59/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
60/// Do not share this parser across threads.
61pub struct ClaudeParser {
62 colors: Colors,
63 pub(crate) verbosity: Verbosity,
64 /// Relative path to log file (if logging enabled)
65 log_path: Option<std::path::PathBuf>,
66 display_name: String,
67 /// Unified streaming session tracker
68 /// Provides single source of truth for streaming state across all content types
69 streaming_session: Rc<RefCell<StreamingSession>>,
70 /// Terminal mode for output formatting
71 /// Detected at parse time and cached for performance
72 terminal_mode: RefCell<TerminalMode>,
73 /// Whether to show streaming quality metrics
74 show_streaming_metrics: bool,
75 /// Output printer for capturing or displaying output
76 printer: SharedPrinter,
77}
78
79impl ClaudeParser {
80 /// Create a new `ClaudeParser` with the given colors and verbosity.
81 ///
82 /// # Arguments
83 ///
84 /// * `colors` - Colors for terminal output
85 /// * `verbosity` - Verbosity level for output
86 ///
87 /// # Returns
88 ///
89 /// A new `ClaudeParser` instance
90 ///
91 /// # Example
92 ///
93 /// ```ignore
94 /// use ralph_workflow::json_parser::ClaudeParser;
95 /// use ralph_workflow::logger::Colors;
96 /// use ralph_workflow::config::Verbosity;
97 ///
98 /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
99 /// ```
100 pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
101 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
102 }
103
104 /// Create a new `ClaudeParser` with a custom printer.
105 ///
106 /// # Arguments
107 ///
108 /// * `colors` - Colors for terminal output
109 /// * `verbosity` - Verbosity level for output
110 /// * `printer` - Shared printer for output
111 ///
112 /// # Returns
113 ///
114 /// A new `ClaudeParser` instance
115 pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
116 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
117 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
118
119 // Use the printer's is_terminal method to validate it's connected correctly
120 // This is a sanity check that also satisfies the compiler that the method is used
121 let _printer_is_terminal = printer.borrow().is_terminal();
122
123 Self {
124 colors,
125 verbosity,
126 log_path: None,
127 display_name: "Claude".to_string(),
128 streaming_session: Rc::new(RefCell::new(streaming_session)),
129 terminal_mode: RefCell::new(TerminalMode::detect()),
130 show_streaming_metrics: false,
131 printer,
132 }
133 }
134
135 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
136 self.show_streaming_metrics = show;
137 self
138 }
139
140 /// Set the display name for this parser.
141 ///
142 /// # Arguments
143 ///
144 /// * `display_name` - The name to display in output
145 ///
146 /// # Returns
147 ///
148 /// Self for builder pattern chaining
149 pub fn with_display_name(mut self, display_name: &str) -> Self {
150 self.display_name = display_name.to_string();
151 self
152 }
153
154 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
155 self.log_path = Some(std::path::PathBuf::from(path));
156 self
157 }
158
/// Override the terminal mode detected at construction time.
///
/// # Arguments
///
/// * `mode` - Terminal mode to render with
///
/// # Returns
///
/// The updated parser, for builder-style chaining.
#[cfg(any(test, feature = "test-utils"))]
pub fn with_terminal_mode(mut self, mode: TerminalMode) -> Self {
    // The parser is owned here, so the cell can be updated without a runtime borrow.
    *self.terminal_mode.get_mut() = mode;
    self
}
173
/// Get a clone of the shared printer used by this parser.
///
/// This lets tests and monitoring code inspect what the parser wrote after parsing, for
/// example to verify output content or check for duplicated lines.
/// Only available with the `test-utils` feature.
///
/// # Returns
///
/// A new handle (an `Rc` clone) to the same `SharedPrinter` the parser writes to
/// (`Rc<RefCell<dyn Printable>>`).
///
/// # Example
///
/// ```ignore
/// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
/// use std::rc::Rc;
/// use std::cell::RefCell;
///
/// let printer = Rc::new(RefCell::new(TestPrinter::new()));
/// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
///
/// // Parse events...
///
/// // Bind the returned handle first: borrowing straight from `parser.printer()` would
/// // borrow a temporary that is dropped at the end of the statement.
/// let shared = parser.printer();
/// let printer_ref = shared.borrow();
/// assert!(!printer_ref.has_duplicate_consecutive_lines());
/// ```
#[cfg(feature = "test-utils")]
pub fn printer(&self) -> SharedPrinter {
    Rc::clone(&self.printer)
}
207
/// Snapshot the streaming quality metrics collected so far.
///
/// The metrics describe how the session handled the incoming stream, including:
/// - how many snapshot repairs were made (the agent sent accumulated content as a delta)
/// - how many unusually large deltas were seen (potential protocol violations)
/// - how many deltas were processed in total
///
/// Intended for tests, monitoring, and debugging of streaming behavior.
/// Only available with the `test-utils` feature.
///
/// # Returns
///
/// The metrics reported by the internal `StreamingSession`.
///
/// # Example
///
/// ```ignore
/// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
/// use std::rc::Rc;
/// use std::cell::RefCell;
///
/// let printer = Rc::new(RefCell::new(TestPrinter::new()));
/// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
///
/// // Parse events...
///
/// // Verify deduplication logic triggered
/// let metrics = parser.streaming_metrics();
/// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
/// ```
#[cfg(feature = "test-utils")]
pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
    let session = self.streaming_session.borrow();
    session.get_streaming_quality_metrics()
}
245
246 /// Parse and display a single Claude JSON event
247 ///
248 /// Returns `Some(formatted_output)` for valid events, or None for:
249 /// - Malformed JSON (logged at debug level)
250 /// - Unknown event types
251 /// - Empty or whitespace-only output
252 pub fn parse_event(&self, line: &str) -> Option<String> {
253 let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
254 e
255 } else {
256 // Non-JSON line - could be raw text output from agent
257 // Pass through as-is if it looks like real output (not empty)
258 let trimmed = line.trim();
259 if !trimmed.is_empty() && !trimmed.starts_with('{') {
260 return Some(format!("{trimmed}\n"));
261 }
262 return None;
263 };
264 let c = &self.colors;
265 let prefix = &self.display_name;
266
267 let output = match event {
268 ClaudeEvent::System {
269 subtype,
270 session_id,
271 cwd,
272 } => self.format_system_event(subtype.as_ref(), session_id, cwd),
273 ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
274 ClaudeEvent::User { message } => self.format_user_event(message),
275 ClaudeEvent::Result {
276 subtype,
277 duration_ms,
278 total_cost_usd,
279 num_turns,
280 result,
281 error,
282 } => self.format_result_event(
283 subtype,
284 duration_ms,
285 total_cost_usd,
286 num_turns,
287 result,
288 error,
289 ),
290 ClaudeEvent::StreamEvent { event } => {
291 // Handle streaming events for delta/partial updates
292 self.parse_stream_event(event)
293 }
294 ClaudeEvent::Unknown => {
295 // Use the generic unknown event formatter for consistent handling
296 // In verbose mode, this will show the event type and key fields
297 // In normal mode, this returns empty string
298 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
299 }
300 };
301
302 if output.is_empty() {
303 None
304 } else {
305 Some(output)
306 }
307 }
308
309 /// Parse a streaming event for delta/partial updates
310 ///
311 /// Handles the nested events within `stream_event`:
312 /// - MessageStart/Stop: Manage session state
313 /// - `ContentBlockStart`: Initialize new content blocks
314 /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
315 /// - `ContentBlockStop`: Finalize content blocks
316 /// - `MessageDelta`: Process message metadata without output
317 /// - Error: Display appropriately
318 ///
319 /// Returns String for display content, empty String for control events.
320 fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
321 let mut session = self.streaming_session.borrow_mut();
322
323 match event {
324 StreamInnerEvent::MessageStart {
325 message,
326 message_id,
327 } => {
328 // Extract message_id from either the top-level field or nested message.id
329 // The Claude API typically puts the ID in message.id, not at the top level
330 let effective_message_id =
331 message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
332 // Set message ID for tracking and clear session state on new message
333 session.set_current_message_id(effective_message_id);
334 session.on_message_start();
335 String::new()
336 }
337 StreamInnerEvent::ContentBlockStart {
338 index: Some(index),
339 content_block: Some(block),
340 } => {
341 // Initialize a new content block at this index
342 session.on_content_block_start(index);
343 match &block {
344 ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
345 // Initial text in ContentBlockStart - treat as first delta
346 session.on_text_delta(index, t);
347 }
348 ContentBlock::ToolUse { name, input } => {
349 // Track tool name for GLM/CCS deduplication.
350 // IMPORTANT: Track the tool name when provided, even when input is None.
351 // GLM may send ContentBlockStart with name but no input, then send input via delta.
352 // We only store when we have a name to avoid overwriting a previous tool name with None.
353 if let Some(n) = name {
354 session.set_tool_name(index, Some(n.clone()));
355 }
356
357 // Initialize tool input accumulator only if input is present
358 if let Some(i) = input {
359 let input_str = if let serde_json::Value::String(s) = &i {
360 s.clone()
361 } else {
362 format_tool_input(i)
363 };
364 session.on_tool_input_delta(index, &input_str);
365 }
366 }
367 _ => {}
368 }
369 String::new()
370 }
371 StreamInnerEvent::ContentBlockStart {
372 index: Some(index),
373 content_block: None,
374 } => {
375 // Content block started but no initial content provided
376 session.on_content_block_start(index);
377 String::new()
378 }
379 StreamInnerEvent::ContentBlockStart { .. } => {
380 // Content block without index - ignore
381 String::new()
382 }
383 StreamInnerEvent::ContentBlockDelta {
384 index: Some(index),
385 delta: Some(delta),
386 } => self.handle_content_block_delta(&mut session, index, delta),
387 StreamInnerEvent::TextDelta { text: Some(text) } => {
388 self.handle_text_delta(&mut session, &text)
389 }
390 StreamInnerEvent::ContentBlockStop { .. } => {
391 // Content block completion event - no output needed
392 // This event marks the end of a content block but doesn't produce
393 // any displayable content. It's a control event for state management.
394 String::new()
395 }
396 StreamInnerEvent::MessageDelta { .. } => {
397 // Message delta event with usage/metadata - no output needed
398 // This event contains final message metadata (stop_reason, usage stats)
399 // but is used for tracking/monitoring purposes only, not display.
400 String::new()
401 }
402 StreamInnerEvent::ContentBlockDelta { .. }
403 | StreamInnerEvent::Ping
404 | StreamInnerEvent::TextDelta { text: None }
405 | StreamInnerEvent::Error { error: None } => String::new(),
406 StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
407 StreamInnerEvent::Error {
408 error: Some(err), ..
409 } => self.handle_error_event(err),
410 StreamInnerEvent::Unknown => self.handle_unknown_event(),
411 }
412 }
413
414 /// Format a system event
415 fn format_system_event(
416 &self,
417 subtype: Option<&String>,
418 session_id: Option<String>,
419 cwd: Option<String>,
420 ) -> String {
421 let c = &self.colors;
422 let prefix = &self.display_name;
423
424 if subtype.map(std::string::String::as_str) == Some("init") {
425 let sid = session_id.unwrap_or_else(|| "unknown".to_string());
426 let mut out = format!(
427 "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
428 c.dim(),
429 prefix,
430 c.reset(),
431 c.cyan(),
432 c.reset(),
433 c.dim(),
434 sid,
435 c.reset()
436 );
437 if let Some(cwd) = cwd {
438 let _ = writeln!(
439 out,
440 "{}[{}]{} {}Working dir: {}{}",
441 c.dim(),
442 prefix,
443 c.reset(),
444 c.dim(),
445 cwd,
446 c.reset()
447 );
448 }
449 out
450 } else {
451 format!(
452 "{}[{}]{} {}{}{}\n",
453 c.dim(),
454 prefix,
455 c.reset(),
456 c.cyan(),
457 subtype.map_or("system", |s| s.as_str()),
458 c.reset()
459 )
460 }
461 }
462
463 /// Extract content from assistant message for hash-based deduplication.
464 ///
465 /// This includes both text and tool_use blocks, normalized for comparison.
466 /// Tool_use blocks are serialized in a deterministic way (name + sorted input JSON)
467 /// to ensure semantically identical tool calls produce the same hash.
468 ///
469 /// # Returns
470 /// A tuple of (normalized_content, tool_names_by_index) where:
471 /// - normalized_content: The concatenated normalized content (text + tool_use markers)
472 /// - tool_names_by_index: Map from content block index to tool name (for tool_use blocks)
473 fn extract_text_content_for_hash(
474 message: Option<&crate::json_parser::types::AssistantMessage>,
475 ) -> Option<(String, std::collections::HashMap<usize, String>)> {
476 message?.content.as_ref().map(|content| {
477 let mut normalized_parts = Vec::new();
478 let mut tool_names = std::collections::HashMap::new();
479
480 for (index, block) in content.iter().enumerate() {
481 match block {
482 ContentBlock::Text { text } => {
483 if let Some(text) = text.as_deref() {
484 normalized_parts.push(text.to_string());
485 }
486 }
487 ContentBlock::ToolUse { name, input } => {
488 // Track tool name by index for hash-based deduplication
489 if let Some(name_str) = name.as_deref() {
490 tool_names.insert(index, name_str.to_string());
491 }
492
493 // Normalize tool_use for hash comparison:
494 // Format: "TOOL_USE:{name}:{sorted_json_input}"
495 // Sorting JSON keys ensures identical inputs produce same hash
496 let normalized = format!(
497 "TOOL_USE:{}:{}",
498 name.as_deref().unwrap_or(""),
499 input
500 .as_ref()
501 .map(|v| {
502 // Sort JSON keys for deterministic serialization
503 if v.is_object() {
504 serde_json::to_string(v).ok()
505 } else if v.is_string() {
506 v.as_str().map(|s| s.to_string())
507 } else {
508 serde_json::to_string(v).ok()
509 }
510 .unwrap_or_default()
511 })
512 .unwrap_or_default()
513 );
514 normalized_parts.push(normalized);
515 }
516 _ => {}
517 }
518 }
519
520 (normalized_parts.join(""), tool_names)
521 })
522 }
523
524 /// Check if this assistant message is a duplicate of already-streamed content.
525 fn is_duplicate_assistant_message(
526 &self,
527 message: Option<&crate::json_parser::types::AssistantMessage>,
528 ) -> bool {
529 use std::collections::hash_map::DefaultHasher;
530 use std::hash::{Hash, Hasher};
531
532 let session = self.streaming_session.borrow();
533
534 // Extract message_id from the assistant message
535 let assistant_msg_id = message.and_then(|m| m.id.as_ref());
536
537 // Check if this assistant event has a message_id that matches the current streaming message
538 // If it does, and we have streamed content, then this assistant event is a duplicate
539 // because the content was already streamed via deltas.
540 if let Some(ast_msg_id) = assistant_msg_id {
541 // Check if message was already marked as displayed (after message_stop)
542 if session.is_duplicate_final_message(ast_msg_id) {
543 return true;
544 }
545
546 // Check if the assistant message_id matches the current streaming message_id
547 if session.get_current_message_id() == Some(ast_msg_id) {
548 // Same message - check if we have streamed any content
549 // If yes, the assistant event is a duplicate
550 if session.has_any_streamed_content() {
551 return true;
552 }
553 }
554 }
555
556 // Check if this exact assistant content has been rendered before
557 // This handles the case where GLM sends multiple assistant events during
558 // streaming with the same content but different message_ids
559 let content_for_hash = Self::extract_text_content_for_hash(message);
560 if let Some((ref text_content, _)) = content_for_hash {
561 if !text_content.is_empty() {
562 // Compute hash of the assistant event content
563 let mut hasher = DefaultHasher::new();
564 text_content.hash(&mut hasher);
565 let content_hash = hasher.finish();
566
567 // Check if this content was already rendered
568 if session.is_assistant_content_rendered(content_hash) {
569 return true;
570 }
571 }
572 }
573
574 // If no message_id match, fall back to hash-based deduplication
575 // extract_text_content_for_hash now returns (content, tool_names_by_index)
576 if let Some((ref text_content, ref tool_names)) = content_for_hash {
577 if !text_content.is_empty() {
578 return session.is_duplicate_by_hash(text_content, Some(tool_names));
579 }
580 }
581
582 // Fallback to coarse check
583 session.has_any_streamed_content()
584 }
585
586 /// Format a text content block for assistant output.
587 fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
588 let limit = self.verbosity.truncate_limit("text");
589 let preview = truncate_text(text, limit);
590 let _ = writeln!(
591 out,
592 "{}[{}]{} {}{}{}",
593 colors.dim(),
594 prefix,
595 colors.reset(),
596 colors.white(),
597 preview,
598 colors.reset()
599 );
600 }
601
602 /// Format a tool use content block for assistant output.
603 fn format_tool_use_block(
604 &self,
605 out: &mut String,
606 tool: Option<&String>,
607 input: Option<&serde_json::Value>,
608 prefix: &str,
609 colors: Colors,
610 ) {
611 let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
612 let _ = writeln!(
613 out,
614 "{}[{}]{} {}Tool{}: {}{}{}",
615 colors.dim(),
616 prefix,
617 colors.reset(),
618 colors.magenta(),
619 colors.reset(),
620 colors.bold(),
621 tool_name,
622 colors.reset(),
623 );
624
625 // Show tool input details at Normal and above (not just Verbose)
626 // Tool inputs provide crucial context for understanding agent actions
627 if self.verbosity.show_tool_input() {
628 if let Some(input_val) = input {
629 let input_str = format_tool_input(input_val);
630 let limit = self.verbosity.truncate_limit("tool_input");
631 let preview = truncate_text(&input_str, limit);
632 if !preview.is_empty() {
633 let _ = writeln!(
634 out,
635 "{}[{}]{} {} └─ {}{}",
636 colors.dim(),
637 prefix,
638 colors.reset(),
639 colors.dim(),
640 preview,
641 colors.reset()
642 );
643 }
644 }
645 }
646 }
647
648 /// Format a tool result content block for assistant output.
649 fn format_tool_result_block(
650 &self,
651 out: &mut String,
652 content: &serde_json::Value,
653 prefix: &str,
654 colors: Colors,
655 ) {
656 let content_str = match content {
657 serde_json::Value::String(s) => s.clone(),
658 other => other.to_string(),
659 };
660 let limit = self.verbosity.truncate_limit("tool_result");
661 let preview = truncate_text(&content_str, limit);
662 let _ = writeln!(
663 out,
664 "{}[{}]{} {}Result:{} {}",
665 colors.dim(),
666 prefix,
667 colors.reset(),
668 colors.dim(),
669 colors.reset(),
670 preview
671 );
672 }
673
674 /// Format all content blocks from an assistant message.
675 fn format_content_blocks(
676 &self,
677 out: &mut String,
678 content: &[ContentBlock],
679 prefix: &str,
680 colors: Colors,
681 ) {
682 for block in content {
683 match block {
684 ContentBlock::Text { text } => {
685 if let Some(text) = text {
686 self.format_text_block(out, text, prefix, colors);
687 }
688 }
689 ContentBlock::ToolUse { name, input } => {
690 self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
691 }
692 ContentBlock::ToolResult { content } => {
693 if let Some(content) = content {
694 self.format_tool_result_block(out, content, prefix, colors);
695 }
696 }
697 ContentBlock::Unknown => {}
698 }
699 }
700 }
701
702 /// Format an assistant event
703 fn format_assistant_event(
704 &self,
705 message: Option<crate::json_parser::types::AssistantMessage>,
706 ) -> String {
707 use std::collections::hash_map::DefaultHasher;
708 use std::hash::{Hash, Hasher};
709
710 // CRITICAL FIX: When ANY content has been streamed via deltas,
711 // the Assistant event should NOT display it again.
712 // The Assistant event represents the "complete" message, but if we've
713 // already shown the streaming deltas, showing it again causes duplication.
714 if self.is_duplicate_assistant_message(message.as_ref()) {
715 return String::new();
716 }
717
718 let mut out = String::new();
719 if let Some(ref msg) = message {
720 if let Some(ref content) = msg.content {
721 self.format_content_blocks(&mut out, content, &self.display_name, self.colors);
722
723 // If we successfully rendered content, mark it as rendered
724 if !out.is_empty() {
725 let mut session = self.streaming_session.borrow_mut();
726
727 // Mark the message as pre-rendered so that ALL subsequent streaming
728 // deltas for this message are suppressed.
729 // This handles the case where assistant event arrives BEFORE streaming starts.
730 if let Some(ref message_id) = msg.id {
731 session.mark_message_pre_rendered(message_id);
732 }
733
734 // Mark the assistant content as rendered by hash to prevent duplicate
735 // assistant events with the same content but different message_ids
736 if let Some((ref text_content, _)) =
737 Self::extract_text_content_for_hash(message.as_ref())
738 {
739 if !text_content.is_empty() {
740 let mut hasher = DefaultHasher::new();
741 text_content.hash(&mut hasher);
742 let content_hash = hasher.finish();
743 session.mark_assistant_content_rendered(content_hash);
744 }
745 }
746 }
747 }
748 }
749 out
750 }
751
752 /// Format a user event
753 fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
754 let c = &self.colors;
755 let prefix = &self.display_name;
756
757 if let Some(msg) = message {
758 if let Some(content) = msg.content {
759 if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
760 let limit = self.verbosity.truncate_limit("user");
761 let preview = truncate_text(text, limit);
762 return format!(
763 "{}[{}]{} {}User{}: {}{}{}\n",
764 c.dim(),
765 prefix,
766 c.reset(),
767 c.blue(),
768 c.reset(),
769 c.dim(),
770 preview,
771 c.reset()
772 );
773 }
774 }
775 }
776 String::new()
777 }
778
779 /// Format a result event
780 fn format_result_event(
781 &self,
782 subtype: Option<String>,
783 duration_ms: Option<u64>,
784 total_cost_usd: Option<f64>,
785 num_turns: Option<u32>,
786 result: Option<String>,
787 error: Option<String>,
788 ) -> String {
789 let c = &self.colors;
790 let prefix = &self.display_name;
791
792 let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
793 let duration_m = duration_total_secs / 60;
794 let duration_s_rem = duration_total_secs % 60;
795 let cost = total_cost_usd.unwrap_or(0.0);
796 let turns = num_turns.unwrap_or(0);
797
798 let mut out = if subtype.as_deref() == Some("success") {
799 format!(
800 "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
801 c.dim(),
802 prefix,
803 c.reset(),
804 c.green(),
805 CHECK,
806 c.reset(),
807 c.dim(),
808 duration_m,
809 duration_s_rem,
810 turns,
811 cost,
812 c.reset()
813 )
814 } else {
815 let err = error.unwrap_or_else(|| "unknown error".to_string());
816 format!(
817 "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
818 c.dim(),
819 prefix,
820 c.reset(),
821 c.red(),
822 CROSS,
823 subtype.unwrap_or_else(|| "error".to_string()),
824 c.reset(),
825 err,
826 c.dim(),
827 duration_m,
828 duration_s_rem,
829 c.reset()
830 )
831 };
832
833 if let Some(result) = result {
834 let limit = self.verbosity.truncate_limit("result");
835 let preview = truncate_text(&result, limit);
836 let _ = writeln!(
837 out,
838 "\n{}Result summary:{}\n{}{}{}",
839 c.bold(),
840 c.reset(),
841 c.dim(),
842 preview,
843 c.reset()
844 );
845 }
846 out
847 }
848
849 /// Handle content block delta events
850 fn handle_content_block_delta(
851 &self,
852 session: &mut std::cell::RefMut<'_, StreamingSession>,
853 index: u64,
854 delta: ContentBlockDelta,
855 ) -> String {
856 let c = &self.colors;
857 let prefix = &self.display_name;
858
859 match delta {
860 ContentBlockDelta::TextDelta { text: Some(text) } => {
861 let index_str = index.to_string();
862
863 // Track this delta with StreamingSession for state management.
864 //
865 // StreamingSession handles protocol/streaming quality concerns (including
866 // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
867 // whether a prefix should be displayed for this stream.
868 //
869 // The parser layer still applies additional deduplication:
870 // - Skip whitespace-only accumulated output
871 // - Hash-based deduplication after sanitization (whitespace-insensitive)
872 let show_prefix = session.on_text_delta(index, &text);
873
874 // Get accumulated text for streaming display
875 let accumulated_text = session
876 .get_accumulated(ContentType::Text, &index_str)
877 .unwrap_or("");
878
879 // Check if this message was pre-rendered from an assistant event.
880 // When an assistant event arrives BEFORE streaming deltas, we render it
881 // and mark the message_id as pre-rendered. ALL subsequent streaming deltas
882 // for this message should be suppressed to prevent duplication.
883 if let Some(message_id) = session.get_current_message_id() {
884 if session.is_message_pre_rendered(message_id) {
885 return String::new();
886 }
887 }
888
889 // Sanitize the accumulated text to check if it's empty
890 // This is needed to skip rendering when the accumulated content is just whitespace
891 let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
892
893 // Skip rendering if the sanitized text is empty (e.g., only whitespace)
894 // This prevents rendering empty lines when the accumulated content is just whitespace
895 if sanitized_text.is_empty() {
896 return String::new();
897 }
898
899 // Check if this sanitized content has already been rendered
900 // This prevents duplicates when accumulated content differs only by whitespace
901 if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
902 {
903 return String::new();
904 }
905
906 // Use TextDeltaRenderer for consistent rendering
907 let terminal_mode = *self.terminal_mode.borrow();
908
909 // Use prefix trie to detect if new content extends previously rendered content
910 // If yes, we do an in-place update (carriage return + new content)
911 let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);
912
913 let output = if show_prefix && !has_prefix {
914 // First delta with no prefix match - use the renderer with prefix
915 TextDeltaRenderer::render_first_delta(
916 accumulated_text,
917 prefix,
918 *c,
919 terminal_mode,
920 )
921 } else {
922 // Either continuation OR prefix match - use renderer for in-place update
923 // This handles the case where "Hello" becomes "Hello World" - we REPLACE
924 TextDeltaRenderer::render_subsequent_delta(
925 accumulated_text,
926 prefix,
927 *c,
928 terminal_mode,
929 )
930 };
931
932 // Mark this sanitized content as rendered for future duplicate detection
933 // We use the sanitized text (not the rendered output) to avoid false positives
934 // when the same accumulated text is rendered with different terminal modes
935 session.mark_rendered(ContentType::Text, &index_str);
936 session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);
937
938 output
939 }
940 ContentBlockDelta::ThinkingDelta {
941 thinking: Some(text),
942 } => {
943 // Track thinking deltas
944 session.on_thinking_delta(index, &text);
945 // Display thinking with visual distinction
946 Self::formatter().format_thinking(text.as_str(), prefix, *c)
947 }
948 ContentBlockDelta::ToolUseDelta {
949 tool_use: Some(tool_delta),
950 } => {
951 // Track tool name for GLM/CCS deduplication (if available in delta)
952 if let Some(serde_json::Value::String(name)) = tool_delta.get("name") {
953 session.set_tool_name(index, Some(name.to_string()));
954 }
955
956 // Handle tool input streaming
957 // Extract the tool input from the delta
958 let input_str =
959 tool_delta
960 .get("input")
961 .map_or_else(String::new, |input| match input {
962 serde_json::Value::String(s) => s.clone(),
963 other => format_tool_input(other),
964 });
965
966 if input_str.is_empty() {
967 String::new()
968 } else {
969 // Accumulate tool input
970 session.on_tool_input_delta(index, &input_str);
971
972 // Show partial tool input in real-time
973 let formatter = DeltaDisplayFormatter::new();
974 formatter.format_tool_input(&input_str, prefix, *c)
975 }
976 }
977 _ => String::new(),
978 }
979 }
980
981 /// Handle text delta events
982 fn handle_text_delta(
983 &self,
984 session: &mut std::cell::RefMut<'_, StreamingSession>,
985 text: &str,
986 ) -> String {
987 let c = &self.colors;
988 let prefix = &self.display_name;
989
990 // Standalone text delta (not part of content block)
991 // Use default index "0" for standalone text
992 let default_index = 0u64;
993 let default_index_str = "0";
994
995 // Track this delta with StreamingSession for state management.
996 //
997 // StreamingSession handles protocol/streaming quality concerns (including
998 // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
999 // whether a prefix should be displayed for this stream.
1000 //
1001 // The parser layer still applies additional deduplication:
1002 // - Skip whitespace-only accumulated output
1003 // - Hash-based deduplication after sanitization (whitespace-insensitive)
1004 let show_prefix = session.on_text_delta(default_index, text);
1005
1006 // Get accumulated text for streaming display
1007 let accumulated_text = session
1008 .get_accumulated(ContentType::Text, default_index_str)
1009 .unwrap_or("");
1010
1011 // Sanitize the accumulated text to check if it's empty
1012 // This is needed to skip rendering when the accumulated content is just whitespace
1013 let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
1014
1015 // Skip rendering if the sanitized text is empty (e.g., only whitespace)
1016 // This prevents rendering empty lines when the accumulated content is just whitespace
1017 if sanitized_text.is_empty() {
1018 return String::new();
1019 }
1020
1021 // Check if this sanitized content has already been rendered
1022 // This prevents duplicates when accumulated content differs only by whitespace
1023 if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
1024 return String::new();
1025 }
1026
1027 // Use TextDeltaRenderer for consistent rendering across all parsers
1028 let terminal_mode = *self.terminal_mode.borrow();
1029
1030 // Use prefix trie to detect if new content extends previously rendered content
1031 // If yes, we do an in-place update (carriage return + new content)
1032 let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
1033
1034 let output = if show_prefix && !has_prefix {
1035 // First delta with no prefix match - use the renderer with prefix
1036 TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
1037 } else {
1038 // Either continuation OR prefix match - use renderer for in-place update
1039 // This handles the case where "Hello" becomes "Hello World" - we REPLACE
1040 TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
1041 };
1042
1043 // Mark this sanitized content as rendered for future duplicate detection
1044 // We use the sanitized text (not the rendered output) to avoid false positives
1045 // when the same accumulated text is rendered with different terminal modes
1046 session.mark_rendered(ContentType::Text, default_index_str);
1047 session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
1048
1049 output
1050 }
1051
1052 /// Handle message stop events
1053 fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
1054 let c = &self.colors;
1055
1056 // Message complete - add final newline if we were in a content block
1057 // OR if any content was streamed (handles edge cases where block state
1058 // may not have been set but content was still streamed)
1059 let metrics = session.get_streaming_quality_metrics();
1060 let was_in_block = session.on_message_stop();
1061 let had_content = session.has_any_streamed_content();
1062 if was_in_block || had_content {
1063 // Use TextDeltaRenderer for completion - adds final newline
1064 let terminal_mode = *self.terminal_mode.borrow();
1065 let completion = format!(
1066 "{}{}",
1067 c.reset(),
1068 TextDeltaRenderer::render_completion(terminal_mode)
1069 );
1070 // Show streaming quality metrics in debug mode or when flag is set
1071 let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
1072 && metrics.total_deltas > 0;
1073 if show_metrics {
1074 format!("{}\n{}", completion, metrics.format(*c))
1075 } else {
1076 completion
1077 }
1078 } else {
1079 String::new()
1080 }
1081 }
1082
1083 /// Handle error events
1084 fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
1085 let c = &self.colors;
1086 let prefix = &self.display_name;
1087
1088 let msg = err
1089 .message
1090 .unwrap_or_else(|| "Unknown streaming error".to_string());
1091 format!(
1092 "{}[{}]{} {}Error: {}{}\n",
1093 c.dim(),
1094 prefix,
1095 c.reset(),
1096 c.red(),
1097 msg,
1098 c.reset()
1099 )
1100 }
1101
1102 /// Handle unknown events
1103 fn handle_unknown_event(&self) -> String {
1104 let c = &self.colors;
1105 let prefix = &self.display_name;
1106
1107 // Unknown stream event - in debug mode, log it
1108 if self.verbosity.is_debug() {
1109 format!(
1110 "{}[{}]{} {}Unknown streaming event{}\n",
1111 c.dim(),
1112 prefix,
1113 c.reset(),
1114 c.dim(),
1115 c.reset()
1116 )
1117 } else {
1118 String::new()
1119 }
1120 }
1121
1122 /// Check if a Claude event is a control event (state management with no user output)
1123 ///
1124 /// Control events are valid JSON that represent state transitions rather than
1125 /// user-facing content. They should be tracked separately from "ignored" events
1126 /// to avoid false health warnings.
1127 const fn is_control_event(event: &ClaudeEvent) -> bool {
1128 match event {
1129 // Stream events that are control events
1130 ClaudeEvent::StreamEvent { event } => matches!(
1131 event,
1132 StreamInnerEvent::MessageStart { .. }
1133 | StreamInnerEvent::ContentBlockStart { .. }
1134 | StreamInnerEvent::ContentBlockStop { .. }
1135 | StreamInnerEvent::MessageDelta { .. }
1136 | StreamInnerEvent::MessageStop
1137 | StreamInnerEvent::Ping
1138 ),
1139 _ => false,
1140 }
1141 }
1142
1143 /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1144 ///
1145 /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1146 /// tool input deltas) that are shown to the user in real-time. These should be
1147 /// tracked separately to avoid inflating "ignored" percentages.
1148 const fn is_partial_event(event: &ClaudeEvent) -> bool {
1149 match event {
1150 // Stream events that produce incremental content
1151 ClaudeEvent::StreamEvent { event } => matches!(
1152 event,
1153 StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1154 ),
1155 _ => false,
1156 }
1157 }
1158
1159 /// Get a shared delta display formatter
1160 const fn formatter() -> DeltaDisplayFormatter {
1161 DeltaDisplayFormatter::new()
1162 }
1163
1164 /// Parse a stream of Claude NDJSON events
1165 pub fn parse_stream<R: BufRead>(
1166 &self,
1167 mut reader: R,
1168 workspace: &dyn crate::workspace::Workspace,
1169 ) -> io::Result<()> {
1170 use super::incremental_parser::IncrementalNdjsonParser;
1171
1172 let c = &self.colors;
1173 let monitor = HealthMonitor::new("Claude");
1174 // Accumulate log content in memory, write to workspace at the end
1175 let logging_enabled = self.log_path.is_some();
1176 let mut log_buffer: Vec<u8> = Vec::new();
1177
1178 // Use incremental parser for true real-time streaming
1179 // This processes JSON as soon as it's complete, not waiting for newlines
1180 let mut incremental_parser = IncrementalNdjsonParser::new();
1181 let mut byte_buffer = Vec::new();
1182
1183 // Track whether we've seen a success result event for GLM/ccs-glm compatibility
1184 // Some agents (GLM via CCS) emit both a success result and an error_during_execution
1185 // result when they exit with code 1 despite producing valid output. We suppress
1186 // the spurious error event to avoid confusing duplicate output.
1187 let mut seen_success_result = false;
1188
1189 loop {
1190 // Read available bytes
1191 byte_buffer.clear();
1192 let chunk = reader.fill_buf()?;
1193 if chunk.is_empty() {
1194 break;
1195 }
1196
1197 // Process all bytes immediately
1198 byte_buffer.extend_from_slice(chunk);
1199 let consumed = chunk.len();
1200 reader.consume(consumed);
1201
1202 // Feed bytes to incremental parser
1203 let json_events = incremental_parser.feed(&byte_buffer);
1204
1205 // Process each complete JSON event immediately
1206 for line in json_events {
1207 let trimmed = line.trim();
1208 if trimmed.is_empty() {
1209 continue;
1210 }
1211
1212 // Check for Result events to handle GLM/ccs-glm duplicate event bug
1213 // Some agents emit both success and error_during_execution results
1214 let should_skip_result = if trimmed.starts_with('{') {
1215 // First, check if the JSON has an 'errors' field with actual error messages.
1216 // This is important because Claude events can have either 'error' (string)
1217 // or 'errors' (array of strings), and we need to check both.
1218 let has_errors_with_content =
1219 if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
1220 // Check for 'errors' array with at least one non-empty string
1221 json.get("errors")
1222 .and_then(|v| v.as_array())
1223 .is_some_and(|arr| {
1224 arr.iter()
1225 .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
1226 })
1227 } else {
1228 false
1229 };
1230
1231 if let Ok(ClaudeEvent::Result {
1232 subtype,
1233 duration_ms,
1234 error,
1235 ..
1236 }) = serde_json::from_str::<ClaudeEvent>(trimmed)
1237 {
1238 let is_error_result = subtype.as_deref() != Some("success");
1239
1240 // Suppress spurious GLM error events based on these characteristics:
1241 // 1. Error event (subtype != "success")
1242 // 2. duration_ms is 0 or very small (< 100ms, indicating synthetic event)
1243 // 3. error field is null or empty (no actual error message)
1244 // 4. NO 'errors' field with actual error messages (this indicates a real error)
1245 //
1246 // These criteria identify the spurious error_during_execution events
1247 // that GLM emits when exiting with code 1 despite producing valid output.
1248 //
1249 // We DON'T suppress if there's an 'errors' array with content, because
1250 // that indicates a real error condition that the user should see.
1251 let is_spurious_glm_error = is_error_result
1252 && duration_ms.unwrap_or(0) < 100
1253 && (error.is_none() || error.as_ref().is_some_and(|e| e.is_empty()))
1254 && !has_errors_with_content;
1255
1256 if is_spurious_glm_error && seen_success_result {
1257 // Error after success - suppress (original fix)
1258 true
1259 } else if subtype.as_deref() == Some("success") {
1260 seen_success_result = true;
1261 false
1262 } else if is_spurious_glm_error {
1263 // Spurious error BEFORE success - still suppress based on characteristics
1264 // This handles the reverse-order case where error arrives first
1265 true
1266 } else {
1267 false
1268 }
1269 } else {
1270 false
1271 }
1272 } else {
1273 false
1274 };
1275
1276 // In debug mode, also show the raw JSON
1277 if self.verbosity.is_debug() {
1278 eprintln!(
1279 "{}[DEBUG]{} {}{}{}",
1280 c.dim(),
1281 c.reset(),
1282 c.dim(),
1283 &line,
1284 c.reset()
1285 );
1286 }
1287
1288 // Skip suppressed result events but still log them
1289 if should_skip_result {
1290 if logging_enabled {
1291 writeln!(log_buffer, "{line}")?;
1292 }
1293 monitor.record_control_event();
1294 continue;
1295 }
1296
1297 // Parse the event once - parse_event handles malformed JSON by returning None
1298 match self.parse_event(&line) {
1299 Some(output) => {
1300 // Check if this is a partial/delta event (streaming content)
1301 if trimmed.starts_with('{') {
1302 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1303 if Self::is_partial_event(&event) {
1304 monitor.record_partial_event();
1305 } else {
1306 monitor.record_parsed();
1307 }
1308 } else {
1309 monitor.record_parsed();
1310 }
1311 } else {
1312 monitor.record_parsed();
1313 }
1314 // Write output to printer
1315 let mut printer = self.printer.borrow_mut();
1316 write!(printer, "{output}")?;
1317 printer.flush()?;
1318 }
1319 None => {
1320 // Check if this was a control event (state management with no user output)
1321 // Control events are valid JSON that return empty output but aren't "ignored"
1322 if trimmed.starts_with('{') {
1323 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1324 if Self::is_control_event(&event) {
1325 monitor.record_control_event();
1326 } else {
1327 // Valid JSON but not a control event - track as unknown
1328 monitor.record_unknown_event();
1329 }
1330 } else {
1331 // Failed to deserialize - track as parse error
1332 monitor.record_parse_error();
1333 }
1334 } else {
1335 monitor.record_ignored();
1336 }
1337 }
1338 }
1339
1340 // Log raw JSON to buffer if configured
1341 if logging_enabled {
1342 writeln!(log_buffer, "{line}")?;
1343 }
1344 }
1345 }
1346
1347 // Write accumulated log content to workspace
1348 if let Some(log_path) = &self.log_path {
1349 workspace.append_bytes(log_path, &log_buffer)?;
1350 }
1351 if let Some(warning) = monitor.check_and_warn(*c) {
1352 let mut printer = self.printer.borrow_mut();
1353 writeln!(printer, "{warning}")?;
1354 printer.flush()?;
1355 }
1356 Ok(())
1357 }
1358}
1359
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use crate::json_parser::printer::{SharedPrinter, TestPrinter};

    /// Build a parser with default colors and normal verbosity that writes to an in-memory
    /// `TestPrinter` instead of the terminal.
    fn parser_with_test_printer() -> ClaudeParser {
        let printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, printer)
    }

    #[test]
    fn test_printer_method_accessible() {
        // The test passes if this compiles: `printer()` must stay reachable from tests.
        let parser = parser_with_test_printer();
        let _printer_ref = parser.printer();
    }

    #[test]
    fn test_streaming_metrics_method_accessible() {
        // The test passes if this compiles: `streaming_metrics()` must stay reachable.
        let parser = parser_with_test_printer();
        let _metrics = parser.streaming_metrics();
    }
}