1use std::cell::RefCell;
6use std::io::BufRead;
7use std::rc::Rc;
8
9use crate::json_parser::claude::io::ParserState;
10#[cfg(any(test, feature = "test-utils"))]
11use crate::json_parser::health::StreamingQualityMetrics;
12use crate::json_parser::incremental_parser::IncrementalNdjsonParser;
13use crate::json_parser::printer::Printable;
14use crate::json_parser::printer::StdoutPrinter;
15use crate::json_parser::types::{ContentBlock, ContentBlockDelta, ToolActivityTracker};
16
17pub struct ClaudeParser {
22 colors: Colors,
23 pub(crate) verbosity: Verbosity,
24 log_path: Option<std::path::PathBuf>,
25 display_name: String,
26 state: ParserState,
27 show_streaming_metrics: bool,
28 printer: Rc<RefCell<dyn Printable>>,
29 tool_activity_tracker: ToolActivityTracker,
34}
35
36impl ClaudeParser {
37 #[must_use]
38 pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
39 Self::with_printer(
40 colors,
41 verbosity,
42 Rc::new(RefCell::new(StdoutPrinter::new())),
43 )
44 }
45
46 pub fn with_printer(
47 colors: Colors,
48 verbosity: Verbosity,
49 printer: Rc<RefCell<dyn Printable>>,
50 ) -> Self {
51 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
52
53 Self {
54 colors,
55 verbosity,
56 log_path: None,
57 display_name: "Claude".to_string(),
58 state: ParserState::new(verbose_warnings),
59 show_streaming_metrics: false,
60 printer,
61 tool_activity_tracker: ToolActivityTracker::new(),
62 }
63 }
64
65 pub(crate) fn with_tool_activity_tracker(
70 mut self,
71 tracker: std::sync::Arc<std::sync::atomic::AtomicU32>,
72 ) -> Self {
73 self.tool_activity_tracker = ToolActivityTracker::with_tracker(tracker);
74 self
75 }
76
77 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
78 self.show_streaming_metrics = show;
79 self
80 }
81
82 #[must_use]
83 pub fn with_display_name(mut self, display_name: &str) -> Self {
84 self.display_name = display_name.to_string();
85 self
86 }
87
88 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
89 self.log_path = Some(std::path::PathBuf::from(path));
90 self
91 }
92
/// Test-only builder that overrides the terminal mode held in the parser state.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
    // The previously stored mode is intentionally discarded.
    let _previous = self.state.terminal_mode.replace(mode);
    self
}
108
/// Test-only accessor returning another handle to the shared printer.
#[cfg(any(test, feature = "test-utils"))]
pub fn printer(&self) -> Rc<RefCell<dyn Printable>> {
    Rc::clone(&self.printer)
}
144
145 pub(crate) fn with_printer_mut<R>(&mut self, f: impl FnOnce(&mut dyn Printable) -> R) -> R {
146 let mut printer_ref = self.printer.borrow_mut();
147 f(&mut *printer_ref)
148 }
149
/// Test-only accessor for the streaming quality metrics of the current session.
#[cfg(any(test, feature = "test-utils"))]
pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
    let session = self.state.streaming_session.borrow();
    session.get_streaming_quality_metrics()
}
188
189 fn update_cursor_up_state(&self, output: &str) {
191 if *self.state.terminal_mode.borrow() == TerminalMode::Full {
192 self.state.with_cursor_up_active_mut(|cursor_up_active| {
193 if output.contains("\x1b[1B\n") {
194 *cursor_up_active = false;
195 }
196 if output.contains("\x1b[1A") {
197 *cursor_up_active = true;
198 }
199 });
200 }
201 }
202
203 fn handle_non_json_line(&self, trimmed: &str) -> Option<String> {
205 if trimmed.is_empty() || trimmed.starts_with('{') {
206 return None;
207 }
208 let finalize = self
209 .state
210 .with_session_mut(|session| self.finalize_in_place_full_mode(session));
211 let out = format!("{finalize}{trimmed}\n");
212 self.update_cursor_up_state(&out);
213 Some(out)
214 }
215
216 fn dispatch_event(&self, event: ClaudeEvent, line: &str) -> String {
218 match event {
219 ClaudeEvent::System { subtype, session_id, cwd } => {
220 self.format_system_event(subtype.as_ref(), session_id, cwd)
221 }
222 ClaudeEvent::Assistant { message } => self.format_assistant_event(message.as_ref()),
223 ClaudeEvent::User { message } => self.format_user_event(message),
224 ClaudeEvent::Result { subtype, duration_ms, total_cost_usd, num_turns, result, error } => {
225 self.format_result_event(subtype, duration_ms, total_cost_usd, num_turns, result, error)
226 }
227 ClaudeEvent::StreamEvent { event } => self.parse_stream_event(event),
228 ClaudeEvent::Unknown => self.format_unknown_event(line),
229 }
230 }
231
232 fn format_unknown_event(&self, line: &str) -> String {
233 format_unknown_json_event(line, &self.display_name, self.colors, self.verbosity.is_verbose())
234 }
235
236 pub fn parse_event(&self, line: &str) -> Option<String> {
243 let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
244 e
245 } else {
246 return self.handle_non_json_line(line.trim());
247 };
248 let finalize = self.compute_finalize_for_event(&event);
249 let output = self.dispatch_event(event, line);
250 let combined = combine_finalize_and_output(finalize, output);
251 combined.inspect(|out| {
252 self.update_cursor_up_state(out);
253 })
254 }
255
256 fn compute_finalize_for_event(&self, event: &ClaudeEvent) -> String {
257 if matches!(event, ClaudeEvent::StreamEvent { .. }) {
258 String::new()
259 } else {
260 self.state
261 .with_session_mut(|session| self.finalize_in_place_full_mode(session))
262 }
263 }
264
265 fn reset_message_state(&self) {
267 self.state.with_thinking_active_index_mut(|idx| *idx = None);
268 self.state
269 .with_thinking_non_tty_indices_mut(|indices| indices.clear());
270 self.state
271 .with_suppress_thinking_for_message_mut(|v| *v = false);
272 self.state.with_text_line_active_mut(|v| *v = false);
273 self.state.with_cursor_up_active_mut(|v| *v = false);
274 self.state.with_last_rendered_content_mut(|v| v.clear());
275 }
276
277 fn handle_message_start(
279 &self,
280 message: Option<crate::json_parser::types::AssistantMessage>,
281 message_id: Option<String>,
282 ) -> String {
283 self.tool_activity_tracker.clear_active();
287 let in_place_finalize = self
288 .state
289 .with_session_mut(|session| self.finalize_in_place_full_mode(session));
290 self.reset_message_state();
291 let effective_message_id =
292 message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
293 self.state.with_session_mut(|session| {
294 session.set_current_message_id(effective_message_id);
295 session.on_message_start();
296 });
297 in_place_finalize
298 }
299
300 fn handle_content_block_start_no_block(&self, index: u64) -> String {
312 self.state.with_session_mut(|session| { session.on_content_block_start(index); });
313 String::new()
314 }
315
316 fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
317 match event {
318 StreamInnerEvent::MessageStart { message, message_id } => self.handle_message_start(message, message_id),
319 StreamInnerEvent::ContentBlockStart { index: Some(index), content_block: Some(block) } => self.handle_content_block_start_with_block(index, block),
320 StreamInnerEvent::ContentBlockStart { index: Some(index), content_block: None } => self.handle_content_block_start_no_block(index),
321 StreamInnerEvent::ContentBlockStart { .. } => String::new(),
322 StreamInnerEvent::ContentBlockDelta { index: Some(index), delta: Some(delta) } => self.handle_content_block_delta_inner(index, delta),
323 StreamInnerEvent::TextDelta { text: Some(text) } => self.handle_text_delta_inner(&text),
324 StreamInnerEvent::ContentBlockStop { .. } | StreamInnerEvent::MessageDelta { .. } | StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::Ping | StreamInnerEvent::TextDelta { text: None } | StreamInnerEvent::Error { error: None } => String::new(),
325 StreamInnerEvent::MessageStop => self.handle_message_stop_inner(),
326 StreamInnerEvent::Error { error: Some(err), .. } => self.handle_error_event(err),
327 StreamInnerEvent::Unknown => self.handle_unknown_event(),
328 }
329 }
330
331 fn handle_content_block_start_with_block(&self, index: u64, block: ContentBlock) -> String {
332 if matches!(block, ContentBlock::ToolUse { .. }) {
335 self.tool_activity_tracker.set_active();
336 }
337 self.state.with_session_mut(|session| {
338 session.on_content_block_start(index);
339 apply_content_block_start_to_session(session, index, &block);
340 });
341 String::new()
342 }
343
344 fn handle_content_block_delta_inner(&self, index: u64, delta: ContentBlockDelta) -> String {
345 self.state
346 .with_session_mut(|session| self.handle_content_block_delta(session, index, delta))
347 }
348
349 fn handle_text_delta_inner(&self, text: &str) -> String {
350 self.state
351 .with_session_mut(|session| self.handle_text_delta(session, text))
352 }
353
354 fn handle_message_stop_inner(&self) -> String {
355 self.state
359 .with_session_mut(|session| self.handle_message_stop(session))
360 }
361}
362
363struct StreamLoopState {
364 incremental_parser: IncrementalNdjsonParser,
365 log_buffer: Vec<u8>,
366 seen_success_result: std::cell::Cell<bool>,
367}
368
369impl StreamLoopState {
370 fn new() -> Self {
371 Self {
372 incremental_parser: IncrementalNdjsonParser::new(),
373 log_buffer: Vec::new(),
374 seen_success_result: std::cell::Cell::new(false),
375 }
376 }
377}
378
379impl ClaudeParser {
380 pub fn parse_stream<R: BufRead>(
381 &mut self,
382 mut reader: R,
383 workspace: &dyn crate::workspace::Workspace,
384 ) -> std::io::Result<()> {
385 let c = self.colors;
386 let monitor = HealthMonitor::new("Claude");
387 let mut state = StreamLoopState::new();
388 self.run_stream_loop(&mut reader, c, &monitor, &mut state)?;
389 self.finalize_parse_stream(workspace, &monitor, c, &state.log_buffer)
390 }
391
392 fn run_stream_loop<R: BufRead>(
393 &mut self, reader: &mut R, c: Colors,
394 monitor: &HealthMonitor, state: &mut StreamLoopState,
395 ) -> std::io::Result<()> {
396 let logging_enabled = self.log_path.is_some();
397 loop {
398 let chunk = reader.fill_buf()?;
399 if chunk.is_empty() { break; }
400 let data = chunk.to_vec(); reader.consume(data.len());
401 let (new_parser, events) = std::mem::take(&mut state.incremental_parser).feed_and_get_events(&data);
402 state.incremental_parser = new_parser;
403 events.into_iter().for_each(|line| { self.process_stream_line(&line, c, monitor, &mut state.log_buffer, logging_enabled, &state.seen_success_result); });
404 }
405 Ok(())
406 }
407
408 #[expect(
409 clippy::print_stderr,
410 reason = "debug-only output for verbose debugging"
411 )]
412 fn process_stream_line(
413 &mut self,
414 line: &str,
415 c: Colors,
416 monitor: &HealthMonitor,
417 log_buffer: &mut Vec<u8>,
418 logging_enabled: bool,
419 seen_success_result: &std::cell::Cell<bool>,
420 ) {
421 let trimmed = line.trim();
422 if trimmed.is_empty() { return; }
423 if self.verbosity.is_debug() {
424 eprintln!("{}[DEBUG]{} {}{}{}", c.dim(), c.reset(), c.dim(), line, c.reset());
425 }
426 self.process_parsed_line(trimmed, line, monitor, log_buffer, logging_enabled, seen_success_result);
427 }
428
429 fn process_parsed_line(
430 &mut self,
431 trimmed: &str,
432 line: &str,
433 monitor: &HealthMonitor,
434 log_buffer: &mut Vec<u8>,
435 logging_enabled: bool,
436 seen_success_result: &std::cell::Cell<bool>,
437 ) {
438 if should_skip_result_event(trimmed, seen_success_result) {
439 log_line_if_enabled(log_buffer, line, logging_enabled);
440 monitor.record_control_event();
441 return;
442 }
443 match self.parse_event(line) {
444 Some(output) => {
445 record_monitor_for_parsed_output(trimmed, line, monitor);
446 self.with_printer_mut(|printer| {
447 if write!(printer, "{output}").is_ok() { printer.flush().ok(); }
448 });
449 }
450 None => record_monitor_for_no_output(trimmed, line, monitor),
451 }
452 log_line_if_enabled(log_buffer, line, logging_enabled);
453 }
454
455 fn finalize_parse_stream(
456 &mut self,
457 workspace: &dyn crate::workspace::Workspace,
458 monitor: &HealthMonitor,
459 c: Colors,
460 log_buffer: &[u8],
461 ) -> std::io::Result<()> {
462 self.tool_activity_tracker.reset(); if let Some(log_path) = &self.log_path {
464 workspace.append_bytes(log_path, log_buffer)?;
465 }
466 if let Some(warning) = monitor.check_and_warn(c) {
467 self.with_printer_mut(|printer| {
468 writeln!(printer, "{warning}").ok();
469 printer.flush().ok();
470 });
471 }
472 Ok(())
473 }
474}
475
/// Appends `line` plus a trailing newline to `log_buffer` when
/// `logging_enabled` is true; otherwise does nothing.
fn log_line_if_enabled(log_buffer: &mut Vec<u8>, line: &str, logging_enabled: bool) {
    if !logging_enabled {
        return;
    }
    log_buffer.extend_from_slice(line.as_bytes());
    log_buffer.push(b'\n');
}
479
/// Concatenates `finalize` and `output`, returning `None` when the result
/// would be empty.
fn combine_finalize_and_output(finalize: String, output: String) -> Option<String> {
    let mut combined = finalize;
    combined.push_str(&output);
    let has_text = !combined.is_empty();
    has_text.then_some(combined)
}
488
489fn apply_content_block_start_to_session(
490 session: &mut crate::json_parser::streaming_state::StreamingSession,
491 index: u64,
492 block: &ContentBlock,
493) {
494 match block {
495 ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
496 session.on_text_delta(index, t);
497 }
498 ContentBlock::ToolUse { name, input } => {
499 apply_tool_use_start_to_session(session, index, name.as_deref(), input.as_ref());
500 }
501 _ => {}
502 }
503}
504
505fn json_value_to_tool_input_str(v: &serde_json::Value) -> String {
506 if let serde_json::Value::String(s) = v { s.clone() } else { format_tool_input(v) }
507}
508
509fn apply_tool_use_start_to_session(
510 session: &mut crate::json_parser::streaming_state::StreamingSession,
511 index: u64,
512 name: Option<&str>,
513 input: Option<&serde_json::Value>,
514) {
515 if let Some(n) = name {
516 session.set_tool_name(index, Some(n.to_string()));
517 }
518 if let Some(i) = input {
519 session.on_tool_input_delta(index, &json_value_to_tool_input_str(i));
520 }
521}
522
523fn has_errors_array_with_content(trimmed: &str) -> bool {
524 serde_json::from_str::<serde_json::Value>(trimmed).is_ok_and(|json| {
525 json.get("errors")
526 .and_then(|v| v.as_array())
527 .is_some_and(|arr| {
528 arr.iter()
529 .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
530 })
531 })
532}
533
/// Decides whether a result event looks like a spurious error.
///
/// All of the following must hold: the subtype is not `"success"`, the
/// duration is under 100 ms (a missing duration counts as 0), the error
/// text is absent or empty, and `has_errors` is false.
fn is_spurious_glm_error(
    subtype: &Option<String>,
    duration_ms: Option<u64>,
    error: &Option<String>,
    has_errors: bool,
) -> bool {
    if has_errors || subtype.as_deref() == Some("success") {
        return false;
    }

    let finished_instantly = duration_ms.unwrap_or(0) < 100;
    let lacks_error_text = error.as_deref().unwrap_or("").is_empty();
    finished_instantly && lacks_error_text
}
545
546fn should_skip_result_event(trimmed: &str, seen_success: &std::cell::Cell<bool>) -> bool {
547 if !trimmed.starts_with('{') { return false; }
548 let has_errors = has_errors_array_with_content(trimmed);
549 let Ok(ClaudeEvent::Result { subtype, duration_ms, error, .. }) =
550 serde_json::from_str::<ClaudeEvent>(trimmed)
551 else {
552 return false;
553 };
554 let spurious = is_spurious_glm_error(&subtype, duration_ms, &error, has_errors);
555 if subtype.as_deref() == Some("success") {
556 seen_success.set(true);
557 false
558 } else {
559 spurious
560 }
561}
562
563fn record_monitor_for_parsed_output(trimmed: &str, line: &str, monitor: &HealthMonitor) {
564 let is_partial = trimmed.starts_with('{')
565 && serde_json::from_str::<ClaudeEvent>(line)
566 .is_ok_and(|e| ClaudeParser::is_partial_event(&e));
567 if is_partial {
568 monitor.record_partial_event();
569 } else {
570 monitor.record_parsed();
571 }
572}
573
574fn record_monitor_for_no_output(trimmed: &str, line: &str, monitor: &HealthMonitor) {
575 if !trimmed.starts_with('{') { return monitor.record_ignored(); }
576 match serde_json::from_str::<ClaudeEvent>(line) {
577 Ok(event) if ClaudeParser::is_control_event(&event) => monitor.record_control_event(),
578 Ok(_) => monitor.record_unknown_event(),
579 Err(_) => monitor.record_parse_error(),
580 }
581}