// Source: unified_agent_sdk/providers/claude_code/normalizer.rs

1//! Claude Code log normalizer.
2
3use std::collections::HashMap;
4
5use claude_code::{ContentBlock, Message, ToolResultBlock, UserContent, parse_message};
6use serde_json::Value;
7
8use crate::error::ExecutorError;
9use crate::log::{ActionType, LogNormalizer, NormalizedLog};
10use crate::types::{Role, ToolStatus};
11
12#[derive(Debug, Clone)]
13struct PendingToolCall {
14    name: String,
15    args: Value,
16    action: ActionType,
17}
18
19/// Normalizes Claude Code JSON stream messages into `NormalizedLog` entries.
20#[derive(Default)]
21pub struct ClaudeCodeLogNormalizer {
22    line_buffer: Vec<u8>,
23    json_buffer: String,
24    pending_tools: HashMap<String, PendingToolCall>,
25}
26
27impl ClaudeCodeLogNormalizer {
28    /// Creates a new stateful normalizer for Claude Code JSON stream chunks.
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    fn process_line(&mut self, line: &[u8]) -> Result<Vec<NormalizedLog>, ExecutorError> {
34        let line = std::str::from_utf8(line).map_err(|err| {
35            ExecutorError::execution_failed("failed to decode claude log chunk as UTF-8", err)
36        })?;
37        let trimmed = line.trim();
38        if trimmed.is_empty() {
39            return Ok(Vec::new());
40        }
41
42        self.json_buffer.push_str(trimmed);
43
44        match serde_json::from_str::<Value>(&self.json_buffer) {
45            Ok(value) => {
46                self.json_buffer.clear();
47                self.process_value(value)
48            }
49            Err(err) if err.is_eof() => Ok(Vec::new()),
50            Err(err) => {
51                self.json_buffer.clear();
52                Err(ExecutorError::Serialization(err))
53            }
54        }
55    }
56
57    fn process_value(&mut self, value: Value) -> Result<Vec<NormalizedLog>, ExecutorError> {
58        let message = parse_message(&value).map_err(|err| {
59            ExecutorError::execution_failed("failed to parse claude sdk message", err)
60        })?;
61        let Some(message) = message else {
62            return Ok(Vec::new());
63        };
64
65        Ok(self.normalize_message(message))
66    }
67
68    fn normalize_message(&mut self, message: Message) -> Vec<NormalizedLog> {
69        match message {
70            Message::User(user) => self.normalize_user_content(user.content),
71            Message::Assistant(assistant) => self.normalize_assistant_content(assistant.content),
72            Message::System(system) => vec![NormalizedLog::Message {
73                role: Role::System,
74                content: system.subtype,
75            }],
76            Message::Result(result) => {
77                let mut logs = Vec::new();
78
79                if let Some((total, limit)) = extract_token_usage(result.usage.as_ref()) {
80                    logs.push(NormalizedLog::TokenUsage { total, limit });
81                }
82
83                if result.is_error {
84                    logs.push(NormalizedLog::Error {
85                        error_type: "result_error".to_string(),
86                        message: result.result.unwrap_or_else(|| {
87                            format!("Claude Code result subtype: {}", result.subtype)
88                        }),
89                    });
90                }
91
92                logs
93            }
94            Message::StreamEvent(_) => Vec::new(),
95        }
96    }
97
98    fn normalize_user_content(&mut self, content: UserContent) -> Vec<NormalizedLog> {
99        match content {
100            UserContent::Text(text) => {
101                if text.trim().is_empty() {
102                    Vec::new()
103                } else {
104                    vec![NormalizedLog::Message {
105                        role: Role::User,
106                        content: text,
107                    }]
108                }
109            }
110            UserContent::Blocks(blocks) => {
111                let mut logs = Vec::new();
112                for block in blocks {
113                    match block {
114                        ContentBlock::Text(text) => {
115                            if !text.text.trim().is_empty() {
116                                logs.push(NormalizedLog::Message {
117                                    role: Role::User,
118                                    content: text.text,
119                                });
120                            }
121                        }
122                        ContentBlock::ToolResult(result) => {
123                            self.push_tool_result_log(&mut logs, result);
124                        }
125                        _ => {}
126                    }
127                }
128                logs
129            }
130        }
131    }
132
133    fn normalize_assistant_content(&mut self, blocks: Vec<ContentBlock>) -> Vec<NormalizedLog> {
134        let mut logs = Vec::new();
135
136        for block in blocks {
137            match block {
138                ContentBlock::Text(text) => logs.push(NormalizedLog::Message {
139                    role: Role::Assistant,
140                    content: text.text,
141                }),
142                ContentBlock::Thinking(thinking) => logs.push(NormalizedLog::Thinking {
143                    content: thinking.thinking,
144                }),
145                ContentBlock::ToolUse(tool_use) => {
146                    let action = infer_action(&tool_use.name, &tool_use.input);
147                    self.pending_tools.insert(
148                        tool_use.id.clone(),
149                        PendingToolCall {
150                            name: tool_use.name.clone(),
151                            args: tool_use.input.clone(),
152                            action: action.clone(),
153                        },
154                    );
155
156                    logs.push(NormalizedLog::ToolCall {
157                        name: tool_use.name,
158                        args: tool_use.input,
159                        status: ToolStatus::Started,
160                        action,
161                    });
162                }
163                ContentBlock::ToolResult(result) => {
164                    self.push_tool_result_log(&mut logs, result);
165                }
166            }
167        }
168
169        logs
170    }
171
172    fn push_tool_result_log(&mut self, logs: &mut Vec<NormalizedLog>, result: ToolResultBlock) {
173        let ToolResultBlock {
174            tool_use_id,
175            content,
176            is_error,
177        } = result;
178        if let Some(pending) = self.pending_tools.remove(&tool_use_id) {
179            logs.push(NormalizedLog::ToolCall {
180                name: pending.name,
181                args: content.unwrap_or(pending.args),
182                status: if is_error.unwrap_or(false) {
183                    ToolStatus::Failed
184                } else {
185                    ToolStatus::Completed
186                },
187                action: pending.action,
188            });
189        }
190    }
191
192    fn error_to_log(error: ExecutorError) -> NormalizedLog {
193        NormalizedLog::Error {
194            error_type: error_type(&error).to_string(),
195            message: error.to_string(),
196        }
197    }
198}
199
200impl LogNormalizer for ClaudeCodeLogNormalizer {
201    fn normalize(&mut self, chunk: &[u8]) -> Vec<NormalizedLog> {
202        let mut logs = Vec::new();
203
204        for &byte in chunk {
205            if byte == b'\n' {
206                let line = std::mem::take(&mut self.line_buffer);
207                match self.process_line(&line) {
208                    Ok(mut parsed) => logs.append(&mut parsed),
209                    Err(error) => logs.push(Self::error_to_log(error)),
210                }
211            } else {
212                self.line_buffer.push(byte);
213            }
214        }
215
216        logs
217    }
218
219    fn flush(&mut self) -> Vec<NormalizedLog> {
220        let mut logs = Vec::new();
221
222        if !self.line_buffer.is_empty() {
223            let line = std::mem::take(&mut self.line_buffer);
224            match self.process_line(&line) {
225                Ok(mut parsed) => logs.append(&mut parsed),
226                Err(error) => logs.push(Self::error_to_log(error)),
227            }
228        }
229
230        if !self.json_buffer.trim().is_empty() {
231            let buffer_len = self.json_buffer.len();
232            let message = format!(
233                "incomplete Claude Code JSON message buffered at flush: <redacted> (buffer_len={buffer_len})"
234            );
235            self.json_buffer.clear();
236            logs.push(Self::error_to_log(ExecutorError::execution_failed(
237                "failed to flush claude code log stream",
238                message,
239            )));
240        }
241
242        self.pending_tools.clear();
243
244        logs
245    }
246}
247
248fn infer_action(name: &str, args: &Value) -> ActionType {
249    let lower = name.to_ascii_lowercase();
250
251    if lower.starts_with("mcp__") {
252        return ActionType::McpTool {
253            tool: name.to_string(),
254        };
255    }
256
257    if lower.contains("askuser") || lower.contains("ask_user") {
258        return ActionType::AskUser;
259    }
260
261    if lower.contains("websearch") || lower.contains("web_search") || lower.contains("webfetch") {
262        return ActionType::WebSearch {
263            query: extract_first_string(args, &["query", "search_query", "url"])
264                .unwrap_or_default(),
265        };
266    }
267
268    if lower.contains("read") {
269        return ActionType::FileRead {
270            path: extract_first_string(args, &["file_path", "path", "target_file"])
271                .unwrap_or_default(),
272        };
273    }
274
275    if lower.contains("edit")
276        || lower.contains("write")
277        || lower.contains("patch")
278        || lower.contains("multiedit")
279    {
280        return ActionType::FileEdit {
281            path: extract_first_string(args, &["file_path", "path", "target_file"])
282                .unwrap_or_default(),
283        };
284    }
285
286    if lower.contains("bash") || lower.contains("command") || lower.contains("run") {
287        return ActionType::CommandRun {
288            command: extract_first_string(args, &["command", "cmd"]).unwrap_or_default(),
289        };
290    }
291
292    ActionType::McpTool {
293        tool: name.to_string(),
294    }
295}
296
297fn extract_first_string(args: &Value, keys: &[&str]) -> Option<String> {
298    let object = args.as_object()?;
299
300    for key in keys {
301        if let Some(value) = object.get(*key).and_then(Value::as_str) {
302            return Some(value.to_string());
303        }
304    }
305
306    None
307}
308
309fn extract_token_usage(usage: Option<&Value>) -> Option<(u32, u32)> {
310    let usage = usage?;
311
312    if let Some(total) = value_to_u64(Some(usage)) {
313        let total = saturating_u64_to_u32(total);
314        return (total > 0).then_some((total, 0));
315    }
316
317    let object = usage.as_object()?;
318    let total = value_to_u64(object.get("input_tokens"))
319        .unwrap_or(0)
320        .saturating_add(value_to_u64(object.get("output_tokens")).unwrap_or(0))
321        .saturating_add(value_to_u64(object.get("cache_creation_input_tokens")).unwrap_or(0))
322        .saturating_add(value_to_u64(object.get("cache_read_input_tokens")).unwrap_or(0));
323    let limit = value_to_u64(object.get("limit"))
324        .or_else(|| value_to_u64(object.get("max_tokens")))
325        .unwrap_or(0);
326
327    if total == 0 && limit == 0 {
328        None
329    } else {
330        Some((saturating_u64_to_u32(total), saturating_u64_to_u32(limit)))
331    }
332}
333
334fn value_to_u64(value: Option<&Value>) -> Option<u64> {
335    match value {
336        Some(Value::Number(number)) => number
337            .as_u64()
338            .or_else(|| number.as_i64().and_then(|v| u64::try_from(v).ok())),
339        _ => None,
340    }
341}
342
/// Narrows `value` to `u32`, clamping anything above `u32::MAX` to `u32::MAX`.
fn saturating_u64_to_u32(value: u64) -> u32 {
    // Checked conversion instead of an `as` cast: no truncation path to audit.
    u32::try_from(value).unwrap_or(u32::MAX)
}
346
347fn error_type(error: &ExecutorError) -> &'static str {
348    error.error_type()
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// Assistant message with a text, a thinking and a `Read` tool-use block.
    const ASSISTANT_FULL: &str = r#"{"type":"assistant","message":{"content":[{"type":"text","text":"hello"},{"type":"thinking","thinking":"analyzing","signature":"sig"},{"type":"tool_use","id":"toolu_1","name":"Read","input":{"file_path":"src/main.rs"}}],"model":"claude-3-7-sonnet"}}"#;

    /// Assistant message that only starts the `Read` tool call `toolu_1`.
    const ASSISTANT_TOOL_USE: &str = r#"{"type":"assistant","message":{"content":[{"type":"tool_use","id":"toolu_1","name":"Read","input":{"file_path":"src/main.rs"}}],"model":"claude-3-7-sonnet"}}"#;

    /// User message carrying the successful result of tool call `toolu_1`.
    const TOOL_RESULT_OK: &str = r#"{"type":"user","message":{"content":[{"type":"tool_result","tool_use_id":"toolu_1","content":{"ok":true},"is_error":false}]}}"#;

    /// Successful result message with a usage object summing to 20 tokens.
    const RESULT_WITH_USAGE: &str = r#"{"type":"result","subtype":"success","duration_ms":1,"duration_api_ms":1,"is_error":false,"num_turns":1,"session_id":"s1","usage":{"input_tokens":10,"output_tokens":5,"cache_creation_input_tokens":2,"cache_read_input_tokens":3}}"#;

    /// Terminates `json` with a newline so it forms one complete stream line.
    fn as_line(json: &str) -> String {
        let mut line = String::from(json);
        line.push('\n');
        line
    }

    /// Feeds `json` to `normalizer` as a single newline-terminated line.
    fn feed(normalizer: &mut ClaudeCodeLogNormalizer, json: &str) -> Vec<NormalizedLog> {
        normalizer.normalize(as_line(json).as_bytes())
    }

    /// Runs `extract_token_usage` on an owned usage value.
    fn usage_of(usage: Value) -> Option<(u32, u32)> {
        extract_token_usage(Some(&usage))
    }

    #[test]
    fn normalizes_claude_stream_incrementally() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();

        // Split the first line in the middle to exercise chunk buffering.
        let assistant = as_line(ASSISTANT_FULL);
        let (head, tail) = assistant.as_bytes().split_at(assistant.len() / 2);

        let first = normalizer.normalize(head);
        assert!(first.is_empty());

        let second = normalizer.normalize(tail);
        assert_eq!(second.len(), 3);
        let [text, thinking, tool_call] = second.as_slice() else {
            panic!("expected exactly three logs");
        };
        assert!(matches!(
            text,
            NormalizedLog::Message {
                role: Role::Assistant,
                content
            } if content == "hello"
        ));
        assert!(matches!(
            thinking,
            NormalizedLog::Thinking { content } if content == "analyzing"
        ));
        assert!(matches!(
            tool_call,
            NormalizedLog::ToolCall {
                name,
                status: ToolStatus::Started,
                ..
            } if name == "Read"
        ));

        let third = feed(&mut normalizer, TOOL_RESULT_OK);
        assert_eq!(third.len(), 1);
        assert!(matches!(
            &third[0],
            NormalizedLog::ToolCall {
                name,
                args,
                status: ToolStatus::Completed,
                ..
            } if name == "Read" && args == &serde_json::json!({"ok": true})
        ));

        let fourth = feed(&mut normalizer, RESULT_WITH_USAGE);
        assert_eq!(fourth.len(), 1);
        assert!(matches!(
            &fourth[0],
            NormalizedLog::TokenUsage { total, limit } if *total == 20 && *limit == 0
        ));
    }

    #[test]
    fn extracts_limit_when_explicitly_present() {
        let parsed = usage_of(serde_json::json!({
            "input_tokens": 4,
            "output_tokens": 6,
            "limit": 100
        }));
        assert_eq!(parsed, Some((10, 100)));
    }

    #[test]
    fn extracts_max_tokens_when_limit_is_absent() {
        let parsed = usage_of(serde_json::json!({
            "input_tokens": 4,
            "output_tokens": 6,
            "max_tokens": 100
        }));
        assert_eq!(parsed, Some((10, 100)));
    }

    #[test]
    fn numeric_usage_keeps_unknown_limit() {
        assert_eq!(usage_of(serde_json::json!(42)), Some((42, 0)));
    }

    #[test]
    fn numeric_zero_usage_is_suppressed() {
        assert_eq!(usage_of(serde_json::json!(0)), None);
    }

    #[test]
    fn flush_emits_error_for_incomplete_json() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();

        // No trailing newline and the outer object is never closed.
        let partial = br#"{"type":"assistant","message":{"content":[{"type":"text","text":"hello"}],"model":"claude"}"#;
        assert!(normalizer.normalize(partial).is_empty());

        let flushed = normalizer.flush();
        assert_eq!(flushed.len(), 1);
        assert!(matches!(
            &flushed[0],
            NormalizedLog::Error { error_type, .. } if error_type == "execution_failed"
        ));
    }

    #[test]
    fn invalid_utf8_is_reported_as_error_log() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();

        let logs = normalizer.normalize(&[0xFF, b'\n']);

        assert_eq!(logs.len(), 1);
        assert!(matches!(
            &logs[0],
            NormalizedLog::Error { error_type, .. } if error_type == "execution_failed"
        ));
    }

    #[test]
    fn flush_clears_pending_tool_calls() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();

        let started = feed(&mut normalizer, ASSISTANT_TOOL_USE);
        assert!(matches!(
            started.as_slice(),
            [NormalizedLog::ToolCall {
                status: ToolStatus::Started,
                ..
            }]
        ));

        let _ = normalizer.flush();

        // The pending call was forgotten, so its result no longer matches anything.
        let logs_after_flush = feed(&mut normalizer, TOOL_RESULT_OK);
        assert!(logs_after_flush.is_empty());
    }

    #[test]
    fn ignores_whitespace_only_user_text_blocks() {
        let mut normalizer = ClaudeCodeLogNormalizer::new();

        let logs = feed(
            &mut normalizer,
            r#"{"type":"user","message":{"content":[{"type":"text","text":"   "},{"type":"text","text":"hello"}]}}"#,
        );

        assert_eq!(logs.len(), 1);
        assert!(matches!(
            &logs[0],
            NormalizedLog::Message {
                role: Role::User,
                content
            } if content == "hello"
        ));
    }
}