use serde::{Deserialize, Serialize};
use serde_json::Value;
/// One event decoded from a single JSON line of the stream.
///
/// The enum is internally tagged: the `"type"` key of the JSON object selects
/// the variant, and variant names are mapped to snake_case (for example
/// `RateLimitEvent` corresponds to `"rate_limit_event"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamEvent {
/// `"type":"system"` event.
System(SystemEventData),
/// `"type":"assistant"` event.
Assistant(AssistantEventData),
/// `"type":"user"` event.
User(UserEventData),
/// `"type":"result"` event.
Result(ResultEventData),
/// `"type":"rate_limit_event"` event.
RateLimitEvent(RateLimitEventData),
/// Catch-all for any `"type"` value that matches none of the variants above.
/// This is a unit variant, so the rest of the object is not retained.
#[serde(other)]
Other,
}
/// Payload of a `"type":"system"` stream event.
///
/// `subtype` and `session_id` are required; every other typed field is optional
/// and falls back to its default when the key is absent. Keys that match none of
/// the typed fields are preserved verbatim in [`SystemEventData::extra`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemEventData {
    /// Event subtype (for example `"init"`).
    pub subtype: String,
    /// Session identifier carried by the event.
    pub session_id: String,
    /// Value of the `model` key, if present.
    #[serde(default)]
    pub model: Option<String>,
    /// Value of the `cwd` key, if present.
    #[serde(default)]
    pub cwd: Option<String>,
    /// Tool names listed under `tools`; empty when the key is absent.
    #[serde(default)]
    pub tools: Vec<String>,
    /// Permission mode reported by the event, if present.
    ///
    /// The stream emits this key in camelCase (`permissionMode`). Without the
    /// alias that key would not match this field: the field would stay `None`
    /// and the value would silently end up in `extra`. The snake_case spelling
    /// is still accepted, and serialization keeps using `permission_mode`.
    #[serde(default, alias = "permissionMode")]
    pub permission_mode: Option<String>,
    /// Value of the `claude_code_version` key, if present.
    #[serde(default)]
    pub claude_code_version: Option<String>,
    /// Every remaining key of the JSON object that is not captured by a typed
    /// field above (for example `apiKeySource`).
    #[serde(flatten)]
    pub extra: serde_json::Map<String, Value>,
}
/// Payload of a `"type":"assistant"` stream event.
///
/// `message`, `session_id` and `uuid` are required keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssistantEventData {
/// Raw message JSON, kept untyped. `StreamEvent::assistant_text` and
/// `StreamEvent::tool_uses` read its `content` array.
pub message: Value,
/// Session identifier carried by the event.
pub session_id: String,
/// Value of the `uuid` key.
pub uuid: String,
/// Value of the `parent_tool_use_id` key; `None` when the key is absent or null.
/// NOTE(review): presumably links this message to an enclosing tool call — confirm.
#[serde(default)]
pub parent_tool_use_id: Option<String>,
}
/// Payload of a `"type":"user"` stream event. All three keys are required.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserEventData {
/// Raw message JSON, kept untyped.
pub message: Value,
/// Session identifier carried by the event.
pub session_id: String,
/// Value of the `uuid` key.
pub uuid: String,
}
/// Payload of a `"type":"result"` stream event.
///
/// `subtype`, `is_error` and `session_id` are required; every other typed field
/// falls back to its default when the key is absent. Keys that match none of the
/// typed fields are preserved verbatim in [`ResultEventData::extra`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultEventData {
    /// Result subtype (for example `"success"`).
    pub subtype: String,
    /// Value of the `is_error` flag.
    pub is_error: bool,
    /// Value of the `duration_ms` key, if present.
    #[serde(default)]
    pub duration_ms: Option<u64>,
    /// Value of the `duration_api_ms` key, if present.
    #[serde(default)]
    pub duration_api_ms: Option<u64>,
    /// Value of the `num_turns` key, if present.
    #[serde(default)]
    pub num_turns: Option<u32>,
    /// Value of the `result` key, if present.
    #[serde(default)]
    pub result: Option<String>,
    /// Session identifier carried by the event.
    pub session_id: String,
    /// Value of the `total_cost_usd` key, if present.
    #[serde(default)]
    pub total_cost_usd: Option<f64>,
    /// Value of the `stop_reason` key, if present.
    #[serde(default)]
    pub stop_reason: Option<String>,
    /// Value of the `terminal_reason` key, if present.
    #[serde(default)]
    pub terminal_reason: Option<String>,
    /// Raw `usage` JSON; `Value::Null` when the key is absent.
    #[serde(default)]
    pub usage: Value,
    /// Raw per-model usage JSON; `Value::Null` when the key is absent.
    ///
    /// The stream emits this key in camelCase (`modelUsage`). Without the alias
    /// that key would not match this field: the field would stay `Value::Null`
    /// and the data would silently end up in `extra`. The snake_case spelling is
    /// still accepted, and serialization keeps using `model_usage`.
    #[serde(default, alias = "modelUsage")]
    pub model_usage: Value,
    /// Every remaining key of the JSON object that is not captured by a typed
    /// field above (for example `permission_denials`).
    #[serde(flatten)]
    pub extra: serde_json::Map<String, Value>,
}
/// Payload of a `"type":"rate_limit_event"` stream event. All three keys are required.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimitEventData {
/// Raw `rate_limit_info` JSON, kept untyped.
pub rate_limit_info: Value,
/// Session identifier carried by the event.
pub session_id: String,
/// Value of the `uuid` key.
pub uuid: String,
}
/// Decodes one line of stream output into a [`StreamEvent`].
///
/// Surrounding whitespace is stripped first. A line that is empty after
/// stripping yields `Ok(None)`; anything else is handed to `serde_json`.
///
/// # Errors
///
/// Returns the `serde_json::Error` produced when a non-blank line cannot be
/// deserialized as a [`StreamEvent`].
pub fn parse_line(line: &str) -> Result<Option<StreamEvent>, serde_json::Error> {
    match line.trim() {
        "" => Ok(None),
        payload => {
            let event: StreamEvent = serde_json::from_str(payload)?;
            Ok(Some(event))
        }
    }
}
impl StreamEvent {
    /// Returns the snake_case tag that names this variant.
    ///
    /// For [`StreamEvent::Other`] this is the fixed string `"other"`, not the
    /// tag that originally appeared in the JSON.
    pub fn type_name(&self) -> &'static str {
        match self {
            Self::System(_) => "system",
            Self::Assistant(_) => "assistant",
            Self::User(_) => "user",
            Self::Result(_) => "result",
            Self::RateLimitEvent(_) => "rate_limit_event",
            Self::Other => "other",
        }
    }

    /// Returns the session id carried by the event, or `None` for
    /// [`StreamEvent::Other`], which holds no payload.
    pub fn session_id(&self) -> Option<&str> {
        let id = match self {
            Self::System(data) => &data.session_id,
            Self::Assistant(data) => &data.session_id,
            Self::User(data) => &data.session_id,
            Self::Result(data) => &data.session_id,
            Self::RateLimitEvent(data) => &data.session_id,
            Self::Other => return None,
        };
        Some(id.as_str())
    }

    /// Concatenates, in order and without a separator, the `text` of every
    /// `"type":"text"` block in an assistant message's `content` array.
    ///
    /// Returns `None` when this is not an assistant event, when `content` is
    /// missing or not an array, or when the concatenated text is empty.
    pub fn assistant_text(&self) -> Option<String> {
        let Self::Assistant(event) = self else {
            return None;
        };
        let blocks = event.message.get("content").and_then(Value::as_array)?;
        let joined: String = blocks
            .iter()
            .filter(|block| block.get("type").and_then(Value::as_str) == Some("text"))
            .filter_map(|block| block.get("text").and_then(Value::as_str))
            .collect();
        (!joined.is_empty()).then_some(joined)
    }

    /// Collects every `"type":"tool_use"` block in an assistant message's
    /// `content` array, in order.
    ///
    /// A missing or non-string `id` / `name` becomes an empty string, and a
    /// missing `input` becomes `Value::Null`. Returns an empty vector when this
    /// is not an assistant event or `content` is missing or not an array.
    pub fn tool_uses(&self) -> Vec<ToolUseRequest> {
        let Self::Assistant(event) = self else {
            return Vec::new();
        };
        let Some(blocks) = event.message.get("content").and_then(Value::as_array) else {
            return Vec::new();
        };
        blocks
            .iter()
            .filter(|block| block.get("type").and_then(Value::as_str) == Some("tool_use"))
            .map(|block| {
                // String field lookup with the empty-string fallback.
                let text_field = |key: &str| -> String {
                    block
                        .get(key)
                        .and_then(Value::as_str)
                        .unwrap_or("")
                        .to_string()
                };
                ToolUseRequest {
                    id: text_field("id"),
                    name: text_field("name"),
                    input: block.get("input").cloned().unwrap_or(Value::Null),
                }
            })
            .collect()
    }
}
/// One `tool_use` content block extracted from an assistant message by
/// `StreamEvent::tool_uses`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolUseRequest {
/// The block's `id`; empty string when `tool_uses` found no string `id`.
pub id: String,
/// The block's `name`; empty string when `tool_uses` found no string `name`.
pub name: String,
/// The block's `input` JSON; `Value::Null` when `tool_uses` found no `input`.
pub input: Value,
}
// Unit tests for line parsing and the `StreamEvent` accessors, driven by
// single-line JSON fixtures.
#[cfg(test)]
mod tests {
use super::*;
// Fixture: a `system` / `init` event.
const FIXTURE_SYSTEM: &str = r#"{"type":"system","subtype":"init","cwd":"/private/tmp/x","session_id":"sess-1","tools":["Bash","Read","Edit"],"model":"claude-opus-4-7[1m]","permissionMode":"auto","claude_code_version":"2.1.138","apiKeySource":"none"}"#;
// Fixture: an `assistant` event whose content is a single text block.
const FIXTURE_ASSISTANT: &str = r#"{"type":"assistant","message":{"model":"claude-opus-4-7","id":"msg_1","type":"message","role":"assistant","content":[{"type":"text","text":"ok"}],"usage":{"input_tokens":6,"output_tokens":1}},"parent_tool_use_id":null,"session_id":"sess-1","uuid":"u-1"}"#;
// Fixture: a successful `result` event.
const FIXTURE_RESULT: &str = r#"{"type":"result","subtype":"success","is_error":false,"duration_ms":1742,"duration_api_ms":2572,"num_turns":1,"result":"ok","stop_reason":"end_turn","session_id":"sess-1","total_cost_usd":0.19,"usage":{},"modelUsage":{},"permission_denials":[],"terminal_reason":"completed","uuid":"r-1"}"#;
// Fixture: a `rate_limit_event`.
const FIXTURE_RATE_LIMIT: &str = r#"{"type":"rate_limit_event","rate_limit_info":{"status":"allowed_warning","resetsAt":1778443200,"utilization":0.93},"uuid":"rl-1","session_id":"sess-1"}"#;
// Fixture: an `assistant` event whose content is a single tool_use block
// (no `parent_tool_use_id` key).
const FIXTURE_TOOL_USE: &str = r#"{"type":"assistant","message":{"role":"assistant","content":[{"type":"tool_use","id":"tu_1","name":"Read","input":{"file_path":"/tmp/x.txt"}}],"usage":{"input_tokens":10,"output_tokens":20}},"session_id":"sess-1","uuid":"u-2"}"#;
// The system fixture decodes to `System`, typed fields are populated, and an
// unmodelled key is kept in `extra`.
#[test]
fn parses_system_event() {
let e = parse_line(FIXTURE_SYSTEM).unwrap().unwrap();
let StreamEvent::System(s) = &e else {
panic!("expected System, got {e:?}");
};
assert_eq!(s.subtype, "init");
assert_eq!(s.session_id, "sess-1");
assert_eq!(s.model.as_deref(), Some("claude-opus-4-7[1m]"));
assert!(s.tools.contains(&"Bash".to_string()));
assert_eq!(e.session_id(), Some("sess-1"));
assert!(s.extra.contains_key("apiKeySource"));
}
// A text-only assistant message yields its text and no tool uses.
#[test]
fn parses_assistant_event_and_extracts_text() {
let e = parse_line(FIXTURE_ASSISTANT).unwrap().unwrap();
assert_eq!(e.type_name(), "assistant");
assert_eq!(e.assistant_text().as_deref(), Some("ok"));
assert!(e.tool_uses().is_empty());
}
// A tool_use-only assistant message yields one request and no text.
#[test]
fn parses_tool_use_block() {
let e = parse_line(FIXTURE_TOOL_USE).unwrap().unwrap();
let uses = e.tool_uses();
assert_eq!(uses.len(), 1);
assert_eq!(uses[0].name, "Read");
assert_eq!(uses[0].id, "tu_1");
assert_eq!(
uses[0].input.get("file_path").and_then(Value::as_str),
Some("/tmp/x.txt")
);
assert!(e.assistant_text().is_none());
}
// The result fixture decodes to `Result` with its typed fields populated and
// an unmodelled key kept in `extra`.
#[test]
fn parses_result_event() {
let e = parse_line(FIXTURE_RESULT).unwrap().unwrap();
let StreamEvent::Result(r) = &e else {
panic!("expected Result, got {e:?}");
};
assert!(!r.is_error);
assert_eq!(r.result.as_deref(), Some("ok"));
assert_eq!(r.duration_ms, Some(1742));
assert_eq!(r.num_turns, Some(1));
// Float comparison with a tolerance rather than exact equality.
assert!((r.total_cost_usd.unwrap() - 0.19).abs() < 1e-9);
assert!(r.extra.contains_key("permission_denials"));
}
// The rate-limit fixture decodes to `RateLimitEvent` and keeps the raw info JSON.
#[test]
fn parses_rate_limit_event() {
let e = parse_line(FIXTURE_RATE_LIMIT).unwrap().unwrap();
let StreamEvent::RateLimitEvent(r) = &e else {
panic!("expected RateLimitEvent, got {e:?}");
};
assert_eq!(r.session_id, "sess-1");
assert_eq!(
r.rate_limit_info
.get("status")
.and_then(Value::as_str),
Some("allowed_warning")
);
}
// An unrecognised `type` tag decodes to `Other` instead of failing.
#[test]
fn unknown_type_falls_through_to_other() {
let line = r#"{"type":"future_event_we_havent_seen_yet","payload":{"x":1}}"#;
let e = parse_line(line).unwrap().unwrap();
assert!(matches!(e, StreamEvent::Other));
assert_eq!(e.type_name(), "other");
}
// Empty and whitespace-only lines produce `Ok(None)`.
#[test]
fn empty_line_is_skipped() {
assert!(parse_line("").unwrap().is_none());
assert!(parse_line(" \n").unwrap().is_none());
}
// Invalid JSON surfaces as an error rather than being skipped.
#[test]
fn malformed_json_returns_err() {
assert!(parse_line("{broken").is_err());
}
// Parses a sequence of fixtures in order and checks the first and last event
// plus the text aggregated across all events.
#[test]
fn full_session_round_trips() {
let lines = [
FIXTURE_SYSTEM,
FIXTURE_RATE_LIMIT,
FIXTURE_ASSISTANT,
FIXTURE_RESULT,
];
let events: Vec<StreamEvent> = lines
.iter()
.map(|l| parse_line(l).unwrap().unwrap())
.collect();
assert!(matches!(events[0], StreamEvent::System(_)));
let StreamEvent::Result(r) = &events[3] else {
panic!("expected final Result");
};
assert_eq!(r.result.as_deref(), Some("ok"));
// Non-assistant events contribute nothing to the collected text.
let assistant_text: String = events
.iter()
.filter_map(StreamEvent::assistant_text)
.collect();
assert_eq!(assistant_text, "ok");
}
}