ralph_workflow/json_parser/claude.rs
1//! Claude CLI JSON parser.
2//!
3//! Parses NDJSON output from Claude CLI and formats it for display.
4//!
5//! # Streaming Output Behavior
6//!
7//! This parser implements real-time streaming output for text deltas. When content
8//! arrives in multiple chunks (via `content_block_delta` events), the parser:
9//!
10//! 1. **Accumulates** text deltas from each chunk into a buffer
11//! 2. **Displays** the accumulated text after each chunk
12//! 3. **Uses carriage return (`\r`) and line clearing (`\x1b[2K`)** to rewrite the entire line,
13//! creating an updating effect that shows the content building up in real-time
14//! 4. **Shows prefix on every delta**, rewriting the entire line each time (industry standard)
15//!
16//! Example output sequence for streaming "Hello World" in two chunks:
17//! ```text
18//! [Claude] Hello\r (first chunk with prefix, no newline)
19//! \x1b[2K\r[Claude] Hello World\r (second chunk clears line, rewrites with accumulated)
20//! [Claude] Hello World\n (message_stop adds final newline)
21//! ```
22//!
23//! # Single-Line Pattern
24//!
25//! The renderer uses a single-line pattern with carriage return for in-place updates.
26//! This is the industry standard for streaming CLIs (used by Rich, Ink, Bubble Tea).
27//!
28//! Each delta rewrites the entire line with prefix, ensuring that:
29//! - The user always sees the prefix
30//! - Content updates in-place without visual artifacts
31//! - Terminal state is clean and predictable
32//!
33//! This pattern is consistent across all parsers (Claude, Codex, Gemini, `OpenCode`)
34//! with variations in when the prefix is shown based on each format's event structure.
35
36use crate::common::truncate_text;
37use crate::config::Verbosity;
38use crate::logger::{Colors, CHECK, CROSS};
39use std::cell::RefCell;
40use std::fmt::Write as _;
41use std::io::{self, BufRead, Write};
42use std::rc::Rc;
43
44use super::delta_display::{DeltaDisplayFormatter, DeltaRenderer, TextDeltaRenderer};
45use super::health::HealthMonitor;
46use super::health::StreamingQualityMetrics;
47use super::printer::SharedPrinter;
48use super::streaming_state::StreamingSession;
49use super::terminal::TerminalMode;
50use super::types::{
51 format_tool_input, format_unknown_json_event, ClaudeEvent, ContentBlock, ContentBlockDelta,
52 ContentType, StreamInnerEvent,
53};
54
/// Parser and formatter for Claude CLI NDJSON events.
///
/// Note: This parser is designed for single-threaded use only.
/// The internal state uses `Rc<RefCell<>>` for convenience, not for thread safety.
/// Do not share this parser across threads.
pub struct ClaudeParser {
    /// Color provider used when formatting every output line
    colors: Colors,
    /// Verbosity level; consulted for truncation limits and debug-only output
    pub(crate) verbosity: Verbosity,
    /// Optional log file path, set through `with_log_file` (`None` by default)
    log_file: Option<String>,
    /// Name shown inside the `[name]` prefix of formatted lines (defaults to "Claude")
    display_name: String,
    /// Unified streaming session tracker
    /// Provides single source of truth for streaming state across all content types
    streaming_session: Rc<RefCell<StreamingSession>>,
    /// Terminal mode for output formatting
    /// Detected once when the parser is constructed (`TerminalMode::detect()`) and
    /// cached; each render copies the value out of the `RefCell`
    terminal_mode: RefCell<TerminalMode>,
    /// Whether to show streaming quality metrics
    /// (debug verbosity enables the same output regardless of this flag)
    show_streaming_metrics: bool,
    /// Output printer for capturing or displaying output
    printer: SharedPrinter,
}
76
77impl ClaudeParser {
78 /// Create a new `ClaudeParser` with the given colors and verbosity.
79 ///
80 /// # Arguments
81 ///
82 /// * `colors` - Colors for terminal output
83 /// * `verbosity` - Verbosity level for output
84 ///
85 /// # Returns
86 ///
87 /// A new `ClaudeParser` instance
88 ///
89 /// # Example
90 ///
91 /// ```ignore
92 /// use ralph_workflow::json_parser::ClaudeParser;
93 /// use ralph_workflow::logger::Colors;
94 /// use ralph_workflow::config::Verbosity;
95 ///
96 /// let parser = ClaudeParser::new(Colors::new(), Verbosity::Normal);
97 /// ```
98 pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
99 Self::with_printer(colors, verbosity, super::printer::shared_stdout())
100 }
101
102 /// Create a new `ClaudeParser` with a custom printer.
103 ///
104 /// # Arguments
105 ///
106 /// * `colors` - Colors for terminal output
107 /// * `verbosity` - Verbosity level for output
108 /// * `printer` - Shared printer for output
109 ///
110 /// # Returns
111 ///
112 /// A new `ClaudeParser` instance
113 pub fn with_printer(colors: Colors, verbosity: Verbosity, printer: SharedPrinter) -> Self {
114 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
115 let streaming_session = StreamingSession::new().with_verbose_warnings(verbose_warnings);
116
117 // Use the printer's is_terminal method to validate it's connected correctly
118 // This is a sanity check that also satisfies the compiler that the method is used
119 let _printer_is_terminal = printer.borrow().is_terminal();
120
121 Self {
122 colors,
123 verbosity,
124 log_file: None,
125 display_name: "Claude".to_string(),
126 streaming_session: Rc::new(RefCell::new(streaming_session)),
127 terminal_mode: RefCell::new(TerminalMode::detect()),
128 show_streaming_metrics: false,
129 printer,
130 }
131 }
132
    /// Choose whether streaming quality metrics are shown when a message finishes.
    ///
    /// Builder-style setter: consumes the parser and returns it with the flag
    /// updated. `handle_message_stop` reads this flag (alongside debug verbosity)
    /// to decide whether to append the metrics to its output.
    pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
        self.show_streaming_metrics = show;
        self
    }
137
138 /// Set the display name for this parser.
139 ///
140 /// # Arguments
141 ///
142 /// * `display_name` - The name to display in output
143 ///
144 /// # Returns
145 ///
146 /// Self for builder pattern chaining
147 pub fn with_display_name(mut self, display_name: &str) -> Self {
148 self.display_name = display_name.to_string();
149 self
150 }
151
152 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
153 self.log_file = Some(path.to_string());
154 self
155 }
156
157 /// Set the terminal mode for this parser.
158 ///
159 /// # Arguments
160 ///
161 /// * `mode` - The terminal mode to use
162 ///
163 /// # Returns
164 ///
165 /// Self for builder pattern chaining
166 #[cfg(test)]
167 pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
168 *self.terminal_mode.borrow_mut() = mode;
169 self
170 }
171
    /// Get a shared handle to the printer.
    ///
    /// This allows tests, monitoring, and other code to access the printer after parsing
    /// to verify output content, check for duplicates, or capture output for analysis.
    ///
    /// # Returns
    ///
    /// A new `Rc` handle pointing at the same printer the parser writes to; the
    /// printer itself is not copied.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
    /// use std::rc::Rc;
    /// use std::cell::RefCell;
    ///
    /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
    /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
    ///
    /// // Parse events...
    ///
    /// // Now access the printer to verify output
    /// let printer_ref = parser.printer().borrow();
    /// assert!(!printer_ref.has_duplicate_consecutive_lines());
    /// ```
    // Suppresses the dead-code lint in debug, test, and `monitoring` builds.
    #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
    pub fn printer(&self) -> SharedPrinter {
        Rc::clone(&self.printer)
    }
201
202 /// Get streaming quality metrics from the current session.
203 ///
204 /// This provides insight into the deduplication and streaming quality of the
205 /// parsing session, including:
206 /// - Number of snapshot repairs (when the agent sent accumulated content as a delta)
207 /// - Number of large deltas (potential protocol violations)
208 /// - Total deltas processed
209 ///
210 /// Useful for testing, monitoring, and debugging streaming behavior.
211 ///
212 /// # Returns
213 ///
214 /// A copy of the streaming quality metrics from the internal `StreamingSession`.
215 ///
216 /// # Example
217 ///
218 /// ```ignore
219 /// use ralph_workflow::json_parser::{ClaudeParser, printer::TestPrinter};
220 /// use std::rc::Rc;
221 /// use std::cell::RefCell;
222 ///
223 /// let printer = Rc::new(RefCell::new(TestPrinter::new()));
224 /// let parser = ClaudeParser::with_printer(colors, verbosity, Rc::clone(&printer));
225 ///
226 /// // Parse events...
227 ///
228 /// // Verify deduplication logic triggered
229 /// let metrics = parser.streaming_metrics();
230 /// assert!(metrics.snapshot_repairs_count > 0, "Snapshot repairs should occur");
231 /// ```
232 #[cfg_attr(any(debug_assertions, test, feature = "monitoring"), allow(dead_code))]
233 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
234 self.streaming_session
235 .borrow()
236 .get_streaming_quality_metrics()
237 }
238
239 /// Parse and display a single Claude JSON event
240 ///
241 /// Returns `Some(formatted_output)` for valid events, or None for:
242 /// - Malformed JSON (logged at debug level)
243 /// - Unknown event types
244 /// - Empty or whitespace-only output
245 pub fn parse_event(&self, line: &str) -> Option<String> {
246 let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
247 e
248 } else {
249 // Non-JSON line - could be raw text output from agent
250 // Pass through as-is if it looks like real output (not empty)
251 let trimmed = line.trim();
252 if !trimmed.is_empty() && !trimmed.starts_with('{') {
253 return Some(format!("{trimmed}\n"));
254 }
255 return None;
256 };
257 let c = &self.colors;
258 let prefix = &self.display_name;
259
260 let output = match event {
261 ClaudeEvent::System {
262 subtype,
263 session_id,
264 cwd,
265 } => self.format_system_event(subtype.as_ref(), session_id, cwd),
266 ClaudeEvent::Assistant { message } => self.format_assistant_event(message),
267 ClaudeEvent::User { message } => self.format_user_event(message),
268 ClaudeEvent::Result {
269 subtype,
270 duration_ms,
271 total_cost_usd,
272 num_turns,
273 result,
274 error,
275 } => self.format_result_event(
276 subtype,
277 duration_ms,
278 total_cost_usd,
279 num_turns,
280 result,
281 error,
282 ),
283 ClaudeEvent::StreamEvent { event } => {
284 // Handle streaming events for delta/partial updates
285 self.parse_stream_event(event)
286 }
287 ClaudeEvent::Unknown => {
288 // Use the generic unknown event formatter for consistent handling
289 // In verbose mode, this will show the event type and key fields
290 // In normal mode, this returns empty string
291 format_unknown_json_event(line, prefix, *c, self.verbosity.is_verbose())
292 }
293 };
294
295 if output.is_empty() {
296 None
297 } else {
298 Some(output)
299 }
300 }
301
302 /// Parse a streaming event for delta/partial updates
303 ///
304 /// Handles the nested events within `stream_event`:
305 /// - MessageStart/Stop: Manage session state
306 /// - `ContentBlockStart`: Initialize new content blocks
307 /// - ContentBlockDelta/TextDelta: Accumulate and display incrementally
308 /// - `ContentBlockStop`: Finalize content blocks
309 /// - `MessageDelta`: Process message metadata without output
310 /// - Error: Display appropriately
311 ///
312 /// Returns String for display content, empty String for control events.
313 fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
314 let mut session = self.streaming_session.borrow_mut();
315
316 match event {
317 StreamInnerEvent::MessageStart {
318 message: _,
319 message_id,
320 } => {
321 // Set message ID for tracking and clear session state on new message
322 session.set_current_message_id(message_id);
323 session.on_message_start();
324 String::new()
325 }
326 StreamInnerEvent::ContentBlockStart {
327 index: Some(index),
328 content_block: Some(block),
329 } => {
330 // Initialize a new content block at this index
331 session.on_content_block_start(index);
332 match &block {
333 ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
334 // Initial text in ContentBlockStart - treat as first delta
335 session.on_text_delta(index, t);
336 }
337 ContentBlock::ToolUse {
338 name: _,
339 input: Some(i),
340 } => {
341 // Initialize tool input accumulator
342 let input_str = if let serde_json::Value::String(s) = &i {
343 s.clone()
344 } else {
345 format_tool_input(i)
346 };
347 session.on_tool_input_delta(index, &input_str);
348 }
349 _ => {}
350 }
351 String::new()
352 }
353 StreamInnerEvent::ContentBlockStart {
354 index: Some(index),
355 content_block: None,
356 } => {
357 // Content block started but no initial content provided
358 session.on_content_block_start(index);
359 String::new()
360 }
361 StreamInnerEvent::ContentBlockStart { .. } => {
362 // Content block without index - ignore
363 String::new()
364 }
365 StreamInnerEvent::ContentBlockDelta {
366 index: Some(index),
367 delta: Some(delta),
368 } => self.handle_content_block_delta(&mut session, index, delta),
369 StreamInnerEvent::TextDelta { text: Some(text) } => {
370 self.handle_text_delta(&mut session, &text)
371 }
372 StreamInnerEvent::ContentBlockStop { .. } => {
373 // Content block completion event - no output needed
374 // This event marks the end of a content block but doesn't produce
375 // any displayable content. It's a control event for state management.
376 String::new()
377 }
378 StreamInnerEvent::MessageDelta { .. } => {
379 // Message delta event with usage/metadata - no output needed
380 // This event contains final message metadata (stop_reason, usage stats)
381 // but is used for tracking/monitoring purposes only, not display.
382 String::new()
383 }
384 StreamInnerEvent::ContentBlockDelta { .. }
385 | StreamInnerEvent::Ping
386 | StreamInnerEvent::TextDelta { text: None }
387 | StreamInnerEvent::Error { error: None } => String::new(),
388 StreamInnerEvent::MessageStop => self.handle_message_stop(&mut session),
389 StreamInnerEvent::Error {
390 error: Some(err), ..
391 } => self.handle_error_event(err),
392 StreamInnerEvent::Unknown => self.handle_unknown_event(),
393 }
394 }
395
396 /// Format a system event
397 fn format_system_event(
398 &self,
399 subtype: Option<&String>,
400 session_id: Option<String>,
401 cwd: Option<String>,
402 ) -> String {
403 let c = &self.colors;
404 let prefix = &self.display_name;
405
406 if subtype.map(std::string::String::as_str) == Some("init") {
407 let sid = session_id.unwrap_or_else(|| "unknown".to_string());
408 let mut out = format!(
409 "{}[{}]{} {}Session started{} {}({:.8}...){}\n",
410 c.dim(),
411 prefix,
412 c.reset(),
413 c.cyan(),
414 c.reset(),
415 c.dim(),
416 sid,
417 c.reset()
418 );
419 if let Some(cwd) = cwd {
420 let _ = writeln!(
421 out,
422 "{}[{}]{} {}Working dir: {}{}",
423 c.dim(),
424 prefix,
425 c.reset(),
426 c.dim(),
427 cwd,
428 c.reset()
429 );
430 }
431 out
432 } else {
433 format!(
434 "{}[{}]{} {}{}{}\n",
435 c.dim(),
436 prefix,
437 c.reset(),
438 c.cyan(),
439 subtype.map_or("system", |s| s.as_str()),
440 c.reset()
441 )
442 }
443 }
444
445 /// Extract text content from a message for hash-based deduplication.
446 fn extract_text_content_for_hash(
447 message: Option<&crate::json_parser::types::AssistantMessage>,
448 ) -> Option<String> {
449 message?.content.as_ref().map(|content| {
450 content
451 .iter()
452 .filter_map(|block| {
453 if let ContentBlock::Text { text } = block {
454 text.as_deref()
455 } else {
456 None
457 }
458 })
459 .collect::<Vec<_>>()
460 .join("")
461 })
462 }
463
464 /// Check if this assistant message is a duplicate of already-streamed content.
465 fn is_duplicate_assistant_message(
466 &self,
467 message: Option<&crate::json_parser::types::AssistantMessage>,
468 ) -> bool {
469 let session = self.streaming_session.borrow();
470
471 // Extract message_id from the assistant message
472 let assistant_msg_id = message.and_then(|m| m.id.as_ref());
473
474 // Check if this assistant event has a message_id that matches the current streaming message
475 // If it does, and we have streamed content, then this assistant event is a duplicate
476 // because the content was already streamed via deltas.
477 if let Some(ast_msg_id) = assistant_msg_id {
478 // Check if message was already marked as displayed (after message_stop)
479 if session.is_duplicate_final_message(ast_msg_id) {
480 return true;
481 }
482
483 // Check if the assistant message_id matches the current streaming message_id
484 if session.get_current_message_id() == Some(ast_msg_id) {
485 // Same message - check if we have streamed any content
486 // If yes, the assistant event is a duplicate
487 if session.has_any_streamed_content() {
488 return true;
489 }
490 }
491 }
492
493 // If no message_id match, fall back to hash-based deduplication
494 let text_content_for_hash = Self::extract_text_content_for_hash(message);
495 if let Some(ref text_content) = text_content_for_hash {
496 if !text_content.is_empty() {
497 return session.is_duplicate_by_hash(text_content);
498 }
499 }
500
501 // Fallback to coarse check
502 session.has_any_streamed_content()
503 }
504
505 /// Format a text content block for assistant output.
506 fn format_text_block(&self, out: &mut String, text: &str, prefix: &str, colors: Colors) {
507 let limit = self.verbosity.truncate_limit("text");
508 let preview = truncate_text(text, limit);
509 let _ = writeln!(
510 out,
511 "{}[{}]{} {}{}{}",
512 colors.dim(),
513 prefix,
514 colors.reset(),
515 colors.white(),
516 preview,
517 colors.reset()
518 );
519 }
520
521 /// Format a tool use content block for assistant output.
522 fn format_tool_use_block(
523 &self,
524 out: &mut String,
525 tool: Option<&String>,
526 input: Option<&serde_json::Value>,
527 prefix: &str,
528 colors: Colors,
529 ) {
530 let tool_name = tool.cloned().unwrap_or_else(|| "unknown".to_string());
531 let _ = writeln!(
532 out,
533 "{}[{}]{} {}Tool{}: {}{}{}",
534 colors.dim(),
535 prefix,
536 colors.reset(),
537 colors.magenta(),
538 colors.reset(),
539 colors.bold(),
540 tool_name,
541 colors.reset(),
542 );
543
544 // Show tool input details at Normal and above (not just Verbose)
545 // Tool inputs provide crucial context for understanding agent actions
546 if self.verbosity.show_tool_input() {
547 if let Some(input_val) = input {
548 let input_str = format_tool_input(input_val);
549 let limit = self.verbosity.truncate_limit("tool_input");
550 let preview = truncate_text(&input_str, limit);
551 if !preview.is_empty() {
552 let _ = writeln!(
553 out,
554 "{}[{}]{} {} └─ {}{}",
555 colors.dim(),
556 prefix,
557 colors.reset(),
558 colors.dim(),
559 preview,
560 colors.reset()
561 );
562 }
563 }
564 }
565 }
566
567 /// Format a tool result content block for assistant output.
568 fn format_tool_result_block(
569 &self,
570 out: &mut String,
571 content: &serde_json::Value,
572 prefix: &str,
573 colors: Colors,
574 ) {
575 let content_str = match content {
576 serde_json::Value::String(s) => s.clone(),
577 other => other.to_string(),
578 };
579 let limit = self.verbosity.truncate_limit("tool_result");
580 let preview = truncate_text(&content_str, limit);
581 let _ = writeln!(
582 out,
583 "{}[{}]{} {}Result:{} {}",
584 colors.dim(),
585 prefix,
586 colors.reset(),
587 colors.dim(),
588 colors.reset(),
589 preview
590 );
591 }
592
593 /// Format all content blocks from an assistant message.
594 fn format_content_blocks(
595 &self,
596 out: &mut String,
597 content: &[ContentBlock],
598 prefix: &str,
599 colors: Colors,
600 ) {
601 for block in content {
602 match block {
603 ContentBlock::Text { text } => {
604 if let Some(text) = text {
605 self.format_text_block(out, text, prefix, colors);
606 }
607 }
608 ContentBlock::ToolUse { name, input } => {
609 self.format_tool_use_block(out, name.as_ref(), input.as_ref(), prefix, colors);
610 }
611 ContentBlock::ToolResult { content } => {
612 if let Some(content) = content {
613 self.format_tool_result_block(out, content, prefix, colors);
614 }
615 }
616 ContentBlock::Unknown => {}
617 }
618 }
619 }
620
621 /// Format an assistant event
622 fn format_assistant_event(
623 &self,
624 message: Option<crate::json_parser::types::AssistantMessage>,
625 ) -> String {
626 // CRITICAL FIX: When ANY content has been streamed via deltas,
627 // the Assistant event should NOT display it again.
628 // The Assistant event represents the "complete" message, but if we've
629 // already shown the streaming deltas, showing it again causes duplication.
630 if self.is_duplicate_assistant_message(message.as_ref()) {
631 return String::new();
632 }
633
634 let mut out = String::new();
635 if let Some(msg) = message {
636 if let Some(content) = msg.content {
637 self.format_content_blocks(&mut out, &content, &self.display_name, self.colors);
638 }
639 }
640 out
641 }
642
643 /// Format a user event
644 fn format_user_event(&self, message: Option<crate::json_parser::types::UserMessage>) -> String {
645 let c = &self.colors;
646 let prefix = &self.display_name;
647
648 if let Some(msg) = message {
649 if let Some(content) = msg.content {
650 if let Some(ContentBlock::Text { text: Some(text) }) = content.first() {
651 let limit = self.verbosity.truncate_limit("user");
652 let preview = truncate_text(text, limit);
653 return format!(
654 "{}[{}]{} {}User{}: {}{}{}\n",
655 c.dim(),
656 prefix,
657 c.reset(),
658 c.blue(),
659 c.reset(),
660 c.dim(),
661 preview,
662 c.reset()
663 );
664 }
665 }
666 }
667 String::new()
668 }
669
670 /// Format a result event
671 fn format_result_event(
672 &self,
673 subtype: Option<String>,
674 duration_ms: Option<u64>,
675 total_cost_usd: Option<f64>,
676 num_turns: Option<u32>,
677 result: Option<String>,
678 error: Option<String>,
679 ) -> String {
680 let c = &self.colors;
681 let prefix = &self.display_name;
682
683 let duration_total_secs = duration_ms.unwrap_or(0) / 1000;
684 let duration_m = duration_total_secs / 60;
685 let duration_s_rem = duration_total_secs % 60;
686 let cost = total_cost_usd.unwrap_or(0.0);
687 let turns = num_turns.unwrap_or(0);
688
689 let mut out = if subtype.as_deref() == Some("success") {
690 format!(
691 "{}[{}]{} {}{} Completed{} {}({}m {}s, {} turns, ${:.4}){}\n",
692 c.dim(),
693 prefix,
694 c.reset(),
695 c.green(),
696 CHECK,
697 c.reset(),
698 c.dim(),
699 duration_m,
700 duration_s_rem,
701 turns,
702 cost,
703 c.reset()
704 )
705 } else {
706 let err = error.unwrap_or_else(|| "unknown error".to_string());
707 format!(
708 "{}[{}]{} {}{} {}{}: {} {}({}m {}s){}\n",
709 c.dim(),
710 prefix,
711 c.reset(),
712 c.red(),
713 CROSS,
714 subtype.unwrap_or_else(|| "error".to_string()),
715 c.reset(),
716 err,
717 c.dim(),
718 duration_m,
719 duration_s_rem,
720 c.reset()
721 )
722 };
723
724 if let Some(result) = result {
725 let limit = self.verbosity.truncate_limit("result");
726 let preview = truncate_text(&result, limit);
727 let _ = writeln!(
728 out,
729 "\n{}Result summary:{}\n{}{}{}",
730 c.bold(),
731 c.reset(),
732 c.dim(),
733 preview,
734 c.reset()
735 );
736 }
737 out
738 }
739
    /// Handle a `content_block_delta` event for the block at `index`.
    ///
    /// Behavior per delta kind:
    /// - Text: the delta is accumulated in the session and the full accumulated
    ///   text for the block is rendered. Nothing is rendered when the delta is
    ///   empty, the accumulated text sanitizes to nothing, or the same sanitized
    ///   content was already rendered.
    /// - Thinking: the delta is recorded and formatted as thinking output.
    /// - Tool use: the `input` value of the delta is accumulated and the partial
    ///   input is formatted; a missing or empty input produces no output.
    /// - Anything else (including deltas without a payload): no output.
    ///
    /// # Arguments
    ///
    /// * `session` - Mutably borrowed streaming session holding accumulated state
    /// * `index` - Content block index the delta belongs to
    /// * `delta` - The delta payload
    ///
    /// # Returns
    ///
    /// The text to display, or an empty `String` when nothing should be shown.
    fn handle_content_block_delta(
        &self,
        session: &mut std::cell::RefMut<'_, StreamingSession>,
        index: u64,
        delta: ContentBlockDelta,
    ) -> String {
        let c = &self.colors;
        let prefix = &self.display_name;

        match delta {
            ContentBlockDelta::TextDelta { text: Some(text) } => {
                // Check for snapshot-as-delta bug (GLM sending full accumulated content)
                // If detected, extract only the delta portion
                // The session APIs used below are keyed by the index as a string.
                let index_str = index.to_string();
                let text_to_process = if session.is_likely_snapshot(&text, &index_str) {
                    // Snapshot detected - extract delta (auto-corrects GLM/CCS quirk)
                    // If extraction yields nothing usable, fall back to the text as received
                    session
                        .get_delta_from_snapshot(&text, &index_str)
                        .unwrap_or(&text)
                } else {
                    // Genuine delta - use as-is
                    &text
                };

                // Check for empty delta after snapshot extraction
                if text_to_process.is_empty() {
                    return String::new();
                }

                // Use StreamingSession to track state and determine prefix display
                let show_prefix = session.on_text_delta(index, text_to_process);

                // Get accumulated text for streaming display
                // (the whole text is re-rendered on every delta, not just the new part)
                let accumulated_text = session
                    .get_accumulated(ContentType::Text, &index_str)
                    .unwrap_or("");

                // Sanitize the accumulated text to check if it's empty
                // This is needed to skip rendering when the accumulated content is just whitespace
                let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);

                // Skip rendering if the sanitized text is empty (e.g., only whitespace)
                // This prevents rendering empty lines when the accumulated content is just whitespace
                if sanitized_text.is_empty() {
                    return String::new();
                }

                // Check if this sanitized content has already been rendered
                // This prevents duplicates when accumulated content differs only by whitespace
                if session.is_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text)
                {
                    return String::new();
                }

                // Use TextDeltaRenderer for consistent rendering
                // Copy the cached terminal mode out of its `RefCell`
                let terminal_mode = *self.terminal_mode.borrow();

                // Ask the session whether something was already rendered for this block.
                // If yes, we do an in-place update (carriage return + new content)
                // NOTE(review): the exact matching rule lives in
                // `StreamingSession::has_rendered_prefix` — confirm there.
                let has_prefix = session.has_rendered_prefix(ContentType::Text, &index_str);

                let output = if show_prefix && !has_prefix {
                    // First delta with no prefix match - use the renderer with prefix
                    TextDeltaRenderer::render_first_delta(
                        accumulated_text,
                        prefix,
                        *c,
                        terminal_mode,
                    )
                } else {
                    // Either continuation OR prefix match - use renderer for in-place update
                    // This handles the case where "Hello" becomes "Hello World" - we REPLACE
                    TextDeltaRenderer::render_subsequent_delta(
                        accumulated_text,
                        prefix,
                        *c,
                        terminal_mode,
                    )
                };

                // Mark this sanitized content as rendered for future duplicate detection
                // We use the sanitized text (not the rendered output) to avoid false positives
                // when the same accumulated text is rendered with different terminal modes
                // These mutating calls must stay after the last use of `accumulated_text`,
                // which borrows from the session.
                session.mark_rendered(ContentType::Text, &index_str);
                session.mark_content_hash_rendered(ContentType::Text, &index_str, &sanitized_text);

                output
            }
            ContentBlockDelta::ThinkingDelta {
                thinking: Some(text),
            } => {
                // Track thinking deltas
                session.on_thinking_delta(index, &text);
                // Display thinking with visual distinction
                // (only this delta's text is formatted, not the accumulated thinking)
                Self::formatter().format_thinking(text.as_str(), prefix, *c)
            }
            ContentBlockDelta::ToolUseDelta {
                tool_use: Some(tool_delta),
            } => {
                // Handle tool input streaming
                // Extract the tool input from the delta: a JSON string is used verbatim,
                // any other value goes through `format_tool_input`; a missing "input"
                // key yields an empty string
                let input_str =
                    tool_delta
                        .get("input")
                        .map_or_else(String::new, |input| match input {
                            serde_json::Value::String(s) => s.clone(),
                            other => format_tool_input(other),
                        });

                if input_str.is_empty() {
                    String::new()
                } else {
                    // Accumulate tool input
                    session.on_tool_input_delta(index, &input_str);

                    // Show partial tool input in real-time
                    let formatter = DeltaDisplayFormatter::new();
                    formatter.format_tool_input(&input_str, prefix, *c)
                }
            }
            // Deltas without a payload, or of a kind not handled above, print nothing
            _ => String::new(),
        }
    }
864
865 /// Handle text delta events
866 fn handle_text_delta(
867 &self,
868 session: &mut std::cell::RefMut<'_, StreamingSession>,
869 text: &str,
870 ) -> String {
871 let c = &self.colors;
872 let prefix = &self.display_name;
873
874 // Standalone text delta (not part of content block)
875 // Use default index "0" for standalone text
876 let default_index = 0u64;
877 let default_index_str = "0";
878
879 // Check for snapshot-as-delta bug
880 let text_to_process = if session.is_likely_snapshot(text, default_index_str) {
881 // Snapshot detected - extract delta (auto-corrects GLM/CCS quirk)
882 session
883 .get_delta_from_snapshot(text, default_index_str)
884 .unwrap_or(text)
885 } else {
886 text
887 };
888
889 // Check for empty delta after snapshot extraction
890 if text_to_process.is_empty() {
891 return String::new();
892 }
893
894 let show_prefix = session.on_text_delta(default_index, text_to_process);
895
896 // Get accumulated text for streaming display
897 let accumulated_text = session
898 .get_accumulated(ContentType::Text, default_index_str)
899 .unwrap_or("");
900
901 // Sanitize the accumulated text to check if it's empty
902 // This is needed to skip rendering when the accumulated content is just whitespace
903 let sanitized_text = super::delta_display::sanitize_for_display(accumulated_text);
904
905 // Skip rendering if the sanitized text is empty (e.g., only whitespace)
906 // This prevents rendering empty lines when the accumulated content is just whitespace
907 if sanitized_text.is_empty() {
908 return String::new();
909 }
910
911 // Check if this sanitized content has already been rendered
912 // This prevents duplicates when accumulated content differs only by whitespace
913 if session.is_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text) {
914 return String::new();
915 }
916
917 // Use TextDeltaRenderer for consistent rendering across all parsers
918 let terminal_mode = *self.terminal_mode.borrow();
919
920 // Use prefix trie to detect if new content extends previously rendered content
921 // If yes, we do an in-place update (carriage return + new content)
922 let has_prefix = session.has_rendered_prefix(ContentType::Text, default_index_str);
923
924 let output = if show_prefix && !has_prefix {
925 // First delta with no prefix match - use the renderer with prefix
926 TextDeltaRenderer::render_first_delta(accumulated_text, prefix, *c, terminal_mode)
927 } else {
928 // Either continuation OR prefix match - use renderer for in-place update
929 // This handles the case where "Hello" becomes "Hello World" - we REPLACE
930 TextDeltaRenderer::render_subsequent_delta(accumulated_text, prefix, *c, terminal_mode)
931 };
932
933 // Mark this sanitized content as rendered for future duplicate detection
934 // We use the sanitized text (not the rendered output) to avoid false positives
935 // when the same accumulated text is rendered with different terminal modes
936 session.mark_rendered(ContentType::Text, default_index_str);
937 session.mark_content_hash_rendered(ContentType::Text, default_index_str, &sanitized_text);
938
939 output
940 }
941
942 /// Handle message stop events
943 fn handle_message_stop(&self, session: &mut std::cell::RefMut<'_, StreamingSession>) -> String {
944 let c = &self.colors;
945
946 // Message complete - add final newline if we were in a content block
947 // OR if any content was streamed (handles edge cases where block state
948 // may not have been set but content was still streamed)
949 let metrics = session.get_streaming_quality_metrics();
950 let was_in_block = session.on_message_stop();
951 let had_content = session.has_any_streamed_content();
952 if was_in_block || had_content {
953 // Use TextDeltaRenderer for completion - adds final newline
954 let terminal_mode = *self.terminal_mode.borrow();
955 let completion = format!(
956 "{}{}",
957 c.reset(),
958 TextDeltaRenderer::render_completion(terminal_mode)
959 );
960 // Show streaming quality metrics in debug mode or when flag is set
961 let show_metrics = (self.verbosity.is_debug() || self.show_streaming_metrics)
962 && metrics.total_deltas > 0;
963 if show_metrics {
964 format!("{}\n{}", completion, metrics.format(*c))
965 } else {
966 completion
967 }
968 } else {
969 String::new()
970 }
971 }
972
973 /// Handle error events
974 fn handle_error_event(&self, err: crate::json_parser::types::StreamError) -> String {
975 let c = &self.colors;
976 let prefix = &self.display_name;
977
978 let msg = err
979 .message
980 .unwrap_or_else(|| "Unknown streaming error".to_string());
981 format!(
982 "{}[{}]{} {}Error: {}{}\n",
983 c.dim(),
984 prefix,
985 c.reset(),
986 c.red(),
987 msg,
988 c.reset()
989 )
990 }
991
992 /// Handle unknown events
993 fn handle_unknown_event(&self) -> String {
994 let c = &self.colors;
995 let prefix = &self.display_name;
996
997 // Unknown stream event - in debug mode, log it
998 if self.verbosity.is_debug() {
999 format!(
1000 "{}[{}]{} {}Unknown streaming event{}\n",
1001 c.dim(),
1002 prefix,
1003 c.reset(),
1004 c.dim(),
1005 c.reset()
1006 )
1007 } else {
1008 String::new()
1009 }
1010 }
1011
1012 /// Check if a Claude event is a control event (state management with no user output)
1013 ///
1014 /// Control events are valid JSON that represent state transitions rather than
1015 /// user-facing content. They should be tracked separately from "ignored" events
1016 /// to avoid false health warnings.
1017 const fn is_control_event(event: &ClaudeEvent) -> bool {
1018 match event {
1019 // Stream events that are control events
1020 ClaudeEvent::StreamEvent { event } => matches!(
1021 event,
1022 StreamInnerEvent::MessageStart { .. }
1023 | StreamInnerEvent::ContentBlockStart { .. }
1024 | StreamInnerEvent::ContentBlockStop { .. }
1025 | StreamInnerEvent::MessageDelta { .. }
1026 | StreamInnerEvent::MessageStop
1027 | StreamInnerEvent::Ping
1028 ),
1029 _ => false,
1030 }
1031 }
1032
1033 /// Check if a Claude event is a partial/delta event (streaming content displayed incrementally)
1034 ///
1035 /// Partial events represent streaming content deltas (text deltas, thinking deltas,
1036 /// tool input deltas) that are shown to the user in real-time. These should be
1037 /// tracked separately to avoid inflating "ignored" percentages.
1038 const fn is_partial_event(event: &ClaudeEvent) -> bool {
1039 match event {
1040 // Stream events that produce incremental content
1041 ClaudeEvent::StreamEvent { event } => matches!(
1042 event,
1043 StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::TextDelta { .. }
1044 ),
1045 _ => false,
1046 }
1047 }
1048
1049 /// Get a shared delta display formatter
1050 const fn formatter() -> DeltaDisplayFormatter {
1051 DeltaDisplayFormatter::new()
1052 }
1053
1054 /// Parse a stream of Claude NDJSON events
1055 pub fn parse_stream<R: BufRead>(&self, mut reader: R) -> io::Result<()> {
1056 use super::incremental_parser::IncrementalNdjsonParser;
1057
1058 let c = &self.colors;
1059 let monitor = HealthMonitor::new("Claude");
1060 let mut log_writer = self.log_file.as_ref().and_then(|log_path| {
1061 std::fs::OpenOptions::new()
1062 .create(true)
1063 .append(true)
1064 .open(log_path)
1065 .ok()
1066 .map(std::io::BufWriter::new)
1067 });
1068
1069 // Use incremental parser for true real-time streaming
1070 // This processes JSON as soon as it's complete, not waiting for newlines
1071 let mut incremental_parser = IncrementalNdjsonParser::new();
1072 let mut byte_buffer = Vec::new();
1073
1074 loop {
1075 // Read available bytes
1076 byte_buffer.clear();
1077 let chunk = reader.fill_buf()?;
1078 if chunk.is_empty() {
1079 break;
1080 }
1081
1082 // Process all bytes immediately
1083 byte_buffer.extend_from_slice(chunk);
1084 let consumed = chunk.len();
1085 reader.consume(consumed);
1086
1087 // Feed bytes to incremental parser
1088 let json_events = incremental_parser.feed(&byte_buffer);
1089
1090 // Process each complete JSON event immediately
1091 for line in json_events {
1092 let trimmed = line.trim();
1093 if trimmed.is_empty() {
1094 continue;
1095 }
1096
1097 // In debug mode, also show the raw JSON
1098 if self.verbosity.is_debug() {
1099 eprintln!(
1100 "{}[DEBUG]{} {}{}{}",
1101 c.dim(),
1102 c.reset(),
1103 c.dim(),
1104 &line,
1105 c.reset()
1106 );
1107 }
1108
1109 // Parse the event once - parse_event handles malformed JSON by returning None
1110 match self.parse_event(&line) {
1111 Some(output) => {
1112 // Check if this is a partial/delta event (streaming content)
1113 if trimmed.starts_with('{') {
1114 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1115 if Self::is_partial_event(&event) {
1116 monitor.record_partial_event();
1117 } else {
1118 monitor.record_parsed();
1119 }
1120 } else {
1121 monitor.record_parsed();
1122 }
1123 } else {
1124 monitor.record_parsed();
1125 }
1126 // Write output to printer
1127 let mut printer = self.printer.borrow_mut();
1128 write!(printer, "{output}")?;
1129 printer.flush()?;
1130 }
1131 None => {
1132 // Check if this was a control event (state management with no user output)
1133 // Control events are valid JSON that return empty output but aren't "ignored"
1134 if trimmed.starts_with('{') {
1135 if let Ok(event) = serde_json::from_str::<ClaudeEvent>(&line) {
1136 if Self::is_control_event(&event) {
1137 monitor.record_control_event();
1138 } else {
1139 // Valid JSON but not a control event - track as unknown
1140 monitor.record_unknown_event();
1141 }
1142 } else {
1143 // Failed to deserialize - track as parse error
1144 monitor.record_parse_error();
1145 }
1146 } else {
1147 monitor.record_ignored();
1148 }
1149 }
1150 }
1151
1152 // Log raw JSON to file if configured
1153 if let Some(ref mut file) = log_writer {
1154 writeln!(file, "{line}")?;
1155 }
1156 }
1157 }
1158
1159 if let Some(ref mut file) = log_writer {
1160 file.flush()?;
1161 }
1162 if let Some(warning) = monitor.check_and_warn(*c) {
1163 let mut printer = self.printer.borrow_mut();
1164 writeln!(printer, "{warning}")?;
1165 printer.flush()?;
1166 }
1167 Ok(())
1168 }
1169}
1170
#[cfg(test)]
mod tests {
    // The module itself is gated on `cfg(test)`, so the items inside need no
    // additional `#[cfg(test)]` attributes.
    use super::*;
    use crate::json_parser::printer::{SharedPrinter, TestPrinter};

    /// Smoke test: `ClaudeParser::printer` can be called on a parser built
    /// with an injected test printer.
    #[test]
    fn test_printer_method_accessible() {
        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let parser =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));

        // Only accessibility is checked; the returned value is not inspected.
        let _printer_ref = parser.printer();
    }

    /// Smoke test: `ClaudeParser::streaming_metrics` can be called on a parser
    /// built with an injected test printer.
    #[test]
    fn test_streaming_metrics_method_accessible() {
        let test_printer: SharedPrinter = Rc::new(RefCell::new(TestPrinter::new()));
        let parser =
            ClaudeParser::with_printer(Colors::new(), Verbosity::Normal, Rc::clone(&test_printer));

        // Only accessibility is checked; the returned value is not inspected.
        let _metrics = parser.streaming_metrics();
    }
}