1use std::cell::RefCell;
6use std::io::BufRead;
7use std::rc::Rc;
8
9use crate::json_parser::claude::io::ParserState;
10#[cfg(any(test, feature = "test-utils"))]
11use crate::json_parser::health::StreamingQualityMetrics;
12use crate::json_parser::incremental_parser::IncrementalNdjsonParser;
13use crate::json_parser::printer::Printable;
14use crate::json_parser::printer::StdoutPrinter;
15use crate::json_parser::types::{ContentBlock, ContentBlockDelta};
16
17pub struct ClaudeParser {
22 colors: Colors,
23 pub(crate) verbosity: Verbosity,
24 log_path: Option<std::path::PathBuf>,
25 display_name: String,
26 state: ParserState,
27 show_streaming_metrics: bool,
28 printer: Rc<RefCell<dyn Printable>>,
29}
30
31impl ClaudeParser {
32 #[must_use]
33 pub fn new(colors: Colors, verbosity: Verbosity) -> Self {
34 Self::with_printer(
35 colors,
36 verbosity,
37 Rc::new(RefCell::new(StdoutPrinter::new())),
38 )
39 }
40
41 pub fn with_printer(
42 colors: Colors,
43 verbosity: Verbosity,
44 printer: Rc<RefCell<dyn Printable>>,
45 ) -> Self {
46 let verbose_warnings = matches!(verbosity, Verbosity::Debug);
47
48 Self {
49 colors,
50 verbosity,
51 log_path: None,
52 display_name: "Claude".to_string(),
53 state: ParserState::new(verbose_warnings),
54 show_streaming_metrics: false,
55 printer,
56 }
57 }
58
59 pub(crate) const fn with_show_streaming_metrics(mut self, show: bool) -> Self {
60 self.show_streaming_metrics = show;
61 self
62 }
63
64 #[must_use]
65 pub fn with_display_name(mut self, display_name: &str) -> Self {
66 self.display_name = display_name.to_string();
67 self
68 }
69
70 pub(crate) fn with_log_file(mut self, path: &str) -> Self {
71 self.log_path = Some(std::path::PathBuf::from(path));
72 self
73 }
74
75 #[cfg(any(test, feature = "test-utils"))]
85 #[must_use]
86 pub fn with_terminal_mode(self, mode: TerminalMode) -> Self {
87 self.state.terminal_mode.replace(mode);
88 self
89 }
90
91 #[cfg(any(test, feature = "test-utils"))]
123 pub fn printer(&self) -> Rc<RefCell<dyn Printable>> {
124 self.printer.clone()
125 }
126
127 pub(crate) fn with_printer_mut<R>(&mut self, f: impl FnOnce(&mut dyn Printable) -> R) -> R {
128 let mut printer_ref = self.printer.borrow_mut();
129 f(&mut *printer_ref)
130 }
131
132 #[cfg(any(test, feature = "test-utils"))]
164 pub fn streaming_metrics(&self) -> StreamingQualityMetrics {
165 self.state
166 .streaming_session
167 .borrow()
168 .get_streaming_quality_metrics()
169 }
170
171 fn update_cursor_up_state(&self, output: &str) {
173 if *self.state.terminal_mode.borrow() == TerminalMode::Full {
174 self.state.with_cursor_up_active_mut(|cursor_up_active| {
175 if output.contains("\x1b[1B\n") {
176 *cursor_up_active = false;
177 }
178 if output.contains("\x1b[1A") {
179 *cursor_up_active = true;
180 }
181 });
182 }
183 }
184
185 fn handle_non_json_line(&self, trimmed: &str) -> Option<String> {
187 if trimmed.is_empty() || trimmed.starts_with('{') {
188 return None;
189 }
190 let finalize = self
191 .state
192 .with_session_mut(|session| self.finalize_in_place_full_mode(session));
193 let out = format!("{finalize}{trimmed}\n");
194 self.update_cursor_up_state(&out);
195 Some(out)
196 }
197
198 fn dispatch_event(&self, event: ClaudeEvent, line: &str) -> String {
200 match event {
201 ClaudeEvent::System { subtype, session_id, cwd } => {
202 self.format_system_event(subtype.as_ref(), session_id, cwd)
203 }
204 ClaudeEvent::Assistant { message } => self.format_assistant_event(message.as_ref()),
205 ClaudeEvent::User { message } => self.format_user_event(message),
206 ClaudeEvent::Result { subtype, duration_ms, total_cost_usd, num_turns, result, error } => {
207 self.format_result_event(subtype, duration_ms, total_cost_usd, num_turns, result, error)
208 }
209 ClaudeEvent::StreamEvent { event } => self.parse_stream_event(event),
210 ClaudeEvent::Unknown => self.format_unknown_event(line),
211 }
212 }
213
214 fn format_unknown_event(&self, line: &str) -> String {
215 format_unknown_json_event(line, &self.display_name, self.colors, self.verbosity.is_verbose())
216 }
217
218 pub fn parse_event(&self, line: &str) -> Option<String> {
225 let event: ClaudeEvent = if let Ok(e) = serde_json::from_str(line) {
226 e
227 } else {
228 return self.handle_non_json_line(line.trim());
229 };
230 let finalize = self.compute_finalize_for_event(&event);
231 let output = self.dispatch_event(event, line);
232 let combined = combine_finalize_and_output(finalize, output);
233 combined.inspect(|out| {
234 self.update_cursor_up_state(out);
235 })
236 }
237
238 fn compute_finalize_for_event(&self, event: &ClaudeEvent) -> String {
239 if matches!(event, ClaudeEvent::StreamEvent { .. }) {
240 String::new()
241 } else {
242 self.state
243 .with_session_mut(|session| self.finalize_in_place_full_mode(session))
244 }
245 }
246
247 fn reset_message_state(&self) {
249 self.state.with_thinking_active_index_mut(|idx| *idx = None);
250 self.state
251 .with_thinking_non_tty_indices_mut(|indices| indices.clear());
252 self.state
253 .with_suppress_thinking_for_message_mut(|v| *v = false);
254 self.state.with_text_line_active_mut(|v| *v = false);
255 self.state.with_cursor_up_active_mut(|v| *v = false);
256 self.state.with_last_rendered_content_mut(|v| v.clear());
257 }
258
259 fn handle_message_start(
261 &self,
262 message: Option<crate::json_parser::types::AssistantMessage>,
263 message_id: Option<String>,
264 ) -> String {
265 let in_place_finalize = self
266 .state
267 .with_session_mut(|session| self.finalize_in_place_full_mode(session));
268 self.reset_message_state();
269 let effective_message_id =
270 message_id.or_else(|| message.as_ref().and_then(|m| m.id.clone()));
271 self.state.with_session_mut(|session| {
272 session.set_current_message_id(effective_message_id);
273 session.on_message_start();
274 });
275 in_place_finalize
276 }
277
278 fn handle_content_block_start_no_block(&self, index: u64) -> String {
290 self.state.with_session_mut(|session| { session.on_content_block_start(index); });
291 String::new()
292 }
293
294 fn parse_stream_event(&self, event: StreamInnerEvent) -> String {
295 match event {
296 StreamInnerEvent::MessageStart { message, message_id } => self.handle_message_start(message, message_id),
297 StreamInnerEvent::ContentBlockStart { index: Some(index), content_block: Some(block) } => self.handle_content_block_start_with_block(index, block),
298 StreamInnerEvent::ContentBlockStart { index: Some(index), content_block: None } => self.handle_content_block_start_no_block(index),
299 StreamInnerEvent::ContentBlockStart { .. } => String::new(),
300 StreamInnerEvent::ContentBlockDelta { index: Some(index), delta: Some(delta) } => self.handle_content_block_delta_inner(index, delta),
301 StreamInnerEvent::TextDelta { text: Some(text) } => self.handle_text_delta_inner(&text),
302 StreamInnerEvent::ContentBlockStop { .. } | StreamInnerEvent::MessageDelta { .. } | StreamInnerEvent::ContentBlockDelta { .. } | StreamInnerEvent::Ping | StreamInnerEvent::TextDelta { text: None } | StreamInnerEvent::Error { error: None } => String::new(),
303 StreamInnerEvent::MessageStop => self.handle_message_stop_inner(),
304 StreamInnerEvent::Error { error: Some(err), .. } => self.handle_error_event(err),
305 StreamInnerEvent::Unknown => self.handle_unknown_event(),
306 }
307 }
308
309 fn handle_content_block_start_with_block(&self, index: u64, block: ContentBlock) -> String {
310 self.state.with_session_mut(|session| {
311 session.on_content_block_start(index);
312 apply_content_block_start_to_session(session, index, &block);
313 });
314 String::new()
315 }
316
317 fn handle_content_block_delta_inner(&self, index: u64, delta: ContentBlockDelta) -> String {
318 self.state
319 .with_session_mut(|session| self.handle_content_block_delta(session, index, delta))
320 }
321
322 fn handle_text_delta_inner(&self, text: &str) -> String {
323 self.state
324 .with_session_mut(|session| self.handle_text_delta(session, text))
325 }
326
327 fn handle_message_stop_inner(&self) -> String {
328 self.state
329 .with_session_mut(|session| self.handle_message_stop(session))
330 }
331}
332
333struct StreamLoopState {
334 incremental_parser: IncrementalNdjsonParser,
335 log_buffer: Vec<u8>,
336 seen_success_result: std::cell::Cell<bool>,
337}
338
339impl StreamLoopState {
340 fn new() -> Self {
341 Self {
342 incremental_parser: IncrementalNdjsonParser::new(),
343 log_buffer: Vec::new(),
344 seen_success_result: std::cell::Cell::new(false),
345 }
346 }
347}
348
349impl ClaudeParser {
350 pub fn parse_stream<R: BufRead>(
351 &mut self,
352 mut reader: R,
353 workspace: &dyn crate::workspace::Workspace,
354 ) -> std::io::Result<()> {
355 let c = self.colors;
356 let monitor = HealthMonitor::new("Claude");
357 let mut state = StreamLoopState::new();
358 self.run_stream_loop(&mut reader, c, &monitor, &mut state)?;
359 self.finalize_parse_stream(workspace, &monitor, c, &state.log_buffer)
360 }
361
362 fn run_stream_loop<R: BufRead>(
363 &mut self, reader: &mut R, c: Colors,
364 monitor: &HealthMonitor, state: &mut StreamLoopState,
365 ) -> std::io::Result<()> {
366 let logging_enabled = self.log_path.is_some();
367 loop {
368 let chunk = reader.fill_buf()?;
369 if chunk.is_empty() { break; }
370 let data = chunk.to_vec(); reader.consume(data.len());
371 let (new_parser, events) = std::mem::take(&mut state.incremental_parser).feed_and_get_events(&data);
372 state.incremental_parser = new_parser;
373 events.into_iter().for_each(|line| { self.process_stream_line(&line, c, monitor, &mut state.log_buffer, logging_enabled, &state.seen_success_result); });
374 }
375 Ok(())
376 }
377
378 #[expect(
379 clippy::print_stderr,
380 reason = "debug-only output for verbose debugging"
381 )]
382 fn process_stream_line(
383 &mut self,
384 line: &str,
385 c: Colors,
386 monitor: &HealthMonitor,
387 log_buffer: &mut Vec<u8>,
388 logging_enabled: bool,
389 seen_success_result: &std::cell::Cell<bool>,
390 ) {
391 let trimmed = line.trim();
392 if trimmed.is_empty() { return; }
393 if self.verbosity.is_debug() {
394 eprintln!("{}[DEBUG]{} {}{}{}", c.dim(), c.reset(), c.dim(), line, c.reset());
395 }
396 self.process_parsed_line(trimmed, line, monitor, log_buffer, logging_enabled, seen_success_result);
397 }
398
399 fn process_parsed_line(
400 &mut self,
401 trimmed: &str,
402 line: &str,
403 monitor: &HealthMonitor,
404 log_buffer: &mut Vec<u8>,
405 logging_enabled: bool,
406 seen_success_result: &std::cell::Cell<bool>,
407 ) {
408 if should_skip_result_event(trimmed, seen_success_result) {
409 log_line_if_enabled(log_buffer, line, logging_enabled);
410 monitor.record_control_event();
411 return;
412 }
413 match self.parse_event(line) {
414 Some(output) => {
415 record_monitor_for_parsed_output(trimmed, line, monitor);
416 self.with_printer_mut(|printer| {
417 if write!(printer, "{output}").is_ok() { printer.flush().ok(); }
418 });
419 }
420 None => record_monitor_for_no_output(trimmed, line, monitor),
421 }
422 log_line_if_enabled(log_buffer, line, logging_enabled);
423 }
424
425 fn finalize_parse_stream(
426 &mut self,
427 workspace: &dyn crate::workspace::Workspace,
428 monitor: &HealthMonitor,
429 c: Colors,
430 log_buffer: &[u8],
431 ) -> std::io::Result<()> {
432 if let Some(log_path) = &self.log_path {
433 workspace.append_bytes(log_path, log_buffer)?;
434 }
435 if let Some(warning) = monitor.check_and_warn(c) {
436 self.with_printer_mut(|printer| {
437 writeln!(printer, "{warning}").ok();
438 printer.flush().ok();
439 });
440 }
441 Ok(())
442 }
443}
444
/// Appends `line` plus a trailing newline to `log_buffer` when logging is enabled.
fn log_line_if_enabled(log_buffer: &mut Vec<u8>, line: &str, logging_enabled: bool) {
    if !logging_enabled {
        return;
    }
    log_buffer.extend_from_slice(line.as_bytes());
    log_buffer.push(b'\n');
}
448
/// Joins finalizer text and event output, finalizer first.
///
/// Returns `None` when both parts are empty.
fn combine_finalize_and_output(finalize: String, output: String) -> Option<String> {
    let mut combined = finalize;
    combined.push_str(&output);
    (!combined.is_empty()).then_some(combined)
}
457
458fn apply_content_block_start_to_session(
459 session: &mut crate::json_parser::streaming_state::StreamingSession,
460 index: u64,
461 block: &ContentBlock,
462) {
463 match block {
464 ContentBlock::Text { text: Some(t) } if !t.is_empty() => {
465 session.on_text_delta(index, t);
466 }
467 ContentBlock::ToolUse { name, input } => {
468 apply_tool_use_start_to_session(session, index, name.as_deref(), input.as_ref());
469 }
470 _ => {}
471 }
472}
473
474fn json_value_to_tool_input_str(v: &serde_json::Value) -> String {
475 if let serde_json::Value::String(s) = v { s.clone() } else { format_tool_input(v) }
476}
477
478fn apply_tool_use_start_to_session(
479 session: &mut crate::json_parser::streaming_state::StreamingSession,
480 index: u64,
481 name: Option<&str>,
482 input: Option<&serde_json::Value>,
483) {
484 if let Some(n) = name {
485 session.set_tool_name(index, Some(n.to_string()));
486 }
487 if let Some(i) = input {
488 session.on_tool_input_delta(index, &json_value_to_tool_input_str(i));
489 }
490}
491
492fn has_errors_array_with_content(trimmed: &str) -> bool {
493 serde_json::from_str::<serde_json::Value>(trimmed).is_ok_and(|json| {
494 json.get("errors")
495 .and_then(|v| v.as_array())
496 .is_some_and(|arr| {
497 arr.iter()
498 .any(|e| e.as_str().is_some_and(|s| !s.trim().is_empty()))
499 })
500 })
501}
502
/// Decides whether a result event is a spurious error that should be skipped.
///
/// An event is spurious when all of the following hold:
/// - its subtype is not `"success"`,
/// - its duration is under 100 ms (a missing duration counts as 0),
/// - its error message is absent or empty,
/// - the caller found no non-blank entries in the `errors` array.
fn is_spurious_glm_error(
    subtype: &Option<String>,
    duration_ms: Option<u64>,
    error: &Option<String>,
    has_errors: bool,
) -> bool {
    let is_success = matches!(subtype.as_deref(), Some("success"));
    let finished_instantly = duration_ms.unwrap_or_default() < 100;
    let error_is_blank = error.as_deref().unwrap_or_default().is_empty();

    !is_success && finished_instantly && error_is_blank && !has_errors
}
514
515fn should_skip_result_event(trimmed: &str, seen_success: &std::cell::Cell<bool>) -> bool {
516 if !trimmed.starts_with('{') { return false; }
517 let has_errors = has_errors_array_with_content(trimmed);
518 let Ok(ClaudeEvent::Result { subtype, duration_ms, error, .. }) =
519 serde_json::from_str::<ClaudeEvent>(trimmed)
520 else {
521 return false;
522 };
523 let spurious = is_spurious_glm_error(&subtype, duration_ms, &error, has_errors);
524 if subtype.as_deref() == Some("success") {
525 seen_success.set(true);
526 false
527 } else {
528 spurious
529 }
530}
531
532fn record_monitor_for_parsed_output(trimmed: &str, line: &str, monitor: &HealthMonitor) {
533 let is_partial = trimmed.starts_with('{')
534 && serde_json::from_str::<ClaudeEvent>(line)
535 .is_ok_and(|e| ClaudeParser::is_partial_event(&e));
536 if is_partial {
537 monitor.record_partial_event();
538 } else {
539 monitor.record_parsed();
540 }
541}
542
543fn record_monitor_for_no_output(trimmed: &str, line: &str, monitor: &HealthMonitor) {
544 if !trimmed.starts_with('{') { return monitor.record_ignored(); }
545 match serde_json::from_str::<ClaudeEvent>(line) {
546 Ok(event) if ClaudeParser::is_control_event(&event) => monitor.record_control_event(),
547 Ok(_) => monitor.record_unknown_event(),
548 Err(_) => monitor.record_parse_error(),
549 }
550}