use crate::control_plane::telemetry::{CompletionEvent, TokenBreakdown};
use chrono::Utc;
/// Outcome of parsing a single line of agent CLI output.
#[derive(Debug, Clone, PartialEq)]
pub enum ParsedOutput {
/// The line was a completion-bearing event and was converted into a telemetry record.
Completion(CompletionEvent),
/// The line was blank, or was valid JSON whose `type` carries no completion data.
Ignored,
/// The line could not be interpreted. Holds the trimmed input text when it is not
/// valid JSON, or the re-serialized JSON value when a required object is missing.
Unparseable(String),
}
/// Parses one line of opencode JSON-lines output.
///
/// * Blank (after trimming) lines yield [`ParsedOutput::Ignored`].
/// * Lines that are not valid JSON yield [`ParsedOutput::Unparseable`] carrying the
///   trimmed text.
/// * `step_finish` events are converted by `parse_opencode_step_finish`;
///   `start_timestamp` is forwarded so it can derive a latency.
/// * Every other event type (including a missing or non-string `type`) is ignored.
pub fn parse_opencode_line(
    line: &str,
    session_id: &str,
    model: &str,
    start_timestamp: Option<i64>,
) -> ParsedOutput {
    let trimmed = line.trim();
    if trimmed.is_empty() {
        return ParsedOutput::Ignored;
    }

    let event: serde_json::Value = match serde_json::from_str(trimmed) {
        Ok(event) => event,
        Err(_) => return ParsedOutput::Unparseable(trimmed.to_string()),
    };

    match event.get("type").and_then(serde_json::Value::as_str) {
        Some("step_finish") => {
            parse_opencode_step_finish(&event, session_id, model, start_timestamp)
        }
        // step_start / text / tool_use / tool_result and anything unrecognised
        // carry no completion telemetry.
        _ => ParsedOutput::Ignored,
    }
}
/// Converts an opencode `step_finish` event into a [`CompletionEvent`].
///
/// * `value` - the whole decoded event; its `part` object holds the details.
/// * `session_id` / `model` - copied verbatim into the resulting event.
/// * `start_timestamp` - timestamp of the matching `step_start`, in the same unit as
///   the event's own `timestamp`; when `None` the latency is reported as 0.
///
/// Returns [`ParsedOutput::Unparseable`] (with the re-serialized event) when `part`
/// is missing, otherwise [`ParsedOutput::Completion`]. Missing or mistyped numeric
/// fields default to 0. A missing `reason` is treated as `"stop"`; any other reason
/// marks the completion as failed and is echoed in `error`.
///
/// `completed_at` is the wall-clock time of parsing, not the event's `timestamp`.
fn parse_opencode_step_finish(
    value: &serde_json::Value,
    session_id: &str,
    model: &str,
    start_timestamp: Option<i64>,
) -> ParsedOutput {
    let Some(part) = value.get("part") else {
        return ParsedOutput::Unparseable(value.to_string());
    };

    // Absent or non-integer counters are reported as 0.
    let count = |node: &serde_json::Value, key: &str| -> u64 {
        node.get(key).and_then(|v| v.as_u64()).unwrap_or(0)
    };

    let tokens = part
        .get("tokens")
        .map(|t| {
            let cache = t.get("cache");
            TokenBreakdown {
                total: count(t, "total"),
                input: count(t, "input"),
                output: count(t, "output"),
                reasoning: count(t, "reasoning"),
                cache_read: cache.map_or(0, |c| count(c, "read")),
                cache_write: cache.map_or(0, |c| count(c, "write")),
            }
        })
        .unwrap_or_default();

    // The cost sits next to `tokens` inside `part`; reading it from the top-level
    // event always produced 0.0. The top-level key is kept as a fallback for
    // payloads that do place it there.
    let cost_usd = part
        .get("cost")
        .and_then(|v| v.as_f64())
        .or_else(|| value.get("cost").and_then(|v| v.as_f64()))
        .unwrap_or(0.0);

    let finish_timestamp = value
        .get("timestamp")
        .and_then(|v| v.as_i64())
        .unwrap_or_else(|| Utc::now().timestamp_millis());

    // Saturating subtraction cannot overflow on extreme timestamps; a finish that
    // precedes the start is clamped to 0.
    let latency_ms = start_timestamp.map_or(0, |start| {
        finish_timestamp.saturating_sub(start).max(0).unsigned_abs()
    });

    let reason = part
        .get("reason")
        .and_then(|v| v.as_str())
        .unwrap_or("stop");
    let success = reason == "stop";
    let error = if success {
        None
    } else {
        Some(format!("completion ended with reason: {}", reason))
    };

    ParsedOutput::Completion(CompletionEvent {
        model: model.to_string(),
        session_id: session_id.to_string(),
        completed_at: Utc::now(),
        latency_ms,
        success,
        tokens,
        cost_usd,
        error,
    })
}
/// Parses one line of Claude JSON-lines output.
///
/// Blank lines are ignored, non-JSON lines are returned as
/// [`ParsedOutput::Unparseable`] (trimmed text), `result` events are converted by
/// `parse_claude_result`, and every other event type is ignored.
pub fn parse_claude_line(line: &str, session_id: &str, model: &str) -> ParsedOutput {
    let text = line.trim();
    if text.is_empty() {
        return ParsedOutput::Ignored;
    }

    match serde_json::from_str::<serde_json::Value>(text) {
        Err(_) => ParsedOutput::Unparseable(text.to_string()),
        Ok(event) => {
            let kind = event.get("type").and_then(|t| t.as_str());
            if kind == Some("result") {
                parse_claude_result(&event, session_id, model)
            } else {
                // assistant / user / system and unknown types carry no telemetry.
                ParsedOutput::Ignored
            }
        }
    }
}
/// Converts a Claude `result` event into a [`CompletionEvent`].
///
/// * `value` - the decoded `result` event.
/// * `session_id` / `model` - copied verbatim into the resulting event.
///
/// Always returns [`ParsedOutput::Completion`]. Missing or mistyped numeric fields
/// default to 0, and `reasoning` is always 0 because no such counter is read.
///
/// * `tokens.total` uses `usage.total_tokens` when present, otherwise the sum of the
///   input, output and cache counters.
/// * `cost_usd` is read from `cost_usd`, falling back to `total_cost_usd`.
/// * `success` is the negation of `is_error` (default `false`). On failure `error`
///   is always populated: from `error`, else `result`, else `subtype`, else a
///   generic message.
///
/// `completed_at` is the wall-clock time of parsing.
fn parse_claude_result(value: &serde_json::Value, session_id: &str, model: &str) -> ParsedOutput {
    // Absent or non-integer counters are reported as 0.
    let count = |node: &serde_json::Value, key: &str| -> u64 {
        node.get(key).and_then(|v| v.as_u64()).unwrap_or(0)
    };

    let tokens = value
        .get("usage")
        .map(|u| {
            let input = count(u, "input_tokens");
            let output = count(u, "output_tokens");
            let cache_read = count(u, "cache_read_input_tokens");
            let cache_write = count(u, "cache_creation_input_tokens");
            // NOTE(review): Claude's usage block is understood to omit
            // `total_tokens` and to report cached tokens separately from
            // `input_tokens`, so the fallback sums all four counters; confirm
            // against the CLI version in use.
            let total = u
                .get("total_tokens")
                .and_then(|v| v.as_u64())
                .unwrap_or_else(|| {
                    input
                        .saturating_add(output)
                        .saturating_add(cache_read)
                        .saturating_add(cache_write)
                });
            TokenBreakdown {
                total,
                input,
                output,
                reasoning: 0,
                cache_read,
                cache_write,
            }
        })
        .unwrap_or_default();

    // `cost_usd` is checked first so payloads that already worked keep their value;
    // `total_cost_usd` covers result events that use that key instead.
    let cost_usd = ["cost_usd", "total_cost_usd"]
        .iter()
        .find_map(|key| value.get(*key).and_then(|v| v.as_f64()))
        .unwrap_or(0.0);

    let duration_ms = value
        .get("duration_ms")
        .and_then(|v| v.as_u64())
        .unwrap_or(0);

    let is_error = value
        .get("is_error")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);

    // A failed completion always carries a message, matching the other parsers.
    let error = if is_error {
        let message = value
            .get("error")
            .and_then(|v| v.as_str())
            .or_else(|| value.get("result").and_then(|v| v.as_str()))
            .map(String::from)
            .or_else(|| {
                value
                    .get("subtype")
                    .and_then(|v| v.as_str())
                    .map(|subtype| format!("completion ended with reason: {}", subtype))
            })
            .unwrap_or_else(|| String::from("completion failed without an error message"));
        Some(message)
    } else {
        None
    };

    ParsedOutput::Completion(CompletionEvent {
        model: model.to_string(),
        session_id: session_id.to_string(),
        completed_at: Utc::now(),
        latency_ms: duration_ms,
        success: !is_error,
        tokens,
        cost_usd,
        error,
    })
}
/// Parses one line of pi-rust JSON-lines output.
///
/// Blank lines are ignored, non-JSON lines are returned as
/// [`ParsedOutput::Unparseable`] (trimmed text), `turn_end` events are converted by
/// `parse_pi_rust_turn_end`, and every other event type is ignored.
pub fn parse_pi_rust_line(line: &str, session_id: &str, model: &str) -> ParsedOutput {
    let candidate = line.trim();
    if candidate.is_empty() {
        return ParsedOutput::Ignored;
    }

    let Ok(event) = serde_json::from_str::<serde_json::Value>(candidate) else {
        return ParsedOutput::Unparseable(candidate.to_string());
    };

    // session / agent_* / message_* / turn_start and unknown types carry no
    // completion telemetry; only `turn_end` does.
    let is_turn_end = event.get("type").and_then(|t| t.as_str()) == Some("turn_end");
    if is_turn_end {
        parse_pi_rust_turn_end(&event, session_id, model)
    } else {
        ParsedOutput::Ignored
    }
}
fn parse_pi_rust_turn_end(
value: &serde_json::Value,
session_id: &str,
model: &str,
) -> ParsedOutput {
let message = match value.get("message") {
Some(m) => m,
None => return ParsedOutput::Unparseable(value.to_string()),
};
let usage = message.get("usage");
let tokens = usage
.map(|u| TokenBreakdown {
total: u.get("totalTokens").and_then(|v| v.as_u64()).unwrap_or(0),
input: u.get("input").and_then(|v| v.as_u64()).unwrap_or(0),
output: u.get("output").and_then(|v| v.as_u64()).unwrap_or(0),
reasoning: 0,
cache_read: u.get("cacheRead").and_then(|v| v.as_u64()).unwrap_or(0),
cache_write: u.get("cacheWrite").and_then(|v| v.as_u64()).unwrap_or(0),
})
.unwrap_or_default();
let cost_usd = usage
.and_then(|u| u.get("cost"))
.and_then(|c| c.get("total"))
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
let latency_ms = value
.get("latencyBreakdown")
.and_then(|lb| lb.get("totalMs"))
.and_then(|v| v.as_u64())
.unwrap_or(0);
let stop_reason = message
.get("stopReason")
.and_then(|v| v.as_str())
.unwrap_or("stop");
let success = stop_reason == "stop";
let error = if success {
None
} else {
Some(format!("completion ended with reason: {}", stop_reason))
};
ParsedOutput::Completion(CompletionEvent {
model: model.to_string(),
session_id: session_id.to_string(),
completed_at: Utc::now(),
latency_ms,
success,
tokens,
cost_usd,
error,
})
}
/// Returns `true` when `line` contains `429` as a standalone number, i.e. not as
/// part of a longer digit run such as `14290` or a port like `54291`.
fn contains_standalone_429(line: &str) -> bool {
    let bytes = line.as_bytes();
    line.match_indices("429").any(|(start, hit)| {
        let end = start + hit.len();
        let digit_before = start > 0 && bytes[start - 1].is_ascii_digit();
        let digit_after = bytes.get(end).map_or(false, |b| b.is_ascii_digit());
        !digit_before && !digit_after
    })
}

/// Scans `stderr` for the first line that looks like a rate, quota or plan limit
/// error.
///
/// Matching is case-insensitive against a fixed list of phrases, plus the HTTP
/// status `429` when it appears as a standalone number. Returns the matching line
/// with its original casing, or `None` when no line matches.
pub fn parse_stderr_for_limit_errors(stderr: &str) -> Option<String> {
    // All phrases are lowercase; each line is lowercased before comparison.
    const LIMIT_PHRASES: &[&str] = &[
        "weekly session limit",
        "monthly limit",
        "rate limit",
        "quota exceeded",
        "capacity limit",
        "spending limit",
        "subscription limit",
        "usage limit",
        // Also covers "you've hit your limit".
        "hit your limit",
        "plan limit",
        "tier limit",
        "usage cap",
        "daily limit",
        "hourly limit",
        "out of quota",
        "quota exhausted",
        "subscription quota",
        "insufficient balance",
        "insufficient_quota",
    ];

    stderr
        .lines()
        .find(|line| {
            let lower = line.to_lowercase();
            LIMIT_PHRASES.iter().any(|phrase| lower.contains(*phrase))
                || contains_standalone_429(line)
        })
        .map(str::to_owned)
}
/// Parses a complete opencode stdout capture into completion events.
///
/// Lines are processed in order. Each `step_start` event replaces the remembered
/// start timestamp with its own `timestamp` (or clears it when that field is
/// missing or not an integer). Every other non-blank line is passed to
/// [`parse_opencode_line`] together with the remembered start timestamp, and only
/// the resulting completions are collected; ignored and unparseable lines are
/// dropped.
pub fn parse_opencode_output(stdout: &str, session_id: &str, model: &str) -> Vec<CompletionEvent> {
    let mut step_started_at: Option<i64> = None;
    let mut completions = Vec::new();

    for raw in stdout.lines().map(str::trim).filter(|l| !l.is_empty()) {
        let decoded = serde_json::from_str::<serde_json::Value>(raw).ok();
        let step_start = decoded
            .as_ref()
            .filter(|event| event.get("type").and_then(|t| t.as_str()) == Some("step_start"));

        match step_start {
            Some(event) => {
                step_started_at = event.get("timestamp").and_then(|t| t.as_i64());
            }
            None => {
                let outcome = parse_opencode_line(raw, session_id, model, step_started_at);
                if let ParsedOutput::Completion(completion) = outcome {
                    completions.push(completion);
                }
            }
        }
    }

    completions
}
// Unit tests for the opencode and pi-rust line parsers, the whole-output opencode
// parser, and the stderr limit-error scanner. All fixtures are inline JSON lines.
#[cfg(test)]
mod tests {
use super::*;
// A successful `step_finish` yields a Completion with the fixture's token counts
// and a latency equal to the event timestamp minus the supplied start timestamp
// (1775767451123 - 1775767450000 = 1123).
#[test]
fn test_parse_opencode_step_finish() {
let line = r#"{"type":"step_finish","timestamp":1775767451123,"sessionID":"ses_test","part":{"id":"prt_1","reason":"stop","snapshot":"abc","messageID":"msg_1","sessionID":"ses_test","type":"step-finish","tokens":{"total":48432,"input":45327,"output":97,"reasoning":0,"cache":{"write":0,"read":3008}},"cost":0}}"#;
let result = parse_opencode_line(
line,
"sess-1",
"zai-coding-plan/glm-5.1",
Some(1775767450000i64),
);
match result {
ParsedOutput::Completion(event) => {
assert_eq!(event.model, "zai-coding-plan/glm-5.1");
assert_eq!(event.session_id, "sess-1");
assert!(event.success);
assert_eq!(event.tokens.total, 48432);
assert_eq!(event.tokens.input, 45327);
assert_eq!(event.tokens.output, 97);
assert_eq!(event.tokens.cache_read, 3008);
assert_eq!(event.tokens.cache_write, 0);
assert_eq!(event.latency_ms, 1123);
}
_ => panic!("Expected Completion, got {:?}", result),
}
}
// `step_start` and `text` events are valid JSON but produce no completion.
#[test]
fn test_parse_opencode_ignored_events() {
assert_eq!(
parse_opencode_line(
r#"{"type":"step_start","timestamp":1234}"#,
"sess-1",
"model-a",
None
),
ParsedOutput::Ignored
);
assert_eq!(
parse_opencode_line(
r#"{"type":"text","timestamp":1234}"#,
"sess-1",
"model-a",
None
),
ParsedOutput::Ignored
);
}
// Input that is not JSON is reported as Unparseable rather than ignored.
#[test]
fn test_parse_opencode_unparseable() {
let result = parse_opencode_line("not json at all", "sess-1", "model-a", None);
assert!(matches!(result, ParsedOutput::Unparseable(_)));
}
// A `reason` other than "stop" marks the completion as failed and the reason text
// appears in the error message.
#[test]
fn test_parse_opencode_non_stop_reason() {
let line = r#"{"type":"step_finish","timestamp":1775767451123,"sessionID":"ses_test","part":{"id":"prt_1","reason":"error","type":"step-finish","tokens":{"total":100,"input":50,"output":50,"reasoning":0,"cache":{"write":0,"read":0}},"cost":0}}"#;
let result = parse_opencode_line(line, "sess-1", "model-a", Some(1775767450000i64));
match result {
ParsedOutput::Completion(event) => {
assert!(!event.success);
assert!(event.error.is_some());
assert!(event.error.unwrap().contains("error"));
}
_ => panic!("Expected Completion"),
}
}
// A multi-line capture produces one event for the single `step_finish`; latency is
// the gap between the `step_start` (1000) and `step_finish` (2000) timestamps.
#[test]
fn test_parse_full_output() {
let stdout = r#"
{"type":"step_start","timestamp":1000,"sessionID":"ses_test","part":{"type":"step-start"}}
{"type":"text","timestamp":1001,"sessionID":"ses_test","part":{"type":"text","text":"Hello"}}
{"type":"step_finish","timestamp":2000,"sessionID":"ses_test","part":{"type":"step-finish","tokens":{"total":500,"input":400,"output":100,"reasoning":0,"cache":{"write":0,"read":0}},"cost":0.01}}
"#;
let events = parse_opencode_output(stdout, "sess-1", "model-a");
assert_eq!(events.len(), 1);
assert_eq!(events[0].tokens.total, 500);
assert_eq!(events[0].latency_ms, 1000);
}
// A known limit phrase is detected; ordinary network errors are not.
#[test]
fn test_parse_stderr_limit_error() {
let stderr = "Error: weekly session limit reached\nPlease try again later";
assert!(parse_stderr_for_limit_errors(stderr).is_some());
let stderr = "connection refused\nno route to host";
assert!(parse_stderr_for_limit_errors(stderr).is_none());
}
// A successful `turn_end` yields token counts from `message.usage`, cost from
// `usage.cost.total`, and latency from `latencyBreakdown.totalMs`.
#[test]
fn test_parse_pi_rust_turn_end() {
let line = r#"{"type":"turn_end","sessionId":"1e4e4498","turnIndex":0,"message":{"role":"assistant","content":[{"type":"text","text":"4"}],"api":"openai-completions","provider":"zai-coding-plan","model":"glm-5.1","usage":{"input":6938,"output":4,"cacheRead":64,"cacheWrite":0,"totalTokens":6942,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.01}},"stopReason":"stop"},"latencyBreakdown":{"totalMs":3905,"dominantComponent":"provider_streaming"}}"#;
let result = parse_pi_rust_line(line, "sess-1", "zai-coding-plan/glm-5.1");
match result {
ParsedOutput::Completion(event) => {
assert_eq!(event.model, "zai-coding-plan/glm-5.1");
assert_eq!(event.session_id, "sess-1");
assert!(event.success);
assert_eq!(event.tokens.total, 6942);
assert_eq!(event.tokens.input, 6938);
assert_eq!(event.tokens.output, 4);
assert_eq!(event.tokens.cache_read, 64);
assert_eq!(event.tokens.cache_write, 0);
assert_eq!(event.latency_ms, 3905);
assert!((event.cost_usd - 0.01).abs() < f64::EPSILON);
}
_ => panic!("Expected Completion, got {:?}", result),
}
}
// `session`, `agent_start` and `message_update` events produce no completion.
#[test]
fn test_parse_pi_rust_ignored_events() {
assert_eq!(
parse_pi_rust_line(
r#"{"type":"session","version":3,"id":"abc"}"#,
"sess-1",
"model-a"
),
ParsedOutput::Ignored
);
assert_eq!(
parse_pi_rust_line(
r#"{"type":"agent_start","sessionId":"abc"}"#,
"sess-1",
"model-a"
),
ParsedOutput::Ignored
);
assert_eq!(
parse_pi_rust_line(
r#"{"type":"message_update","message":{}}"#,
"sess-1",
"model-a"
),
ParsedOutput::Ignored
);
}
// Input that is not JSON is reported as Unparseable rather than ignored.
#[test]
fn test_parse_pi_rust_unparseable() {
let result = parse_pi_rust_line("not json at all", "sess-1", "model-a");
assert!(matches!(result, ParsedOutput::Unparseable(_)));
}
// A `stopReason` other than "stop" marks the completion as failed and the reason
// text appears in the error message.
#[test]
fn test_parse_pi_rust_non_stop_reason() {
let line = r#"{"type":"turn_end","sessionId":"abc","turnIndex":0,"message":{"role":"assistant","usage":{"input":100,"output":50,"cacheRead":0,"cacheWrite":0,"totalTokens":150,"cost":{"total":0.0}},"stopReason":"error"},"latencyBreakdown":{"totalMs":100}}"#;
let result = parse_pi_rust_line(line, "sess-1", "model-a");
match result {
ParsedOutput::Completion(event) => {
assert!(!event.success);
assert!(event.error.is_some());
assert!(event.error.unwrap().contains("error"));
}
_ => panic!("Expected Completion, got {:?}", result),
}
}
// Empty and whitespace-only lines are ignored without attempting a JSON parse.
#[test]
fn test_parse_pi_rust_empty_line() {
assert_eq!(
parse_pi_rust_line("", "sess-1", "model-a"),
ParsedOutput::Ignored
);
assert_eq!(
parse_pi_rust_line(" ", "sess-1", "model-a"),
ParsedOutput::Ignored
);
}
}