ralph_workflow/json_parser/claude.rs
1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//! creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46#[cfg(feature = "test-utils")]
47use super::health::StreamingQualityMetrics;
48use super::printer::SharedPrinter;
49use super::streaming_state::StreamingSession;
50use super::terminal::TerminalMode;
51use super::types::{
52 format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
53 ContentType, StreamInnerEvent,
54};
55
/// Parser that turns Claude CLI JSON events into formatted display text.
///
/// Note: This parser is designed for single-threaded use only.
/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
/// Do not share this parser across threads.
pub struct ClaudeParser {
    /// Color palette applied to every formatted output line.
    colors: Colors,
    /// Verbosity level; consulted for truncation limits, whether tool input is
    /// shown, and whether unknown events are rendered.
    pub(crate) verbosity: Verbosity,
    /// Optional log file path, set through `with_log_file` (`None` after construction).
    log_file: Option<String>,
    /// Name shown inside the `[...]` prefix of each output line (`"Claude"` by default).
    display_name: String,
    /// Unified streaming session tracker
    /// Provides single source of truth for streaming state across all content types
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Terminal mode for output formatting.
    /// Initialized from `TerminalMode::detect()` when the parser is constructed and
    /// stored in a `RefCell` so it can be read and replaced behind a shared reference.
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to show streaming quality metrics
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
}
77
78impl ClaudeParser {
79 /// Create a new `ClaudeParser` with the given colors and verbosity.
80 ///
81 /// # Arguments
82 ///
83 /// * `colors` - Colors for terminal output
84 /// * `verbosity` - Verbosity level for output
85 ///
86 /// # Returns
87 ///
88 /// A new `ClaudeParser` instance
89 ///
90 /// # Example
91 ///
92 /// ```ignore
93 /// use ralph_workflow::json_parser::ClaudeParser;
94 /// use ralph_workflow::logger::Colors;
95 /// use ralph_workflow::config::Verbosity;
96 ///
97 /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
98 /// ```
99 pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
100 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
101 }
102
103 /// Create a new `ClaudeParser` with a custom printer.
104 ///
105 /// # Arguments
106 ///
107 /// * `colors` - Colors for terminal output
108 /// * `verbosity` - Verbosity level for output
109 /// * `printer` - Shared printer for output
110 ///
111 /// # Returns
112 ///
113 /// A new `ClaudeParser` instance
114 pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
115 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
116 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
117
118 // Use the printer's is_terminal method to validate it's connected correctly
119 // This is a sanity check that also satisfies the compiler that the method is used
120 let _printer_is_terminal = printer.borrow().is_terminal();
121
122 Self {
123 colors,
124 verbosity,
125 log_file: None,
126 display_name: "Claude".to_string(),
127 streaming_session: Rc::new(RefCell::new(streaming_session)),
128 terminal_mode: RefCell::new(TerminalMode::detect()),
129 show_streaming_metrics: false,
130 printer,
131 }
132 }
133
/// Enable or disable the display of streaming quality metrics.
///
/// Builder-style setter: stores `show` in `show_streaming_metrics` and returns
/// the parser so calls can be chained.
pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
    self.show_streaming_metrics = show;
    self
}
138
139 /// Set the display name for this parser.
140 ///
141 /// # Arguments
142 ///
143 /// * `display_name` - The name to display in output
144 ///
145 /// # Returns
146 ///
147 /// Self for builder pattern chaining
148 pub fn with_display_name(mut self, display_name: &str) -> Self {
149 self.display_name = display_name.to_string();
150 self
151 }
152
153 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
154 self.log_file = Some(path.to_string());
155 self
156 }
157
/// Replace the detected terminal mode with an explicit one.
///
/// Only compiled for tests or with the `test-utils` feature.
///
/// # Arguments
///
/// * `mode` - The terminal mode to use
///
/// # Returns
///
/// Self for builder pattern chaining
#[cfg(any(test, feature = "test-utils"))]
pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
    // The previously stored mode is not needed.
    let _previous = self.terminal_mode.replace(mode);
    self
}
172
/// Get a clone of the shared printer used by this parser.
///
/// This allows tests, monitoring, and other code to access the printer after parsing
/// to verify output content, check for duplicates, or capture output for analysis.
/// Only available with the `test-utils` feature.
///
/// # Returns
///
/// A clone of the shared printer reference (`Rc<RefCell<dyn Printable>>`)
///
/// # Example
///
/// ```ignore
/// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
/// use std::rc::Rc;
/// use std::cell::RefCell;
///
/// let printer = Rc::new(RefCell::new(TestPrinter::new()));
/// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
///
/// // Parse events...
///
/// // Now access the printer to verify output
/// let printer_ref = parser.printer().borrow();
/// assert!(!printer_ref.has_duplicate_consecutive_lines());
/// ```
#[cfg(feature = "test-utils")]
pub fn printer(&self) -> SharedPrinter {
    let shared = &self.printer;
    Rc::clone(shared)
}
206
/// Report the streaming quality metrics gathered by the current session.
///
/// The metrics give insight into deduplication and streaming quality, including:
/// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
/// - Number of large deltas (potential protocol violations)
/// - Total deltas processed
///
/// Useful for testing, monitoring, and debugging streaming behavior.
/// Only available with the `test-utils` feature.
///
/// # Returns
///
/// The streaming quality metrics reported by the internal `StreamingSession`.
///
/// # Example
///
/// ```ignore
/// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
/// use std::rc::Rc;
/// use std::cell::RefCell;
///
/// let printer = Rc::new(RefCell::new(TestPrinter::new()));
/// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
///
/// // Parse events...
///
/// // Verify deduplication logic triggered
/// let metrics = parser.streaming_metrics();
/// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
/// ```
#[cfg(feature = "test-utils")]
pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
    let session = self.streaming_session.borrow();
    session.get_streaming_quality_metrics()
}
244
245 /// Parse and display a single Claude JSON event
246 ///
247 /// Returns `Some(formatted_output)` for valid events, or None for:
248 /// - Malformed JSON (logged at debug level)
249 /// - Unknown event types
250 /// - Empty or whitespace-only output
251 pub fn parse_event(&self, line: &str) -> Option<String> {
252 let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
253 e
254 } else {
255 // Non-JSON line - could be raw text output from agent
256 // Pass through as-is if it looks like real output (not empty)
257 let trimmed = line.trim();
258 if !trimmed.is_empty() && !trimmed.starts_with('{') {
259 return Some(format!("{trimmed}\n"));
260 }
261 return None;
262 };
263 let c = &self.colors;
264 let prefix = &self.display_name;
265
266 let output = match event {
267 ClaudeEvent::System {
268 subtype,
269 session_id,
270 cwd,
271 } => self.format_system_event(subtype.as_ref(), session_id, cwd),
272 ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
273 ClaudeEvent::User { message } => self.format_user_event(message),
274 ClaudeEvent::Result {
275 subtype,
276 duration_ms,
277 total_cost_usd,
278 num_turns,
279 result,
280 error,
281 } => self.format_result_event(
282 subtype,
283 duration_ms,
284 total_cost_usd,
285 num_turns,
286 result,
287 error,
288 ),
289 ClaudeEvent::StreamEvent { event } => {
290 // Handle streaming events for delta/partial updates
291 self.parse_stream_event(event)
292 }
293 ClaudeEvent::Unknown => {
294 // Use the generic unknown event formatter for consistent handling
295 // In verbose mode, this will show the event type and key fields
296 // In normal mode, this returns empty string
297 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
298 }
299 };
300
301 if output.is_empty() {
302 None
303 } else {
304 Some(output)
305 }
306 }
307
/// Parse a streaming event for delta/partial updates
///
/// Handles the nested events within `stream_event`:
/// - MessageStart/Stop: Manage session state
/// - `ContentBlockStart`: Initialize new content blocks
/// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
/// - `ContentBlockStop`: Finalize content blocks
/// - `MessageDelta`: Process message metadata without output
/// - Error: Display appropriately
///
/// The arms are order-sensitive: the fully-populated `ContentBlockStart` and
/// `ContentBlockDelta` patterns must come before their catch-all `{ .. }` forms.
///
/// Returns String for display content, empty String for control events.
fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
    // The mutable borrow is held for the whole function and handed to the helpers
    // that need session state.
    // NOTE(review): `handle_error_event` / `handle_unknown_event` run while this borrow
    // is alive, so they must not borrow `streaming_session` themselves — confirm.
    let mut session = self.streaming_session.borrow_mut();

    match event {
        StreamInnerEvent::MessageStart {
            message,
            message_id,
        } => {
            // Extract message_id from either the top-level field or nested message.id
            // The Claude API typically puts the ID in message.id, not at the top level
            let effective_message_id =
                message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
            // Set message ID for tracking and clear session state on new message
            session.set_current_message_id(effective_message_id);
            session.on_message_start();
            String::new()
        }
        StreamInnerEvent::ContentBlockStart {
            index: Some(index),
            content_block: Some(block),
        } => {
            // Initialize a new content block at this index
            session.on_content_block_start(index);
            match &block {
                ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
                    // Initial text in ContentBlockStart - treat as first delta.
                    // Nothing is rendered here; the text is only accumulated.
                    session.on_text_delta(index, t);
                }
                ContentBlock::ToolUse { name, input } => {
                    // Track tool name for GLM/CCS deduplication.
                    // IMPORTANT: Track the tool name when provided, even when input is None.
                    // GLM may send ContentBlockStart with name but no input, then send input via delta.
                    // We only store when we have a name to avoid overwriting a previous tool name with None.
                    if let Some(n) = name {
                        session.set_tool_name(index, Some(n.clone()));
                    }

                    // Initialize tool input accumulator only if input is present.
                    // String inputs are used verbatim; any other JSON value goes
                    // through `format_tool_input`.
                    if let Some(i) = input {
                        let input_str = if let serde_json::Value::String(s) = &i {
                            s.clone()
                        } else {
                            format_tool_input(i)
                        };
                        session.on_tool_input_delta(index, &input_str);
                    }
                }
                _ => {}
            }
            String::new()
        }
        StreamInnerEvent::ContentBlockStart {
            index: Some(index),
            content_block: None,
        } => {
            // Content block started but no initial content provided
            session.on_content_block_start(index);
            String::new()
        }
        StreamInnerEvent::ContentBlockStart { .. } => {
            // Content block without index - ignore
            String::new()
        }
        StreamInnerEvent::ContentBlockDelta {
            index: Some(index),
            delta: Some(delta),
        } => self.handle_content_block_delta(&mut session, index, delta),
        StreamInnerEvent::TextDelta { text: Some(text) } => {
            self.handle_text_delta(&mut session, &text)
        }
        StreamInnerEvent::ContentBlockStop { .. } => {
            // Content block completion event - no output needed
            // This event marks the end of a content block but doesn't produce
            // any displayable content. It's a control event for state management.
            String::new()
        }
        StreamInnerEvent::MessageDelta { .. } => {
            // Message delta event with usage/metadata - no output needed
            // This event contains final message metadata (stop_reason, usage stats)
            // but is used for tracking/monitoring purposes only, not display.
            String::new()
        }
        // Events with missing payloads and keep-alive pings produce no output.
        StreamInnerEvent::ContentBlockDelta { .. }
        | StreamInnerEvent::Ping
        | StreamInnerEvent::TextDelta { text: None }
        | StreamInnerEvent::Error { error: None } => String::new(),
        StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
        StreamInnerEvent::Error {
            error: Some(err), ..
        } => self.handle_error_event(err),
        StreamInnerEvent::Unknown => self.handle_unknown_event(),
    }
}
412
413 /// Format a system event
414 fn format_system_event(
415 &self,
416 subtype: Option<&String>,
417 session_id: Option<String>,
418 cwd: Option<String>,
419 ) -> String {
420 let c = &self.colors;
421 let prefix = &self.display_name;
422
423 if subtype.map(std::string::String::as_str) == Some("init") {
424 let sid = session_id.unwrap_or_else(|| "unknown".to_string());
425 let mut out = format!(
426 "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
427 c.dim(),
428 prefix,
429 c.reset(),
430 c.cyan(),
431 c.reset(),
432 c.dim(),
433 sid,
434 c.reset()
435 );
436 if let Some(cwd) = cwd {
437 let _ = writeln!(
438 out,
439 "{}[{}]{} {}Working dir: {}{}",
440 c.dim(),
441 prefix,
442 c.reset(),
443 c.dim(),
444 cwd,
445 c.reset()
446 );
447 }
448 out
449 } else {
450 format!(
451 "{}[{}]{} {}{}{}\n",
452 c.dim(),
453 prefix,
454 c.reset(),
455 c.cyan(),
456 subtype.map_or("system", |s| s.as_str()),
457 c.reset()
458 )
459 }
460 }
461
462 /// Extract content from assistant message for hash-based deduplication.
463 ///
464 /// This includes both text and tool_use blocks, normalized for comparison.
465 /// Tool_use blocks are serialized in a deterministic way (name + sorted input JSON)
466 /// to ensure semantically identical tool calls produce the same hash.
467 ///
468 /// # Returns
469 /// A tuple of (normalized_content, tool_names_by_index) where:
470 /// - normalized_content: The concatenated normalized content (text + tool_use markers)
471 /// - tool_names_by_index: Map from content block index to tool name (for tool_use blocks)
472 fn extract_text_content_for_hash(
473 message: Option<&crate::json_parser::types::AssistantMessage>,
474 ) -> Option<(String, std::collections::HashMap<usize, String>)> {
475 message?.content.as_ref().map(|content| {
476 let mut normalized_parts = Vec::new();
477 let mut tool_names = std::collections::HashMap::new();
478
479 for (index, block) in content.iter().enumerate() {
480 match block {
481 ContentBlock::Text { text } => {
482 if let Some(text) = text.as_deref() {
483 normalized_parts.push(text.to_string());
484 }
485 }
486 ContentBlock::ToolUse { name, input } => {
487 // Track tool name by index for hash-based deduplication
488 if let Some(name_str) = name.as_deref() {
489 tool_names.insert(index, name_str.to_string());
490 }
491
492 // Normalize tool_use for hash comparison:
493 // Format: "TOOL_USE:{name}:{sorted_json_input}"
494 // Sorting JSON keys ensures identical inputs produce same hash
495 let normalized = format!(
496 "TOOL_USE:{}:{}",
497 name.as_deref().unwrap_or(""),
498 input
499 .as_ref()
500 .map(|v| {
501 // Sort JSON keys for deterministic serialization
502 if v.is_object() {
503 serde_json::to_string(v).ok()
504 } else if v.is_string() {
505 v.as_str().map(|s| s.to_string())
506 } else {
507 serde_json::to_string(v).ok()
508 }
509 .unwrap_or_default()
510 })
511 .unwrap_or_default()
512 );
513 normalized_parts.push(normalized);
514 }
515 _ => {}
516 }
517 }
518
519 (normalized_parts.join(""), tool_names)
520 })
521 }
522
523 /// Check if this assistant message is a duplicate of already-streamed content.
524 fn is_duplicate_assistant_message(
525 &self,
526 message: Option<&crate::json_parser::types::AssistantMessage>,
527 ) -> bool {
528 use std::collections::hash_map::DefaultHasher;
529 use std::hash::{Hash, Hasher};
530
531 let session = self.streaming_session.borrow();
532
533 // Extract message_id from the assistant message
534 let assistant_msg_id = message.and_then(|m| m.id.as_ref());
535
536 // Check if this assistant event has a message_id that matches the current streaming message
537 // If it does, and we have streamed content, then this assistant event is a duplicate
538 // because the content was already streamed via deltas.
539 if let Some(ast_msg_id) = assistant_msg_id {
540 // Check if message was already marked as displayed (after message_stop)
541 if session.is_duplicate_final_message(ast_msg_id) {
542 return true;
543 }
544
545 // Check if the assistant message_id matches the current streaming message_id
546 if session.get_current_message_id() == Some(ast_msg_id) {
547 // Same message - check if we have streamed any content
548 // If yes, the assistant event is a duplicate
549 if session.has_any_streamed_content() {
550 return true;
551 }
552 }
553 }
554
555 // Check if this exact assistant content has been rendered before
556 // This handles the case where GLM sends multiple assistant events during
557 // streaming with the same content but different message_ids
558 let content_for_hash = Self::extract_text_content_for_hash(message);
559 if let Some((ref text_content, _)) = content_for_hash {
560 if !text_content.is_empty() {
561 // Compute hash of the assistant event content
562 let mut hasher = DefaultHasher::new();
563 text_content.hash(&mut hasher);
564 let content_hash = hasher.finish();
565
566 // Check if this content was already rendered
567 if session.is_assistant_content_rendered(content_hash) {
568 return true;
569 }
570 }
571 }
572
573 // If no message_id match, fall back to hash-based deduplication
574 // extract_text_content_for_hash now returns (content, tool_names_by_index)
575 if let Some((ref text_content, ref tool_names)) = content_for_hash {
576 if !text_content.is_empty() {
577 return session.is_duplicate_by_hash(text_content, Some(tool_names));
578 }
579 }
580
581 // Fallback to coarse check
582 session.has_any_streamed_content()
583 }
584
585 /// Format a text content block for assistant output.
586 fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
587 let limit = self.verbosity.truncate_limit("text");
588 let preview = truncate_text(text, limit);
589 let _ = writeln!(
590 out,
591 "{}[{}]{} {}{}{}",
592 colors.dim(),
593 prefix,
594 colors.reset(),
595 colors.white(),
596 preview,
597 colors.reset()
598 );
599 }
600
601 /// Format a tool use content block for assistant output.
602 fn format_tool_use_block(
603 &self,
604 out: &mut String,
605 tool: Option<&String>,
606 input: Option<&serde_json::Value>,
607 prefix: &str,
608 colors: Colors,
609 ) {
610 let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
611 let _ = writeln!(
612 out,
613 "{}[{}]{} {}Tool{}: {}{}{}",
614 colors.dim(),
615 prefix,
616 colors.reset(),
617 colors.magenta(),
618 colors.reset(),
619 colors.bold(),
620 tool_name,
621 colors.reset(),
622 );
623
624 // Show tool input details at Normal and above (not just Verbose)
625 // Tool inputs provide crucial context for understanding agent actions
626 if self.verbosity.show_tool_input() {
627 if let Some(input_val) = input {
628 let input_str = format_tool_input(input_val);
629 let limit = self.verbosity.truncate_limit("tool_input");
630 let preview = truncate_text(&input_str, limit);
631 if !preview.is_empty() {
632 let _ = writeln!(
633 out,
634 "{}[{}]{} {} └─ {}{}",
635 colors.dim(),
636 prefix,
637 colors.reset(),
638 colors.dim(),
639 preview,
640 colors.reset()
641 );
642 }
643 }
644 }
645 }
646
647 /// Format a tool result content block for assistant output.
648 fn format_tool_result_block(
649 &self,
650 out: &mut String,
651 content: &serde_json::Value,
652 prefix: &str,
653 colors: Colors,
654 ) {
655 let content_str = match content {
656 serde_json::Value::String(s) => s.clone(),
657 other => other.to_string(),
658 };
659 let limit = self.verbosity.truncate_limit("tool_result");
660 let preview = truncate_text(&content_str, limit);
661 let _ = writeln!(
662 out,
663 "{}[{}]{} {}Result:{} {}",
664 colors.dim(),
665 prefix,
666 colors.reset(),
667 colors.dim(),
668 colors.reset(),
669 preview
670 );
671 }
672
673 /// Format all content blocks from an assistant message.
674 fn format_content_blocks(
675 &self,
676 out: &mut String,
677 content: &[ContentBlock],
678 prefix: &str,
679 colors: Colors,
680 ) {
681 for block in content {
682 match block {
683 ContentBlock::Text { text } => {
684 if let Some(text) = text {
685 self.format_text_block(out, text, prefix, colors);
686 }
687 }
688 ContentBlock::ToolUse { name, input } => {
689 self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
690 }
691 ContentBlock::ToolResult { content } => {
692 if let Some(content) = content {
693 self.format_tool_result_block(out, content, prefix, colors);
694 }
695 }
696 ContentBlock::Unknown => {}
697 }
698 }
699 }
700
701 /// Format an assistant event
702 fn format_assistant_event(
703 &self,
704 message: Option<crate::json_parser::types::AssistantMessage>,
705 ) -> String {
706 use std::collections::hash_map::DefaultHasher;
707 use std::hash::{Hash, Hasher};
708
709 // CRITICAL FIX: When ANY content has been streamed via deltas,
710 // the Assistant event should NOT display it again.
711 // The Assistant event represents the "complete" message, but if we've
712 // already shown the streaming deltas, showing it again causes duplication.
713 if self.is_duplicate_assistant_message(message.as_ref()) {
714 return String::new();
715 }
716
717 let mut out = String::new();
718 if let Some(ref msg) = message {
719 if let Some(ref content) = msg.content {
720 self.format_content_blocks(&mut out, content, &self.display_name, self.colors);
721
722 // If we successfully rendered content, mark it as rendered
723 if !out.is_empty() {
724 let mut session = self.streaming_session.borrow_mut();
725
726 // Mark the message as pre-rendered so that ALL subsequent streaming
727 // deltas for this message are suppressed.
728 // This handles the case where assistant event arrives BEFORE streaming starts.
729 if let Some(ref message_id) = msg.id {
730 session.mark_message_pre_rendered(message_id);
731 }
732
733 // Mark the assistant content as rendered by hash to prevent duplicate
734 // assistant events with the same content but different message_ids
735 if let Some((ref text_content, _)) =
736 Self::extract_text_content_for_hash(message.as_ref())
737 {
738 if !text_content.is_empty() {
739 let mut hasher = DefaultHasher::new();
740 text_content.hash(&mut hasher);
741 let content_hash = hasher.finish();
742 session.mark_assistant_content_rendered(content_hash);
743 }
744 }
745 }
746 }
747 }
748 out
749 }
750
751 /// Format a user event
752 fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
753 let c = &self.colors;
754 let prefix = &self.display_name;
755
756 if let Some(msg) = message {
757 if let Some(content) = msg.content {
758 if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
759 let limit = self.verbosity.truncate_limit("user");
760 let preview = truncate_text(text, limit);
761 return format!(
762 "{}[{}]{} {}User{}: {}{}{}\n",
763 c.dim(),
764 prefix,
765 c.reset(),
766 c.blue(),
767 c.reset(),
768 c.dim(),
769 preview,
770 c.reset()
771 );
772 }
773 }
774 }
775 String::new()
776 }
777
778 /// Format a result event
779 fn format_result_event(
780 &self,
781 subtype: Option<String>,
782 duration_ms: Option<u64>,
783 total_cost_usd: Option<f64>,
784 num_turns: Option<u32>,
785 result: Option<String>,
786 error: Option<String>,
787 ) -> String {
788 let c = &self.colors;
789 let prefix = &self.display_name;
790
791 let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
792 let duration_m = duration_total_secs / 60;
793 let duration_s_rem = duration_total_secs % 60;
794 let cost = total_cost_usd.unwrap_or(0.0);
795 let turns = num_turns.unwrap_or(0);
796
797 let mut out = if subtype.as_deref() == Some("success") {
798 format!(
799 "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
800 c.dim(),
801 prefix,
802 c.reset(),
803 c.green(),
804 CHECK,
805 c.reset(),
806 c.dim(),
807 duration_m,
808 duration_s_rem,
809 turns,
810 cost,
811 c.reset()
812 )
813 } else {
814 let err = error.unwrap_or_else(|| "unknown error".to_string());
815 format!(
816 "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
817 c.dim(),
818 prefix,
819 c.reset(),
820 c.red(),
821 CROSS,
822 subtype.unwrap_or_else(|| "error".to_string()),
823 c.reset(),
824 err,
825 c.dim(),
826 duration_m,
827 duration_s_rem,
828 c.reset()
829 )
830 };
831
832 if let Some(result) = result {
833 let limit = self.verbosity.truncate_limit("result");
834 let preview = truncate_text(&result, limit);
835 let _ = writeln!(
836 out,
837 "\n{}Result summary:{}\n{}{}{}",
838 c.bold(),
839 c.reset(),
840 c.dim(),
841 preview,
842 c.reset()
843 );
844 }
845 out
846 }
847
/// Handle content block delta events
///
/// Dispatches on the delta kind:
/// - Text deltas are accumulated in the session and the *accumulated* text is
///   re-rendered, subject to several suppression checks (pre-rendered message,
///   whitespace-only content, already-rendered content hash).
/// - Thinking deltas are tracked and formatted individually.
/// - Tool-use deltas update the tracked tool name and, when they carry input,
///   accumulate it and show the partial input.
/// - Any other delta (including ones with a missing payload) yields an empty string.
///
/// `session` is the caller's already-borrowed streaming session; `index` is the
/// content block index the delta belongs to.
fn handle_content_block_delta(
    &self,
    session: &mut std::cell::RefMut<'_, StreamingSession>,
    index: u64,
    delta: ContentBlockDelta,
) -> String {
    let c = &self.colors;
    let prefix = &self.display_name;

    match delta {
        ContentBlockDelta::TextDelta { text: Some(text) } => {
            // Session lookups below are keyed by the index rendered as a string.
            let index_str = index.to_string();

            // Track this delta with StreamingSession for state management.
            //
            // StreamingSession handles protocol/streaming quality concerns (including
            // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
            // whether a prefix should be displayed for this stream.
            //
            // The parser layer still applies additional deduplication:
            // - Skip whitespace-only accumulated output
            // - Hash-based deduplication after sanitization (whitespace-insensitive)
            let show_prefix = session.on_text_delta(index, &text);

            // Get accumulated text for streaming display
            let accumulated_text = session
                .get_accumulated(ContentType::Text, &index_str)
                .unwrap_or("");

            // Check if this message was pre-rendered from an assistant event.
            // When an assistant event arrives BEFORE streaming deltas, we render it
            // and mark the message_id as pre-rendered. ALL subsequent streaming deltas
            // for this message should be suppressed to prevent duplication.
            // Note the delta has still been recorded in the session above.
            if let Some(message_id) = session.get_current_message_id() {
                if session.is_message_pre_rendered(message_id) {
                    return String::new();
                }
            }

            // Sanitize the accumulated text to check if it's empty
            // This is needed to skip rendering when the accumulated content is just whitespace
            let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);

            // Skip rendering if the sanitized text is empty (e.g., only whitespace)
            // This prevents rendering empty lines when the accumulated content is just whitespace
            if sanitized_text.is_empty() {
                return String::new();
            }

            // Check if this sanitized content has already been rendered
            // This prevents duplicates when accumulated content differs only by whitespace
            if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
            {
                return String::new();
            }

            // Use TextDeltaRenderer for consistent rendering
            let terminal_mode = *self.terminal_mode.borrow();

            // Use prefix trie to detect if new content extends previously rendered content
            // If yes, we do an in-place update (carriage return + new content)
            let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);

            let output = if show_prefix && !has_prefix {
                // First delta with no prefix match - use the renderer with prefix
                TextDeltaRenderer::render_first_delta(
                    accumulated_text,
                    prefix,
                    *c,
                    terminal_mode,
                )
            } else {
                // Either continuation OR prefix match - use renderer for in-place update
                // This handles the case where "Hello" becomes "Hello World" - we REPLACE
                TextDeltaRenderer::render_subsequent_delta(
                    accumulated_text,
                    prefix,
                    *c,
                    terminal_mode,
                )
            };

            // Mark this sanitized content as rendered for future duplicate detection
            // We use the sanitized text (not the rendered output) to avoid false positives
            // when the same accumulated text is rendered with different terminal modes
            session.mark_rendered(ContentType::Text, &index_str);
            session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);

            output
        }
        ContentBlockDelta::ThinkingDelta {
            thinking: Some(text),
        } => {
            // Track thinking deltas
            session.on_thinking_delta(index, &text);
            // Display thinking with visual distinction.
            // Only this delta's text is formatted, not the accumulated thinking.
            Self::formatter().format_thinking(text.as_str(), prefix, *c)
        }
        ContentBlockDelta::ToolUseDelta {
            tool_use: Some(tool_delta),
        } => {
            // Track tool name for GLM/CCS deduplication (if available in delta)
            if let Some(serde_json::Value::String(name)) = tool_delta.get("name") {
                session.set_tool_name(index, Some(name.to_string()));
            }

            // Handle tool input streaming
            // Extract the tool input from the delta: strings are used verbatim,
            // other JSON values go through `format_tool_input`; a missing input
            // becomes an empty string.
            let input_str =
                tool_delta
                    .get("input")
                    .map_or_else(String::new, |input| match input {
                        serde_json::Value::String(s) => s.clone(),
                        other => format_tool_input(other),
                    });

            if input_str.is_empty() {
                String::new()
            } else {
                // Accumulate tool input
                session.on_tool_input_delta(index, &input_str);

                // Show partial tool input in real-time
                // NOTE(review): this constructs a fresh `DeltaDisplayFormatter` while the
                // thinking branch goes through `Self::formatter()` — presumably equivalent;
                // confirm and consider unifying.
                let formatter = DeltaDisplayFormatter::new();
                formatter.format_tool_input(&input_str, prefix, *c)
            }
        }
        // Deltas with a missing payload or of an unhandled kind produce no output.
        _ => String::new(),
    }
}
979
980 /// Handle text delta events
981 fn handle_text_delta(
982 &self,
983 session: &mut std::cell::RefMut<'_, StreamingSession>,
984 text: &str,
985 ) -> String {
986 let c = &self.colors;
987 let prefix = &self.display_name;
988
989 // Standalone text delta (not part of content block)
990 // Use default index "0" for standalone text
991 let default_index = 0u64;
992 let default_index_str = "0";
993
994 // Track this delta with StreamingSession for state management.
995 //
996 // StreamingSession handles protocol/streaming quality concerns (including
997 // snapshot-as-delta repairs and consecutive duplicate filtering) and returns
998 // whether a prefix should be displayed for this stream.
999 //
1000 // The parser layer still applies additional deduplication:
1001 // - Skip whitespace-only accumulated output
1002 // - Hash-based deduplication after sanitization (whitespace-insensitive)
1003 let show_prefix = session.on_text_delta(default_index, text);
1004
1005 // Get accumulated text for streaming display
1006 let accumulated_text = session
1007 .get_accumulated(ContentType::Text, default_index_str)
1008 .unwrap_or("");
1009
1010 // Sanitize the accumulated text to check if it's empty
1011 // This is needed to skip rendering when the accumulated content is just whitespace
1012 let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
1013
1014 // Skip rendering if the sanitized text is empty (e.g., only whitespace)
1015 // This prevents rendering empty lines when the accumulated content is just whitespace
1016 if sanitized_text.is_empty() {
1017 return String::new();
1018 }
1019
1020 // Check if this sanitized content has already been rendered
1021 // This prevents duplicates when accumulated content differs only by whitespace
1022 if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
1023 return String::new();
1024 }
1025
1026 // Use TextDeltaRenderer for consistent rendering across all parsers
1027 let terminal_mode = *self.terminal_mode.borrow();
1028
1029 // Use prefix trie to detect if new content extends previously rendered content
1030 // If yes, we do an in-place update (carriage return + new content)
1031 let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
1032
1033 let output = if show_prefix && !has_prefix {
1034 // First delta with no prefix match - use the renderer with prefix
1035 TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
1036 } else {
1037 // Either continuation OR prefix match - use renderer for in-place update
1038 // This handles the case where "Hello" becomes "Hello World" - we REPLACE
1039 TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
1040 };
1041
1042 // Mark this sanitized content as rendered for future duplicate detection
1043 // We use the sanitized text (not the rendered output) to avoid false positives
1044 // when the same accumulated text is rendered with different terminal modes
1045 session.mark_rendered(ContentType::Text, default_index_str);
1046 session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
1047
1048 output
1049 }
1050
1051 /// Handle message stop events
1052 fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
1053 let c = &self.colors;
1054
1055 // Message complete - add final newline if we were in a content block
1056 // OR if any content was streamed (handles edge cases where block state
1057 // may not have been set but content was still streamed)
1058 let metrics = session.get_streaming_quality_metrics();
1059 let was_in_block = session.on_message_stop();
1060 let had_content = session.has_any_streamed_content();
1061 if was_in_block || had_content {
1062 // Use TextDeltaRenderer for completion - adds final newline
1063 let terminal_mode = *self.terminal_mode.borrow();
1064 let completion = format!(
1065 "{}{}",
1066 c.reset(),
1067 TextDeltaRenderer::render_completion(terminal_mode)
1068 );
1069 // Show streaming quality metrics in debug mode or when flag is set
1070 let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
1071 && metrics.total_deltas > 0;
1072 if show_metrics {
1073 format!("{}\n{}", completion, metrics.format(*c))
1074 } else {
1075 completion
1076 }
1077 } else {
1078 String::new()
1079 }
1080 }
1081
1082 /// Handle error events
1083 fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
1084 let c = &self.colors;
1085 let prefix = &self.display_name;
1086
1087 let msg = err
1088 .message
1089 .unwrap_or_else(|| "Unknown streaming error".to_string());
1090 format!(
1091 "{}[{}]{} {}Error: {}{}\n",
1092 c.dim(),
1093 prefix,
1094 c.reset(),
1095 c.red(),
1096 msg,
1097 c.reset()
1098 )
1099 }
1100
1101 /// Handle unknown events
1102 fn handle_unknown_event(&self) -> String {
1103 let c = &self.colors;
1104 let prefix = &self.display_name;
1105
1106 // Unknown stream event - in debug mode, log it
1107 if self.verbosity.is_debug() {
1108 format!(
1109 "{}[{}]{} {}Unknown streaming event{}\n",
1110 c.dim(),
1111 prefix,
1112 c.reset(),
1113 c.dim(),
1114 c.reset()
1115 )
1116 } else {
1117 String::new()
1118 }
1119 }
1120
1121 /// Check if a Claude event is a control event (state management with no user output)
1122 ///
1123 /// Control events are valid JSON that represent state transitions rather than
1124 /// user-facing content. They should be tracked separately from "ignored" events
1125 /// to avoid false health warnings.
1126 const fn is_control_event(event: &ClaudeEvent) -> bool {
1127 match event {
1128 // Stream events that are control events
1129 ClaudeEvent::StreamEvent { event } => matches!(
1130 event,
1131 StreamInnerEvent::MessageStart { .. }
1132 | StreamInnerEvent::ContentBlockStart { .. }
1133 | StreamInnerEvent::ContentBlockStop { .. }
1134 | StreamInnerEvent::MessageDelta { .. }
1135 | StreamInnerEvent::MessageStop
1136 | StreamInnerEvent::Ping
1137 ),
1138 _ => false,
1139 }
1140 }
1141
1142 /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1143 ///
1144 /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1145 /// tool input deltas) that are shown to the user in real-time. These should be
1146 /// tracked separately to avoid inflating "ignored" percentages.
1147 const fn is_partial_event(event: &ClaudeEvent) -> bool {
1148 match event {
1149 // Stream events that produce incremental content
1150 ClaudeEvent::StreamEvent { event } => matches!(
1151 event,
1152 StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1153 ),
1154 _ => false,
1155 }
1156 }
1157
/// Construct a [`DeltaDisplayFormatter`] through its `new` constructor.
///
/// Each call builds a new value; this function itself caches nothing and
/// shares nothing between calls.
const fn formatter() -> DeltaDisplayFormatter {
    DeltaDisplayFormatter::new()
}
1162
1163 /// Parse a stream of Claude NDJSON events
1164 pub fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
1165 use super::incremental_parser::IncrementalNdjsonParser;
1166
1167 let c = &self.colors;
1168 let monitor = HealthMonitor::new("Claude");
1169 let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
1170 std::fs::OpenOptions::new()
1171 .create(true)
1172 .append(true)
1173 .open(log_path)
1174 .ok()
1175 .map(std::io::BufWriter::new)
1176 });
1177
1178 // Use incremental parser for true real-time streaming
1179 // This processes JSON as soon as it's complete, not waiting for newlines
1180 let mut incremental_parser = IncrementalNdjsonParser::new();
1181 let mut byte_buffer = Vec::new();
1182
1183 // Track whether we've seen a success result event for GLM/ccs-glm compatibility
1184 // Some agents (GLM via CCS) emit both a success result and an error_during_execution
1185 // result when they exit with code 1 despite producing valid output. We suppress
1186 // the spurious error event to avoid confusing duplicate output.
1187 let mut seen_success_result = false;
1188
1189 loop {
1190 // Read available bytes
1191 byte_buffer.clear();
1192 let chunk = reader.fill_buf()?;
1193 if chunk.is_empty() {
1194 break;
1195 }
1196
1197 // Process all bytes immediately
1198 byte_buffer.extend_from_slice(chunk);
1199 let consumed = chunk.len();
1200 reader.consume(consumed);
1201
1202 // Feed bytes to incremental parser
1203 let json_events = incremental_parser.feed(&byte_buffer);
1204
1205 // Process each complete JSON event immediately
1206 for line in json_events {
1207 let trimmed = line.trim();
1208 if trimmed.is_empty() {
1209 continue;
1210 }
1211
1212 // Check for Result events to handle GLM/ccs-glm duplicate event bug
1213 // Some agents emit both success and error_during_execution results
1214 let should_skip_result = if trimmed.starts_with('{') {
1215 // First, check if the JSON has an 'errors' field with actual error messages.
1216 // This is important because Claude events can have either 'error' (string)
1217 // or 'errors' (array of strings), and we need to check both.
1218 let has_errors_with_content =
1219 if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
1220 // Check for 'errors' array with at least one non-empty string
1221 json.get("errors")
1222 .and_then(|v| v.as_array())
1223 .is_some_and(|arr| {
1224 arr.iter()
1225 .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
1226 })
1227 } else {
1228 false
1229 };
1230
1231 if let Ok(ClaudeEvent::Result {
1232 subtype,
1233 duration_ms,
1234 error,
1235 ..
1236 }) = serde_json::from_str::<ClaudeEvent>(trimmed)
1237 {
1238 let is_error_result = subtype.as_deref() != Some("success");
1239
1240 // Suppress spurious GLM error events based on these characteristics:
1241 // 1. Error event (subtype != "success")
1242 // 2. duration_ms is 0 or very small (< 100ms, indicating synthetic event)
1243 // 3. error field is null or empty (no actual error message)
1244 // 4. NO 'errors' field with actual error messages (this indicates a real error)
1245 //
1246 // These criteria identify the spurious error_during_execution events
1247 // that GLM emits when exiting with code 1 despite producing valid output.
1248 //
1249 // We DON'T suppress if there's an 'errors' array with content, because
1250 // that indicates a real error condition that the user should see.
1251 let is_spurious_glm_error = is_error_result
1252 && duration_ms.unwrap_or(0) < 100
1253 && (error.is_none() || error.as_ref().is_some_and(|e| e.is_empty()))
1254 && !has_errors_with_content;
1255
1256 if is_spurious_glm_error && seen_success_result {
1257 // Error after success - suppress (original fix)
1258 true
1259 } else if subtype.as_deref() == Some("success") {
1260 seen_success_result = true;
1261 false
1262 } else if is_spurious_glm_error {
1263 // Spurious error BEFORE success - still suppress based on characteristics
1264 // This handles the reverse-order case where error arrives first
1265 true
1266 } else {
1267 false
1268 }
1269 } else {
1270 false
1271 }
1272 } else {
1273 false
1274 };
1275
1276 // In debug mode, also show the raw JSON
1277 if self.verbosity.is_debug() {
1278 eprintln!(
1279 "{}[DEBUG]{} {}{}{}",
1280 c.dim(),
1281 c.reset(),
1282 c.dim(),
1283 &line,
1284 c.reset()
1285 );
1286 }
1287
1288 // Skip suppressed result events but still log them
1289 if should_skip_result {
1290 if let Some(ref mut file) = log_writer {
1291 writeln!(file, "{line}")?;
1292 file.get_mut().sync_all()?;
1293 }
1294 monitor.record_control_event();
1295 continue;
1296 }
1297
1298 // Parse the event once - parse_event handles malformed JSON by returning None
1299 match self.parse_event(&line) {
1300 Some(output) => {
1301 // Check if this is a partial/delta event (streaming content)
1302 if trimmed.starts_with('{') {
1303 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1304 if Self::is_partial_event(&event) {
1305 monitor.record_partial_event();
1306 } else {
1307 monitor.record_parsed();
1308 }
1309 } else {
1310 monitor.record_parsed();
1311 }
1312 } else {
1313 monitor.record_parsed();
1314 }
1315 // Write output to printer
1316 let mut printer = self.printer.borrow_mut();
1317 write!(printer, "{output}")?;
1318 printer.flush()?;
1319 }
1320 None => {
1321 // Check if this was a control event (state management with no user output)
1322 // Control events are valid JSON that return empty output but aren't "ignored"
1323 if trimmed.starts_with('{') {
1324 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1325 if Self::is_control_event(&event) {
1326 monitor.record_control_event();
1327 } else {
1328 // Valid JSON but not a control event - track as unknown
1329 monitor.record_unknown_event();
1330 }
1331 } else {
1332 // Failed to deserialize - track as parse error
1333 monitor.record_parse_error();
1334 }
1335 } else {
1336 monitor.record_ignored();
1337 }
1338 }
1339 }
1340
1341 // Log raw JSON to file if configured
1342 if let Some(ref mut file) = log_writer {
1343 writeln!(file, "{line}")?;
1344 }
1345 }
1346 }
1347
1348 if let Some(ref mut file) = log_writer {
1349 file.flush()?;
1350 // Ensure data is written to disk before continuing
1351 // This prevents race conditions where extraction runs before OS commits writes
1352 let _ = file.get_mut().sync_all();
1353 }
1354 if let Some(warning) = monitor.check_and_warn(*c) {
1355 let mut printer = self.printer.borrow_mut();
1356 writeln!(printer, "{warning}")?;
1357 printer.flush()?;
1358 }
1359 Ok(())
1360 }
1361}
1362
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use super::*;
    use crate::json_parser::printer::{SharedPrinter, TestPrinter};

    /// Build a parser with normal verbosity that writes to a `TestPrinter`.
    fn parser_with_test_printer() -> ClaudeParser {
        let sink: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, sink)
    }

    #[test]
    fn test_printer_method_accessible() {
        let parser = parser_with_test_printer();

        // Reaching `printer()` from here is the whole check; the value is unused.
        let _printer_ref = parser.printer();
    }

    #[test]
    fn test_streaming_metrics_method_accessible() {
        let parser = parser_with_test_printer();

        // Reaching `streaming_metrics()` from here is the whole check; the value is unused.
        let _metrics = parser.streaming_metrics();
    }
}