use crate::core::rpc_protocol::{
AssistantEvent, RpcAttachment, RpcCommand, RpcEvent, TurnUsage,
};
use crate::{AgentEvent, LlmEvent, SessionEvent, StreamEvent};
/// Frame size limit of 1 MiB (`1024 * 1024` bytes).
///
/// The tests in this file pass it as the `max_bytes` argument of [`parse_frame`].
pub const MAX_FRAME_BYTES: usize = 1024 * 1024;
/// Parses one line of input into an [`RpcCommand`].
///
/// The size check runs before any JSON parsing, so an oversized line is
/// rejected without being deserialised.
///
/// # Errors
///
/// Returns an [`RpcEvent::Error`] with `id: None` when either
/// - `line.len()` (a byte count) is greater than `max_bytes`; the message names
///   the limit that was actually applied, or
/// - `line` does not deserialise into an [`RpcCommand`]; the message is the
///   `serde_json` error text.
pub fn parse_frame(line: &str, max_bytes: usize) -> Result<RpcCommand, RpcEvent> {
    const MIB: usize = 1024 * 1024;

    if line.len() > max_bytes {
        // Whole-MiB limits keep the established "N MiB" wording, so a 1 MiB
        // limit still reports "frame exceeds 1 MiB limit". Any other limit is
        // reported in bytes instead of being mislabelled as 1 MiB.
        let limit = if max_bytes != 0 && max_bytes % MIB == 0 {
            format!("{} MiB", max_bytes / MIB)
        } else {
            format!("{max_bytes}-byte")
        };
        return Err(RpcEvent::Error {
            id: None,
            message: format!("frame exceeds {limit} limit"),
        });
    }

    serde_json::from_str::<RpcCommand>(line).map_err(|e| RpcEvent::Error {
        id: None,
        message: e.to_string(),
    })
}
pub fn map_stream_event(ev: &StreamEvent) -> Option<RpcEvent> {
match ev {
StreamEvent::Llm(LlmEvent::Thinking(s)) => Some(RpcEvent::MessageUpdate {
event: AssistantEvent::ThinkingDelta { delta: s.clone() },
}),
StreamEvent::Llm(LlmEvent::Text(s)) => Some(RpcEvent::MessageUpdate {
event: AssistantEvent::TextDelta { delta: s.clone() },
}),
StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
Some(RpcEvent::MessageUpdate {
event: AssistantEvent::ToolcallStart {
tool_id: tool_id.clone(),
tool_name: tool_name.clone(),
},
})
}
StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
Some(RpcEvent::MessageUpdate {
event: AssistantEvent::ToolcallInputDelta {
tool_id: tool_id.clone(),
delta: delta.clone(),
},
})
}
StreamEvent::Llm(LlmEvent::ToolUse { tool_id, input, .. }) => {
Some(RpcEvent::MessageUpdate {
event: AssistantEvent::ToolcallInput {
tool_id: tool_id.clone(),
input: input.clone(),
},
})
}
StreamEvent::Llm(LlmEvent::ToolResult { tool_id, result }) => {
Some(RpcEvent::MessageUpdate {
event: AssistantEvent::ToolcallResult {
tool_id: tool_id.clone(),
result: result.clone(),
},
})
}
StreamEvent::Llm(LlmEvent::ToolResultDelta { .. }) => None,
StreamEvent::Agent(AgentEvent::SubagentStart {
subagent_id,
agent_name,
task_preview,
}) => Some(RpcEvent::SubagentStart {
subagent_id: *subagent_id,
agent_name: agent_name.clone(),
task_preview: task_preview.clone(),
}),
StreamEvent::Agent(AgentEvent::SubagentUpdate {
subagent_id,
agent_name,
status,
}) => Some(RpcEvent::SubagentUpdate {
subagent_id: *subagent_id,
agent_name: agent_name.clone(),
status: status.clone(),
}),
StreamEvent::Agent(AgentEvent::SubagentDone {
subagent_id,
agent_name,
result_preview,
duration_secs,
}) => Some(RpcEvent::SubagentDone {
subagent_id: *subagent_id,
agent_name: agent_name.clone(),
result_preview: result_preview.clone(),
duration_secs: *duration_secs,
}),
StreamEvent::Agent(AgentEvent::SteeringDelivered { .. }) => None,
StreamEvent::Session(_) => None,
}
}
/// Adds the token counts of a [`SessionEvent::Usage`] event onto `acc`.
///
/// Any other session event leaves `acc` untouched. The model is only copied
/// from the event while `acc.model` is still `None`, so a model that has
/// already been recorded is never overwritten by a later event.
pub fn accumulate_usage(acc: &mut TurnUsage, event: &SessionEvent) {
    let SessionEvent::Usage {
        input_tokens: input,
        output_tokens: output,
        cache_read_input_tokens: cache_read,
        cache_creation_input_tokens: cache_creation,
        model: reported_model,
    } = event
    else {
        return;
    };

    acc.input_tokens += *input;
    acc.output_tokens += *output;
    acc.cache_read_input_tokens += *cache_read;
    acc.cache_creation_input_tokens += *cache_creation;

    if acc.model.is_none() {
        acc.model = reported_model.clone();
    }
}
/// Wraps `p` in double quotes, prefixing every backslash and double quote
/// inside it with a backslash.
fn quote_path(p: &str) -> String {
    // +2 for the surrounding quotes; escapes may grow the buffer further.
    let mut quoted = String::with_capacity(p.len() + 2);
    quoted.push('"');
    for ch in p.chars() {
        if matches!(ch, '\\' | '"') {
            quoted.push('\\');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Builds the user message text, prefixed by a header line listing any attachments.
///
/// With no attachments the message is returned unchanged. Otherwise the result
/// is `[user attached files: <paths>]`, a newline, then the original message,
/// where `<paths>` are the attachment paths, each passed through `quote_path`
/// and joined with `", "`.
pub fn build_user_content(message: &str, attachments: &[RpcAttachment]) -> String {
    match attachments {
        [] => message.to_string(),
        files => {
            let quoted: Vec<String> = files
                .iter()
                .map(|attachment| quote_path(&attachment.path))
                .collect();
            let listing = quoted.join(", ");
            format!("[user attached files: {}]\n{}", listing, message)
        }
    }
}
/// Builds the JSON object `{"ok": true, "tools": [...]}` from the given tool schemas.
///
/// The schemas are cloned into the `tools` array in their original order.
pub fn build_tools_list_body(tools_schema: &[serde_json::Value]) -> serde_json::Value {
    let mut body = serde_json::Map::new();
    body.insert("ok".to_string(), serde_json::Value::Bool(true));
    body.insert(
        "tools".to_string(),
        serde_json::Value::Array(tools_schema.to_vec()),
    );
    serde_json::Value::Object(body)
}
/// Unit tests for the frame parser, the stream-event mapping, usage
/// accumulation, user-content building and the tools-list body.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::rpc_protocol::{
        AssistantEvent, RpcAttachment, RpcCommand, RpcEvent, TurnUsage,
    };
    use crate::{AgentEvent, LlmEvent, SessionEvent, StreamEvent};
    use serde_json::json;

    // ---- parse_frame ----------------------------------------------------

    #[test]
    fn parse_frame_valid_prompt() {
        let line = r#"{"type":"prompt","id":"abc","message":"hello"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_ok(), "should parse valid prompt frame");
        match result.unwrap() {
            RpcCommand::Prompt {
                id,
                message,
                attachments,
            } => {
                assert_eq!(id, "abc");
                assert_eq!(message, "hello");
                assert!(attachments.is_empty());
            }
            other => panic!("unexpected variant: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_valid_shutdown() {
        let line = r#"{"type":"shutdown"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_ok());
        assert!(matches!(result.unwrap(), RpcCommand::Shutdown));
    }

    #[test]
    fn parse_frame_valid_follow_up() {
        let line = r#"{"type":"follow_up","id":"f1","message":"and then?"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        match result.unwrap() {
            RpcCommand::FollowUp { id, message } => {
                assert_eq!(id, "f1");
                assert_eq!(message, "and then?");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_valid_abort() {
        let line = r#"{"type":"abort","id":"x"}"#;
        assert!(matches!(
            parse_frame(line, MAX_FRAME_BYTES).unwrap(),
            RpcCommand::Abort { .. }
        ));
    }

    #[test]
    fn parse_frame_malformed_json() {
        let line = "not json at all";
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_err());
        match result.unwrap_err() {
            RpcEvent::Error { id, message } => {
                assert!(id.is_none(), "malformed-JSON error must have id=None");
                assert!(!message.is_empty(), "error message must be non-empty");
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_valid_json_unknown_type() {
        let line = r#"{"type":"does_not_exist","id":"1"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_err(), "unknown type should fail to deserialise");
    }

    #[test]
    fn parse_frame_oversize() {
        let oversize = "x".repeat(MAX_FRAME_BYTES + 1);
        let result = parse_frame(&oversize, MAX_FRAME_BYTES);
        assert!(result.is_err());
        match result.unwrap_err() {
            RpcEvent::Error { id, message } => {
                assert!(id.is_none());
                assert!(
                    message.contains("1 MiB"),
                    "expected '1 MiB' in message, got: {message}"
                );
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_exactly_at_limit_valid_json() {
        let line = r#"{"type":"get_state","id":"x"}"#;

        // Use the frame's own length as the limit so the boundary itself is
        // exercised: a frame of exactly `max_bytes` must be accepted.
        let at_limit = parse_frame(line, line.len());
        assert!(
            at_limit.is_ok(),
            "frame of exactly max_bytes must be accepted"
        );

        // The same frame is one byte too long for a limit of `len - 1`.
        let over_limit = parse_frame(line, line.len() - 1);
        assert!(
            over_limit.is_err(),
            "frame one byte over max_bytes must be rejected"
        );
    }

    #[test]
    fn parse_frame_custom_small_limit() {
        let line = r#"{"type":"shutdown"}"#;
        let result = parse_frame(line, 5);
        assert!(result.is_err());
        match result.unwrap_err() {
            RpcEvent::Error { id, .. } => assert!(id.is_none()),
            other => panic!("unexpected: {:?}", other),
        }
    }

    // ---- map_stream_event: LLM events -----------------------------------

    #[test]
    fn map_llm_thinking() {
        let ev = StreamEvent::Llm(LlmEvent::Thinking("hmm".to_string()));
        let rpc = map_stream_event(&ev).expect("Thinking must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ThinkingDelta { delta },
            } => assert_eq!(delta, "hmm"),
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_text() {
        let ev = StreamEvent::Llm(LlmEvent::Text("hi".to_string()));
        let rpc = map_stream_event(&ev).expect("Text must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::TextDelta { delta },
            } => assert_eq!(delta, "hi"),
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_use_start() {
        let ev = StreamEvent::Llm(LlmEvent::ToolUseStart {
            tool_name: "bash".to_string(),
            tool_id: "tid1".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("ToolUseStart must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallStart { tool_id, tool_name },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(tool_name, "bash");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_use_delta() {
        let ev = StreamEvent::Llm(LlmEvent::ToolUseDelta {
            tool_id: "tid1".to_string(),
            delta: r#"{"cmd":"#.to_string(),
        });
        let rpc = map_stream_event(&ev).expect("ToolUseDelta must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallInputDelta { tool_id, delta },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(delta, r#"{"cmd":"#);
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_use_final_drops_tool_name() {
        let ev = StreamEvent::Llm(LlmEvent::ToolUse {
            tool_name: "bash".to_string(),
            tool_id: "tid1".to_string(),
            input: json!({"cmd": "ls"}),
        });
        let rpc = map_stream_event(&ev).expect("ToolUse must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallInput { tool_id, input },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(input, json!({"cmd": "ls"}));
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_result() {
        let ev = StreamEvent::Llm(LlmEvent::ToolResult {
            tool_id: "tid1".to_string(),
            result: "output here".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("ToolResult must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallResult { tool_id, result },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(result, "output here");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_result_delta_is_dropped() {
        let ev = StreamEvent::Llm(LlmEvent::ToolResultDelta {
            tool_id: "tid1".to_string(),
            delta: "partial".to_string(),
        });
        assert!(
            map_stream_event(&ev).is_none(),
            "ToolResultDelta must be dropped — wire format has no streaming-result variant"
        );
    }

    // ---- map_stream_event: agent events ---------------------------------

    #[test]
    fn map_agent_subagent_start() {
        let ev = StreamEvent::Agent(AgentEvent::SubagentStart {
            subagent_id: 7,
            agent_name: "worker".to_string(),
            task_preview: "do thing".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("SubagentStart must produce an event");
        match rpc {
            RpcEvent::SubagentStart {
                subagent_id,
                agent_name,
                task_preview,
            } => {
                assert_eq!(subagent_id, 7);
                assert_eq!(agent_name, "worker");
                assert_eq!(task_preview, "do thing");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_agent_subagent_update() {
        let ev = StreamEvent::Agent(AgentEvent::SubagentUpdate {
            subagent_id: 7,
            agent_name: "worker".to_string(),
            status: "running".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("SubagentUpdate must produce an event");
        match rpc {
            RpcEvent::SubagentUpdate {
                subagent_id,
                agent_name,
                status,
            } => {
                assert_eq!(subagent_id, 7);
                assert_eq!(agent_name, "worker");
                assert_eq!(status, "running");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_agent_subagent_done() {
        let ev = StreamEvent::Agent(AgentEvent::SubagentDone {
            subagent_id: 7,
            agent_name: "worker".to_string(),
            result_preview: "done!".to_string(),
            duration_secs: 1.5,
        });
        let rpc = map_stream_event(&ev).expect("SubagentDone must produce an event");
        match rpc {
            RpcEvent::SubagentDone {
                subagent_id,
                agent_name,
                result_preview,
                duration_secs,
            } => {
                assert_eq!(subagent_id, 7);
                assert_eq!(agent_name, "worker");
                assert_eq!(result_preview, "done!");
                assert!((duration_secs - 1.5).abs() < f64::EPSILON);
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_agent_steering_delivered_is_dropped() {
        let ev = StreamEvent::Agent(AgentEvent::SteeringDelivered {
            message: "steer".to_string(),
        });
        assert!(
            map_stream_event(&ev).is_none(),
            "SteeringDelivered must be dropped — internal hook signal"
        );
    }

    // ---- map_stream_event: session events -------------------------------

    #[test]
    fn map_session_events_all_return_none() {
        let events: &[StreamEvent] = &[
            StreamEvent::Session(SessionEvent::Done),
            StreamEvent::Session(SessionEvent::Error("oops".to_string())),
            StreamEvent::Session(SessionEvent::MessageHistory(vec![])),
            StreamEvent::Session(SessionEvent::Usage {
                input_tokens: 1,
                output_tokens: 2,
                cache_read_input_tokens: 0,
                cache_creation_input_tokens: 0,
                model: None,
            }),
        ];
        for ev in events {
            assert!(
                map_stream_event(ev).is_none(),
                "Session event {:?} should return None",
                ev
            );
        }
    }

    // ---- accumulate_usage -----------------------------------------------

    /// An accumulator with all counters at zero and no model recorded.
    fn zero_usage() -> TurnUsage {
        TurnUsage {
            input_tokens: 0,
            output_tokens: 0,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
            model: None,
        }
    }

    #[test]
    fn accumulate_usage_basic() {
        let mut acc = zero_usage();
        let ev = SessionEvent::Usage {
            input_tokens: 100,
            output_tokens: 50,
            cache_read_input_tokens: 10,
            cache_creation_input_tokens: 5,
            model: Some("claude-3-5".to_string()),
        };
        accumulate_usage(&mut acc, &ev);
        assert_eq!(acc.input_tokens, 100);
        assert_eq!(acc.output_tokens, 50);
        assert_eq!(acc.cache_read_input_tokens, 10);
        assert_eq!(acc.cache_creation_input_tokens, 5);
        assert_eq!(acc.model.as_deref(), Some("claude-3-5"));
    }

    #[test]
    fn accumulate_usage_additive_across_calls() {
        let mut acc = TurnUsage {
            input_tokens: 10,
            output_tokens: 5,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
            model: Some("first-model".to_string()),
        };
        let ev = SessionEvent::Usage {
            input_tokens: 20,
            output_tokens: 8,
            cache_read_input_tokens: 2,
            cache_creation_input_tokens: 1,
            model: Some("second-model".to_string()),
        };
        accumulate_usage(&mut acc, &ev);
        assert_eq!(acc.input_tokens, 30);
        assert_eq!(acc.output_tokens, 13);
        assert_eq!(acc.cache_read_input_tokens, 2);
        assert_eq!(acc.cache_creation_input_tokens, 1);
        // The model already recorded must not be replaced by a later event.
        assert_eq!(acc.model.as_deref(), Some("first-model"));
    }

    #[test]
    fn accumulate_usage_sets_model_when_none() {
        let mut acc = zero_usage();
        let ev = SessionEvent::Usage {
            input_tokens: 1,
            output_tokens: 1,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
            model: Some("my-model".to_string()),
        };
        accumulate_usage(&mut acc, &ev);
        assert_eq!(acc.model.as_deref(), Some("my-model"));
    }

    #[test]
    fn accumulate_usage_ignores_done() {
        let mut acc = zero_usage();
        acc.input_tokens = 5;
        accumulate_usage(&mut acc, &SessionEvent::Done);
        assert_eq!(acc.input_tokens, 5, "Done must not mutate the accumulator");
    }

    #[test]
    fn accumulate_usage_ignores_error() {
        let mut acc = zero_usage();
        acc.output_tokens = 3;
        accumulate_usage(&mut acc, &SessionEvent::Error("boom".to_string()));
        assert_eq!(acc.output_tokens, 3, "Error must not mutate the accumulator");
    }

    #[test]
    fn accumulate_usage_ignores_message_history() {
        let mut acc = zero_usage();
        acc.input_tokens = 7;
        accumulate_usage(&mut acc, &SessionEvent::MessageHistory(vec![]));
        assert_eq!(
            acc.input_tokens, 7,
            "MessageHistory must not mutate the accumulator"
        );
    }

    // ---- build_user_content ---------------------------------------------

    #[test]
    fn build_user_content_no_attachments() {
        assert_eq!(build_user_content("hello", &[]), "hello");
    }

    #[test]
    fn build_user_content_single_attachment() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/a.txt".to_string(),
            name: None,
            mime: None,
        }];
        let msg = build_user_content("check this", &attachments);
        assert!(msg.starts_with("[user attached files: \"/tmp/a.txt\"]"));
        assert!(msg.contains("check this"));
    }

    #[test]
    fn build_user_content_multiple_attachments() {
        let attachments = vec![
            RpcAttachment {
                path: "/tmp/a.txt".to_string(),
                name: None,
                mime: None,
            },
            RpcAttachment {
                path: "/tmp/b.pdf".to_string(),
                name: None,
                mime: None,
            },
        ];
        let msg = build_user_content("check these", &attachments);
        assert!(
            msg.contains("[user attached files: \"/tmp/a.txt\", \"/tmp/b.pdf\"]"),
            "paths must be quoted and comma-separated: {msg}"
        );
        assert!(msg.contains("check these"));
    }

    #[test]
    fn build_user_content_preserves_original_message() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/x".to_string(),
            name: Some("x".to_string()),
            mime: Some("text/plain".to_string()),
        }];
        let original = "multi\nline\nmessage";
        let msg = build_user_content(original, &attachments);
        assert!(
            msg.ends_with(original),
            "original message must appear verbatim at the end"
        );
    }

    #[test]
    fn build_user_content_path_with_comma_is_quoted() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/a,b.pdf".to_string(),
            name: None,
            mime: None,
        }];
        let msg = build_user_content("look", &attachments);
        assert!(
            msg.contains("\"/tmp/a,b.pdf\""),
            "comma path must be wrapped in quotes: {msg}"
        );
        assert!(
            !msg.contains("[user attached files: /tmp/a,b.pdf]"),
            "bare unquoted comma path must not appear: {msg}"
        );
    }

    #[test]
    fn build_user_content_multiple_paths_each_quoted() {
        let attachments = vec![
            RpcAttachment {
                path: "/p1".to_string(),
                name: None,
                mime: None,
            },
            RpcAttachment {
                path: "/p2".to_string(),
                name: None,
                mime: None,
            },
        ];
        let msg = build_user_content("x", &attachments);
        assert!(
            msg.contains("\"/p1\", \"/p2\""),
            "each path must be individually quoted: {msg}"
        );
    }

    #[test]
    fn build_user_content_path_with_embedded_quote_is_escaped() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/he\"llo".to_string(),
            name: None,
            mime: None,
        }];
        let msg = build_user_content("x", &attachments);
        assert!(
            msg.contains("\"/tmp/he\\\"llo\""),
            "embedded double-quote must be backslash-escaped: {msg}"
        );
    }

    #[test]
    fn build_user_content_path_with_backslash_is_escaped() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/a\\b".to_string(),
            name: None,
            mime: None,
        }];
        let msg = build_user_content("x", &attachments);
        assert!(
            msg.contains("\"/tmp/a\\\\b\""),
            "backslash in path must be doubled: {msg}"
        );
    }

    // ---- build_tools_list_body ------------------------------------------

    #[test]
    fn build_tools_list_body_empty() {
        let body = super::build_tools_list_body(&[]);
        assert_eq!(body["ok"], true);
        assert!(body["tools"].is_array());
        assert_eq!(body["tools"].as_array().unwrap().len(), 0);
    }

    #[test]
    fn build_tools_list_body_with_entries() {
        let schema = vec![
            json!({"name": "bash", "description": "Run bash", "input_schema": {"type": "object"}}),
            json!({"name": "read", "description": "Read file", "input_schema": {"type": "object"}}),
        ];
        let body = super::build_tools_list_body(&schema);
        assert_eq!(body["ok"], true);
        let tools = body["tools"].as_array().unwrap();
        assert_eq!(tools.len(), 2);
        assert_eq!(tools[0]["name"], "bash");
        assert_eq!(tools[1]["name"], "read");
    }

    #[test]
    fn build_tools_list_body_roundtrip_satisfies_bridge_contract() {
        let schema = vec![json!({"name": "bash", "description": "desc", "input_schema": {}})];
        let body = super::build_tools_list_body(&schema);
        let serialised = serde_json::to_string(&body).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&serialised).unwrap();
        assert_eq!(parsed["ok"], true, "bridge check: ok===true");
        assert!(
            parsed["tools"].is_array(),
            "bridge check: Array.isArray(tools)"
        );
    }

    // ---- locking pattern -------------------------------------------------

    // NOTE(review): this test does not call `handle_compact`; it runs a
    // stand-in task that takes the lock briefly, releases it, sleeps, and then
    // re-locks. It is also timing-based (5 ms / 20 ms sleeps and a 5 ms
    // timeout), so it may be sensitive to scheduler delays.
    #[tokio::test]
    async fn handle_compact_releases_lock_before_slow_await() {
        use std::sync::Arc;
        use tokio::sync::Mutex;

        let shared: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
        let shared2 = shared.clone();
        let task = tokio::spawn(async move {
            // Short critical section: the guard is dropped at the end of this
            // block, before the slow await below.
            let snapshot = {
                let mut g = shared2.lock().await;
                *g += 1;
                *g
            };
            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
            let mut g = shared2.lock().await;
            *g = snapshot + 100;
        });

        // Give the task time to enter its slow phase, then try to take the lock.
        tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
        let acquired = tokio::time::timeout(
            tokio::time::Duration::from_millis(5),
            shared.lock(),
        )
        .await;
        assert!(
            acquired.is_ok(),
            "second task must acquire the lock during the slow phase — \
handle_compact must NOT hold the lock across compact_conversation"
        );
        // Release the guard so the spawned task can re-lock and finish.
        drop(acquired);
        task.await.unwrap();
        assert_eq!(*shared.lock().await, 101);
    }
}