1use std::collections::HashMap;
4
5use claude_code::{ContentBlock, Message, ToolResultBlock, UserContent, parse_message};
6use serde_json::Value;
7
8use crate::error::ExecutorError;
9use crate::log::{ActionType, LogNormalizer, NormalizedLog};
10use crate::types::{Role, ToolStatus};
11
12#[derive(Debug, Clone)]
13struct PendingToolCall {
14 name: String,
15 args: Value,
16 action: ActionType,
17}
18
19#[derive(Default)]
21pub struct ClaudeCodeLogNormalizer {
22 line_buffer: Vec<u8>,
23 json_buffer: String,
24 pending_tools: HashMap<String, PendingToolCall>,
25}
26
27impl ClaudeCodeLogNormalizer {
28 pub fn new() -> Self {
30 Self::default()
31 }
32
33 fn process_line(&mut self, line: &[u8]) -> Result<Vec<NormalizedLog>, ExecutorError> {
34 let line = std::str::from_utf8(line).map_err(|err| {
35 ExecutorError::execution_failed("failed to decode claude log chunk as UTF-8", err)
36 })?;
37 let trimmed = line.trim();
38 if trimmed.is_empty() {
39 return Ok(Vec::new());
40 }
41
42 self.json_buffer.push_str(trimmed);
43
44 match serde_json::from_str::<Value>(&self.json_buffer) {
45 Ok(value) => {
46 self.json_buffer.clear();
47 self.process_value(value)
48 }
49 Err(err) if err.is_eof() => Ok(Vec::new()),
50 Err(err) => {
51 self.json_buffer.clear();
52 Err(ExecutorError::Serialization(err))
53 }
54 }
55 }
56
57 fn process_value(&mut self, value: Value) -> Result<Vec<NormalizedLog>, ExecutorError> {
58 let message = parse_message(&value).map_err(|err| {
59 ExecutorError::execution_failed("failed to parse claude sdk message", err)
60 })?;
61 let Some(message) = message else {
62 return Ok(Vec::new());
63 };
64
65 Ok(self.normalize_message(message))
66 }
67
68 fn normalize_message(&mut self, message: Message) -> Vec<NormalizedLog> {
69 match message {
70 Message::User(user) => self.normalize_user_content(user.content),
71 Message::Assistant(assistant) => self.normalize_assistant_content(assistant.content),
72 Message::System(system) => vec![NormalizedLog::Message {
73 role: Role::System,
74 content: system.subtype,
75 }],
76 Message::Result(result) => {
77 let mut logs = Vec::new();
78
79 if let Some((total, limit)) = extract_token_usage(result.usage.as_ref()) {
80 logs.push(NormalizedLog::TokenUsage { total, limit });
81 }
82
83 if result.is_error {
84 logs.push(NormalizedLog::Error {
85 error_type: "result_error".to_string(),
86 message: result.result.unwrap_or_else(|| {
87 format!("Claude Code result subtype: {}", result.subtype)
88 }),
89 });
90 }
91
92 logs
93 }
94 Message::StreamEvent(_) => Vec::new(),
95 }
96 }
97
98 fn normalize_user_content(&mut self, content: UserContent) -> Vec<NormalizedLog> {
99 match content {
100 UserContent::Text(text) => {
101 if text.trim().is_empty() {
102 Vec::new()
103 } else {
104 vec![NormalizedLog::Message {
105 role: Role::User,
106 content: text,
107 }]
108 }
109 }
110 UserContent::Blocks(blocks) => {
111 let mut logs = Vec::new();
112 for block in blocks {
113 match block {
114 ContentBlock::Text(text) => {
115 if !text.text.trim().is_empty() {
116 logs.push(NormalizedLog::Message {
117 role: Role::User,
118 content: text.text,
119 });
120 }
121 }
122 ContentBlock::ToolResult(result) => {
123 self.push_tool_result_log(&mut logs, result);
124 }
125 _ => {}
126 }
127 }
128 logs
129 }
130 }
131 }
132
133 fn normalize_assistant_content(&mut self, blocks: Vec<ContentBlock>) -> Vec<NormalizedLog> {
134 let mut logs = Vec::new();
135
136 for block in blocks {
137 match block {
138 ContentBlock::Text(text) => logs.push(NormalizedLog::Message {
139 role: Role::Assistant,
140 content: text.text,
141 }),
142 ContentBlock::Thinking(thinking) => logs.push(NormalizedLog::Thinking {
143 content: thinking.thinking,
144 }),
145 ContentBlock::ToolUse(tool_use) => {
146 let action = infer_action(&tool_use.name, &tool_use.input);
147 self.pending_tools.insert(
148 tool_use.id.clone(),
149 PendingToolCall {
150 name: tool_use.name.clone(),
151 args: tool_use.input.clone(),
152 action: action.clone(),
153 },
154 );
155
156 logs.push(NormalizedLog::ToolCall {
157 name: tool_use.name,
158 args: tool_use.input,
159 status: ToolStatus::Started,
160 action,
161 });
162 }
163 ContentBlock::ToolResult(result) => {
164 self.push_tool_result_log(&mut logs, result);
165 }
166 }
167 }
168
169 logs
170 }
171
172 fn push_tool_result_log(&mut self, logs: &mut Vec<NormalizedLog>, result: ToolResultBlock) {
173 let ToolResultBlock {
174 tool_use_id,
175 content,
176 is_error,
177 } = result;
178 if let Some(pending) = self.pending_tools.remove(&tool_use_id) {
179 logs.push(NormalizedLog::ToolCall {
180 name: pending.name,
181 args: content.unwrap_or(pending.args),
182 status: if is_error.unwrap_or(false) {
183 ToolStatus::Failed
184 } else {
185 ToolStatus::Completed
186 },
187 action: pending.action,
188 });
189 }
190 }
191
192 fn error_to_log(error: ExecutorError) -> NormalizedLog {
193 NormalizedLog::Error {
194 error_type: error_type(&error).to_string(),
195 message: error.to_string(),
196 }
197 }
198}
199
200impl LogNormalizer for ClaudeCodeLogNormalizer {
201 fn normalize(&mut self, chunk: &[u8]) -> Vec<NormalizedLog> {
202 let mut logs = Vec::new();
203
204 for &byte in chunk {
205 if byte == b'\n' {
206 let line = std::mem::take(&mut self.line_buffer);
207 match self.process_line(&line) {
208 Ok(mut parsed) => logs.append(&mut parsed),
209 Err(error) => logs.push(Self::error_to_log(error)),
210 }
211 } else {
212 self.line_buffer.push(byte);
213 }
214 }
215
216 logs
217 }
218
219 fn flush(&mut self) -> Vec<NormalizedLog> {
220 let mut logs = Vec::new();
221
222 if !self.line_buffer.is_empty() {
223 let line = std::mem::take(&mut self.line_buffer);
224 match self.process_line(&line) {
225 Ok(mut parsed) => logs.append(&mut parsed),
226 Err(error) => logs.push(Self::error_to_log(error)),
227 }
228 }
229
230 if !self.json_buffer.trim().is_empty() {
231 let buffer_len = self.json_buffer.len();
232 let message = format!(
233 "incomplete Claude Code JSON message buffered at flush: <redacted> (buffer_len={buffer_len})"
234 );
235 self.json_buffer.clear();
236 logs.push(Self::error_to_log(ExecutorError::execution_failed(
237 "failed to flush claude code log stream",
238 message,
239 )));
240 }
241
242 self.pending_tools.clear();
243
244 logs
245 }
246}
247
248fn infer_action(name: &str, args: &Value) -> ActionType {
249 let lower = name.to_ascii_lowercase();
250
251 if lower.starts_with("mcp__") {
252 return ActionType::McpTool {
253 tool: name.to_string(),
254 };
255 }
256
257 if lower.contains("askuser") || lower.contains("ask_user") {
258 return ActionType::AskUser;
259 }
260
261 if lower.contains("websearch") || lower.contains("web_search") || lower.contains("webfetch") {
262 return ActionType::WebSearch {
263 query: extract_first_string(args, &["query", "search_query", "url"])
264 .unwrap_or_default(),
265 };
266 }
267
268 if lower.contains("read") {
269 return ActionType::FileRead {
270 path: extract_first_string(args, &["file_path", "path", "target_file"])
271 .unwrap_or_default(),
272 };
273 }
274
275 if lower.contains("edit")
276 || lower.contains("write")
277 || lower.contains("patch")
278 || lower.contains("multiedit")
279 {
280 return ActionType::FileEdit {
281 path: extract_first_string(args, &["file_path", "path", "target_file"])
282 .unwrap_or_default(),
283 };
284 }
285
286 if lower.contains("bash") || lower.contains("command") || lower.contains("run") {
287 return ActionType::CommandRun {
288 command: extract_first_string(args, &["command", "cmd"]).unwrap_or_default(),
289 };
290 }
291
292 ActionType::McpTool {
293 tool: name.to_string(),
294 }
295}
296
297fn extract_first_string(args: &Value, keys: &[&str]) -> Option<String> {
298 let object = args.as_object()?;
299
300 for key in keys {
301 if let Some(value) = object.get(*key).and_then(Value::as_str) {
302 return Some(value.to_string());
303 }
304 }
305
306 None
307}
308
309fn extract_token_usage(usage: Option<&Value>) -> Option<(u32, u32)> {
310 let usage = usage?;
311
312 if let Some(total) = value_to_u64(Some(usage)) {
313 let total = saturating_u64_to_u32(total);
314 return (total > 0).then_some((total, 0));
315 }
316
317 let object = usage.as_object()?;
318 let total = value_to_u64(object.get("input_tokens"))
319 .unwrap_or(0)
320 .saturating_add(value_to_u64(object.get("output_tokens")).unwrap_or(0))
321 .saturating_add(value_to_u64(object.get("cache_creation_input_tokens")).unwrap_or(0))
322 .saturating_add(value_to_u64(object.get("cache_read_input_tokens")).unwrap_or(0));
323 let limit = value_to_u64(object.get("limit"))
324 .or_else(|| value_to_u64(object.get("max_tokens")))
325 .unwrap_or(0);
326
327 if total == 0 && limit == 0 {
328 None
329 } else {
330 Some((saturating_u64_to_u32(total), saturating_u64_to_u32(limit)))
331 }
332}
333
334fn value_to_u64(value: Option<&Value>) -> Option<u64> {
335 match value {
336 Some(Value::Number(number)) => number
337 .as_u64()
338 .or_else(|| number.as_i64().and_then(|v| u64::try_from(v).ok())),
339 _ => None,
340 }
341}
342
/// Narrows `value` to `u32`, clamping anything above `u32::MAX` to `u32::MAX`.
fn saturating_u64_to_u32(value: u64) -> u32 {
    u32::try_from(value).unwrap_or(u32::MAX)
}
345}
346
347fn error_type(error: &ExecutorError) -> &'static str {
348 error.error_type()
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks one full exchange through the normalizer:
    /// - an assistant line split across two chunks is emitted only once its newline arrives;
    /// - the matching tool result completes the pending call;
    /// - the final result message reports summed token usage.
    #[test]
    fn normalizes_claude_stream_incrementally() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();

        let assistant = concat!(
            r#"{"type":"assistant","message":{"content":[{"type":"text","text":"hello"},{"type":"thinking","thinking":"analyzing","signature":"sig"},{"type":"tool_use","id":"toolu_1","name":"Read","input":{"file_path":"src/main.rs"}}],"model":"claude-3-7-sonnet"}}"#,
            "\n"
        );
        // Split mid-line: the first chunk carries no newline, so it must only be buffered.
        let split = assistant.len() / 2;

        let first = normalizer.normalize(&assistant.as_bytes()[..split]);
        assert!(first.is_empty());

        // The second chunk completes the line. Expect one log per content block, in order.
        let second = normalizer.normalize(&assistant.as_bytes()[split..]);
        assert_eq!(second.len(), 3);
        assert!(matches!(
            &second[0],
            NormalizedLog::Message {
                role: Role::Assistant,
                content
            } if content == "hello"
        ));
        assert!(matches!(
            &second[1],
            NormalizedLog::Thinking { content } if content == "analyzing"
        ));
        assert!(matches!(
            &second[2],
            NormalizedLog::ToolCall {
                name,
                status: ToolStatus::Started,
                ..
            } if name == "Read"
        ));

        // The tool result for `toolu_1` completes the call. Its `content` replaces the
        // original call args in the completion log.
        let tool_result = concat!(
            r#"{"type":"user","message":{"content":[{"type":"tool_result","tool_use_id":"toolu_1","content":{"ok":true},"is_error":false}]}}"#,
            "\n"
        );
        let third = normalizer.normalize(tool_result.as_bytes());
        assert_eq!(third.len(), 1);
        assert!(matches!(
            &third[0],
            NormalizedLog::ToolCall {
                name,
                args,
                status: ToolStatus::Completed,
                ..
            } if name == "Read" && args == &serde_json::json!({"ok": true})
        ));

        // Usage total is 10 + 5 + 2 + 3. There is no `limit`/`max_tokens`, so limit is 0.
        let result = concat!(
            r#"{"type":"result","subtype":"success","duration_ms":1,"duration_api_ms":1,"is_error":false,"num_turns":1,"session_id":"s1","usage":{"input_tokens":10,"output_tokens":5,"cache_creation_input_tokens":2,"cache_read_input_tokens":3}}"#,
            "\n"
        );
        let fourth = normalizer.normalize(result.as_bytes());
        assert_eq!(fourth.len(), 1);
        assert!(matches!(
            &fourth[0],
            NormalizedLog::TokenUsage { total, limit } if *total == 20 && *limit == 0
        ));
    }

    /// An explicit `limit` field is reported as the token limit.
    #[test]
    fn extracts_limit_when_explicitly_present() {
        let usage = serde_json::json!({
            "input_tokens": 4,
            "output_tokens": 6,
            "limit": 100
        });

        let parsed = extract_token_usage(Some(&usage));
        assert_eq!(parsed, Some((10, 100)));
    }

    /// Without `limit`, `max_tokens` is used as the token limit.
    #[test]
    fn extracts_max_tokens_when_limit_is_absent() {
        let usage = serde_json::json!({
            "input_tokens": 4,
            "output_tokens": 6,
            "max_tokens": 100
        });

        let parsed = extract_token_usage(Some(&usage));
        assert_eq!(parsed, Some((10, 100)));
    }

    /// A bare numeric usage value is the total; the limit stays unknown (0).
    #[test]
    fn numeric_usage_keeps_unknown_limit() {
        let usage = serde_json::json!(42);
        let parsed = extract_token_usage(Some(&usage));
        assert_eq!(parsed, Some((42, 0)));
    }

    /// A bare numeric usage of zero produces no token-usage entry.
    #[test]
    fn numeric_zero_usage_is_suppressed() {
        let usage = serde_json::json!(0);
        let parsed = extract_token_usage(Some(&usage));
        assert_eq!(parsed, None);
    }

    /// JSON left incomplete at end of stream surfaces as one `execution_failed` error on flush.
    #[test]
    fn flush_emits_error_for_incomplete_json() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();

        // Missing the final closing brace and the trailing newline.
        let partial = br#"{"type":"assistant","message":{"content":[{"type":"text","text":"hello"}],"model":"claude"}"#;
        let logs = normalizer.normalize(partial);
        assert!(logs.is_empty());

        let flushed = normalizer.flush();
        assert_eq!(flushed.len(), 1);
        assert!(matches!(
            &flushed[0],
            NormalizedLog::Error { error_type, .. } if error_type == "execution_failed"
        ));
    }

    /// A line that is not valid UTF-8 becomes an error log instead of being dropped.
    #[test]
    fn invalid_utf8_is_reported_as_error_log() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();
        let logs = normalizer.normalize(&[0xFF, b'\n']);

        assert_eq!(logs.len(), 1);
        assert!(matches!(
            &logs[0],
            NormalizedLog::Error { error_type, .. } if error_type == "execution_failed"
        ));
    }

    /// After `flush`, a late result for a tool call started before the flush yields no log.
    #[test]
    fn flush_clears_pending_tool_calls() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();

        let tool_start = concat!(
            r#"{"type":"assistant","message":{"content":[{"type":"tool_use","id":"toolu_1","name":"Read","input":{"file_path":"src/main.rs"}}],"model":"claude-3-7-sonnet"}}"#,
            "\n"
        );
        let started = normalizer.normalize(tool_start.as_bytes());
        assert!(matches!(
            started.as_slice(),
            [NormalizedLog::ToolCall {
                status: ToolStatus::Started,
                ..
            }]
        ));

        let _ = normalizer.flush();

        let tool_result = concat!(
            r#"{"type":"user","message":{"content":[{"type":"tool_result","tool_use_id":"toolu_1","content":{"ok":true},"is_error":false}]}}"#,
            "\n"
        );
        let logs_after_flush = normalizer.normalize(tool_result.as_bytes());
        assert!(logs_after_flush.is_empty());
    }

    /// Whitespace-only user text blocks are skipped; non-blank ones are kept.
    #[test]
    fn ignores_whitespace_only_user_text_blocks() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();

        let user_message = concat!(
            r#"{"type":"user","message":{"content":[{"type":"text","text":" "},{"type":"text","text":"hello"}]}}"#,
            "\n"
        );
        let logs = normalizer.normalize(user_message.as_bytes());

        assert_eq!(logs.len(), 1);
        assert!(matches!(
            &logs[0],
            NormalizedLog::Message {
                role: Role::User,
                content
            } if content == "hello"
        ));
    }
}
530}