use std::sync::Arc;
use std::time::{Duration, Instant};
use serde_json::{Value, json};
use tkach::message::{Content, Message, StopReason, Usage};
use tkach::provider::Response;
use tkach::providers::Mock;
use tkach::tools::SubAgent;
use tkach::{
Agent, CancellationToken, LlmProvider, Tool, ToolClass, ToolConcurrency, ToolContext,
ToolError, ToolOutput,
};
/// Test tool registered as `slow_write`: waits `delay_ms` milliseconds, then
/// writes the supplied `content` to the supplied `file_path`.
///
/// `class()` is not overridden here, so the `Tool` trait's default applies
/// (presumably the mutating class, per the description string — confirm
/// against the trait definition).
struct SlowWriter {
    /// Sleep applied inside `execute` before the file is written, in milliseconds.
    delay_ms: u64,
}

#[async_trait::async_trait]
impl Tool for SlowWriter {
    fn name(&self) -> &str {
        "slow_write"
    }

    fn description(&self) -> &str {
        "Mutating tool that sleeps then writes content to a file"
    }

    /// JSON schema: an object with two required string fields,
    /// `file_path` and `content`.
    fn input_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "file_path": { "type": "string" },
                "content": { "type": "string" }
            },
            "required": ["file_path", "content"]
        })
    }

    /// Validates both inputs, sleeps, then writes the file.
    ///
    /// Returns `ToolError::InvalidInput` when either field is missing or not a
    /// string, and `ToolError::Io` when the write fails.
    async fn execute(&self, input: Value, _ctx: &ToolContext) -> Result<ToolOutput, ToolError> {
        // Both fields are validated (and copied out of `input`) before the
        // sleep, so malformed calls fail immediately.
        let path = match input["file_path"].as_str() {
            Some(raw) => raw.to_owned(),
            None => return Err(ToolError::InvalidInput("file_path required".into())),
        };
        let content = match input["content"].as_str() {
            Some(raw) => raw.to_owned(),
            None => return Err(ToolError::InvalidInput("content required".into())),
        };

        let pause = Duration::from_millis(self.delay_ms);
        tokio::time::sleep(pause).await;

        if let Err(io_err) = tokio::fs::write(&path, &content).await {
            return Err(ToolError::Io(io_err));
        }
        Ok(ToolOutput::text(format!("wrote {path}")))
    }
}
/// Test tool registered as `slow_reader`: waits `delay_ms` milliseconds and
/// then echoes the `label` it was given. Declared `ToolClass::ReadOnly`.
struct SlowReader {
    /// Sleep applied inside `execute` before the echo is produced, in milliseconds.
    delay_ms: u64,
}

#[async_trait::async_trait]
impl Tool for SlowReader {
    fn name(&self) -> &str {
        "slow_reader"
    }

    fn description(&self) -> &str {
        "ReadOnly tool that sleeps then echoes a label"
    }

    /// JSON schema: an object with an optional string `label`.
    fn input_schema(&self) -> Value {
        json!({ "type": "object", "properties": { "label": { "type": "string" } } })
    }

    fn class(&self) -> ToolClass {
        ToolClass::ReadOnly
    }

    /// Sleeps, then returns `read[<label>]`; a missing or non-string label
    /// falls back to `anon`. Never fails.
    async fn execute(&self, input: Value, _ctx: &ToolContext) -> Result<ToolOutput, ToolError> {
        let label = match input["label"].as_str() {
            Some(given) => given.to_owned(),
            None => String::from("anon"),
        };

        let pause = Duration::from_millis(self.delay_ms);
        tokio::time::sleep(pause).await;

        let echoed = format!("read[{label}]");
        Ok(ToolOutput::text(echoed))
    }
}
/// Creates a scratch directory under the system temp dir and returns its path.
///
/// The directory name is `tkach-tests-<label>-<pid>-<nanos since UNIX epoch>`.
/// Panics if the system clock is before the UNIX epoch or if the directory
/// cannot be created.
fn fresh_tmp_dir(label: &str) -> std::path::PathBuf {
    use std::time::{SystemTime, UNIX_EPOCH};

    let stamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let pid = std::process::id();

    let mut dir = std::env::temp_dir();
    dir.push(format!("tkach-tests-{label}-{pid}-{stamp}"));
    std::fs::create_dir_all(&dir).unwrap();
    dir
}
/// Builds a scripted provider for the two-file write scenarios.
///
/// While the conversation contains no tool result, it answers with two
/// `slow_write` calls (`w1` writes "alpha" to `path_a`, `w2` writes "beta" to
/// `path_b`). Once any tool result is present it answers with the text "done"
/// and ends the turn.
fn writes_mock(path_a: String, path_b: String) -> Mock {
    Mock::new(move |req| {
        let tools_already_ran = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .any(|block| matches!(block, Content::ToolResult { .. }));

        let response = if tools_already_ran {
            Response {
                content: vec![Content::text("done")],
                stop_reason: StopReason::EndTurn,
                usage: Usage::default(),
            }
        } else {
            let first_write = Content::ToolUse {
                id: "w1".into(),
                name: "slow_write".into(),
                input: json!({ "file_path": path_a, "content": "alpha" }),
            };
            let second_write = Content::ToolUse {
                id: "w2".into(),
                name: "slow_write".into(),
                input: json!({ "file_path": path_b, "content": "beta" }),
            };
            Response {
                content: vec![first_write, second_write],
                stop_reason: StopReason::ToolUse,
                usage: Usage::default(),
            }
        };
        Ok(response)
    })
}
/// Builds a scripted parent provider that fans out to the `agent` tool.
///
/// While the conversation contains no tool result, it answers with one
/// `agent` tool call per entry in `prompts` (ids `a0`, `a1`, … in order, each
/// carrying the entry as its `prompt` input). Once any tool result is present
/// it answers with the text "all done" and ends the turn.
fn fanout_mock(prompts: Vec<&'static str>) -> Mock {
    Mock::new(move |req| {
        let tools_already_ran = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .any(|block| matches!(block, Content::ToolResult { .. }));

        if tools_already_ran {
            return Ok(Response {
                content: vec![Content::text("all done")],
                stop_reason: StopReason::EndTurn,
                usage: Usage::default(),
            });
        }

        let mut delegations = Vec::with_capacity(prompts.len());
        for (index, prompt) in prompts.iter().enumerate() {
            delegations.push(Content::ToolUse {
                id: format!("a{index}"),
                name: "agent".into(),
                input: json!({ "prompt": *prompt }),
            });
        }

        Ok(Response {
            content: delegations,
            stop_reason: StopReason::ToolUse,
            usage: Usage::default(),
        })
    })
}
/// Builds the scripted provider used by sub-agents in the read-only fan-out tests.
///
/// While the conversation contains no tool result, it answers with a single
/// `slow_reader` call whose `label` is the first text block found in the
/// conversation (or "anon" when there is none). Once any tool result is
/// present it answers with the text "sub done" and ends the turn.
fn sub_provider() -> Arc<dyn LlmProvider> {
    let scripted = Mock::new(|req| {
        let tools_already_ran = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .any(|block| matches!(block, Content::ToolResult { .. }));

        let response = if tools_already_ran {
            Response {
                content: vec![Content::text("sub done")],
                stop_reason: StopReason::EndTurn,
                usage: Usage::default(),
            }
        } else {
            // First text block across all messages, in conversation order.
            let first_text = req
                .messages
                .iter()
                .flat_map(|msg| msg.content.iter())
                .find_map(|block| {
                    if let Content::Text { text, .. } = block {
                        Some(text.clone())
                    } else {
                        None
                    }
                });
            let label = match first_text {
                Some(text) => text,
                None => "anon".into(),
            };
            Response {
                content: vec![Content::ToolUse {
                    id: "r".into(),
                    name: "slow_reader".into(),
                    input: json!({ "label": label }),
                }],
                stop_reason: StopReason::ToolUse,
                usage: Usage::default(),
            }
        };
        Ok(response)
    });
    Arc::new(scripted)
}
/// Sleep, in milliseconds, given to `SlowWriter` in
/// `parallel_writes_with_promotion_overlap` and
/// `writes_without_promotion_serialise`; their timing bounds are derived from it.
const WRITE_DELAY_MS: u64 = 200;
/// With `slow_write` opted into `ToolConcurrency::on()`, two writes issued in
/// one assistant turn must both land and the whole run must finish in under
/// 1.5x a single write's delay — i.e. the two sleeps overlapped.
#[tokio::test]
async fn parallel_writes_with_promotion_overlap() {
    let dir = fresh_tmp_dir("parallel-writes");
    let (pa, pb) = (dir.join("a.txt"), dir.join("b.txt"));

    let provider = writes_mock(
        pa.to_string_lossy().into_owned(),
        pb.to_string_lossy().into_owned(),
    );
    let writer = SlowWriter {
        delay_ms: WRITE_DELAY_MS,
    };
    let agent = Agent::builder()
        .provider(provider)
        .model("mock")
        .tool(writer)
        .tool_concurrency("slow_write", ToolConcurrency::on())
        .build()
        .unwrap();

    // Timed section: only the agent run itself.
    let started = Instant::now();
    let result = agent
        .run(
            vec![Message::user_text("write two files")],
            CancellationToken::new(),
        )
        .await
        .expect("agent run");
    let elapsed = started.elapsed();

    // Both files must carry the content requested by the mock.
    let a_contents = tokio::fs::read_to_string(&pa).await.unwrap();
    assert_eq!(a_contents, "alpha", "a.txt content mismatch");
    let b_contents = tokio::fs::read_to_string(&pb).await.unwrap();
    assert_eq!(b_contents, "beta", "b.txt content mismatch");
    assert_eq!(result.text, "done");

    // One full delay plus half a delay of slack.
    let ceiling = Duration::from_millis(WRITE_DELAY_MS) + Duration::from_millis(WRITE_DELAY_MS / 2);
    assert!(
        elapsed < ceiling,
        "parallel writes took {elapsed:?}, expected < {ceiling:?} \
         (regressed to serial-mut pool?)"
    );

    // Best-effort cleanup; failures are ignored.
    let _ = tokio::fs::remove_dir_all(&dir).await;
}
/// Without any `tool_concurrency` opt-in, two `slow_write` calls issued in one
/// assistant turn must take at least 1.7x a single write's delay — i.e. they
/// did not overlap.
#[tokio::test]
async fn writes_without_promotion_serialise() {
    let dir = fresh_tmp_dir("serial-writes");
    let (pa, pb) = (dir.join("a.txt"), dir.join("b.txt"));

    let provider = writes_mock(
        pa.to_string_lossy().into_owned(),
        pb.to_string_lossy().into_owned(),
    );
    let writer = SlowWriter {
        delay_ms: WRITE_DELAY_MS,
    };
    // Note: no `.tool_concurrency(..)` call — this is the default configuration.
    let agent = Agent::builder()
        .provider(provider)
        .model("mock")
        .tool(writer)
        .build()
        .unwrap();

    let started = Instant::now();
    let _ = agent
        .run(
            vec![Message::user_text("write two files")],
            CancellationToken::new(),
        )
        .await
        .expect("agent run");
    let elapsed = started.elapsed();

    // 1.7x rather than 2x leaves headroom for timer imprecision.
    let lower_bound_ms = WRITE_DELAY_MS * 17 / 10;
    let lower_bound = Duration::from_millis(lower_bound_ms);
    assert!(
        elapsed >= lower_bound,
        "serial writes finished too fast ({elapsed:?}, expected >= {lower_bound:?}) \
         — promotion-by-default would break the consumer-opt-in contract"
    );

    // Best-effort cleanup; failures are ignored.
    let _ = tokio::fs::remove_dir_all(&dir).await;
}
/// Sleep, in milliseconds, given to `SlowReader` in the two sub-agent overlap
/// tests (`parallel_subagents_with_promotion_overlap` and
/// `subagents_overlap_without_explicit_promotion`); their ceilings are derived from it.
const SUB_DELAY_MS: u64 = 200;
/// With the `agent` tool opted into `ToolConcurrency::on()`, three sub-agent
/// delegations issued in one turn must complete in under 1.75x a single
/// `slow_reader` delay — i.e. the sub-agents ran concurrently.
#[tokio::test]
async fn parallel_subagents_with_promotion_overlap() {
    let reader = SlowReader {
        delay_ms: SUB_DELAY_MS,
    };
    let delegate = SubAgent::new(sub_provider(), "mock-sub").max_turns(3);
    let agent = Agent::builder()
        .provider(fanout_mock(vec!["topic-A", "topic-B", "topic-C"]))
        .model("mock-parent")
        .tool(reader)
        .tool(delegate)
        .tool_concurrency("agent", ToolConcurrency::on())
        .build()
        .unwrap();

    let started = Instant::now();
    let result = agent
        .run(
            vec![Message::user_text("delegate to three sub-agents")],
            CancellationToken::new(),
        )
        .await
        .expect("agent run");
    let elapsed = started.elapsed();

    assert_eq!(result.text, "all done");

    // One full delay plus three quarters of a delay of slack.
    let slack_ms = SUB_DELAY_MS * 3 / 4;
    let ceiling = Duration::from_millis(SUB_DELAY_MS + slack_ms);
    assert!(
        elapsed < ceiling,
        "parallel sub-agents took {elapsed:?}, expected < {ceiling:?} \
         (regressed to serial-mut pool?)"
    );
}
/// Same fan-out as the promoted variant, but with no `tool_concurrency`
/// opt-in for `agent`: three delegations must still complete in under 1.75x a
/// single `slow_reader` delay. The assertion message attributes this to
/// `is_recursive` routing.
#[tokio::test]
async fn subagents_overlap_without_explicit_promotion() {
    let reader = SlowReader {
        delay_ms: SUB_DELAY_MS,
    };
    let delegate = SubAgent::new(sub_provider(), "mock-sub").max_turns(3);
    // Note: no `.tool_concurrency(..)` call — default configuration.
    let agent = Agent::builder()
        .provider(fanout_mock(vec!["topic-A", "topic-B", "topic-C"]))
        .model("mock-parent")
        .tool(reader)
        .tool(delegate)
        .build()
        .unwrap();

    let started = Instant::now();
    let _ = agent
        .run(
            vec![Message::user_text("delegate to three sub-agents")],
            CancellationToken::new(),
        )
        .await
        .expect("agent run");
    let elapsed = started.elapsed();

    // One full delay plus three quarters of a delay of slack.
    let slack_ms = SUB_DELAY_MS * 3 / 4;
    let ceiling = Duration::from_millis(SUB_DELAY_MS + slack_ms);
    assert!(
        elapsed < ceiling,
        "is_recursive routing should overlap sub-agents by default \
         ({elapsed:?}, expected < {ceiling:?})"
    );
}
/// Two sibling sub-agents each issue one un-promoted `slow_write` (200 ms).
/// The run must take at least 350 ms, i.e. the two writes did not overlap
/// even though they originate from different sub-agents.
#[tokio::test]
async fn siblings_serialise_on_default_mutating_tools() {
    let dir = Arc::new(fresh_tmp_dir("siblings-serialise"));
    let dir_for_sub = Arc::clone(&dir);

    // Sub-agent script: one `slow_write` to `<dir>/<first text block>.txt`,
    // then "sub done" once a tool result is present.
    let sub_provider: Arc<dyn LlmProvider> = Arc::new(Mock::new(move |req| {
        let tools_already_ran = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .any(|block| matches!(block, Content::ToolResult { .. }));
        if tools_already_ran {
            return Ok(Response {
                content: vec![Content::text("sub done")],
                stop_reason: StopReason::EndTurn,
                usage: Usage::default(),
            });
        }

        let first_text = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .find_map(|block| {
                if let Content::Text { text, .. } = block {
                    Some(text.clone())
                } else {
                    None
                }
            });
        let label = match first_text {
            Some(text) => text,
            None => "anon".into(),
        };
        let path = dir_for_sub.join(format!("{label}.txt"));

        let write_call = Content::ToolUse {
            id: "w".into(),
            name: "slow_write".into(),
            input: json!({
                "file_path": path.to_string_lossy().into_owned(),
                "content": "child write"
            }),
        };
        Ok(Response {
            content: vec![write_call],
            stop_reason: StopReason::ToolUse,
            usage: Usage::default(),
        })
    }));

    let delegate = SubAgent::new(Arc::clone(&sub_provider), "mock-sub").max_turns(3);
    // Note: no `.tool_concurrency(..)` call for either tool.
    let agent = Agent::builder()
        .provider(fanout_mock(vec!["topic-A", "topic-B"]))
        .model("mock-parent")
        .tool(SlowWriter { delay_ms: 200 })
        .tool(delegate)
        .build()
        .unwrap();

    let started = Instant::now();
    let _ = agent
        .run(
            vec![Message::user_text("delegate")],
            CancellationToken::new(),
        )
        .await
        .expect("agent run");
    let elapsed = started.elapsed();

    // Two 200 ms writes back to back, with headroom for timer imprecision.
    let lower_bound = Duration::from_millis(350);
    assert!(
        elapsed >= lower_bound,
        "siblings must serialise on default-Mutating tools via shared serial_mut \
         (got {elapsed:?}, expected >= {lower_bound:?}) — fork() must NOT reset \
         the serial_mut pool"
    );

    // Best-effort cleanup; failures are ignored.
    let _ = std::fs::remove_dir_all(dir.as_path());
}
/// Three promoted `slow_write` calls (`first`, `second`, `third`) are issued
/// in one turn. The assistant message must list the tool uses in that order
/// and the following user message must list the tool results in the same order.
#[tokio::test]
async fn parallel_writes_preserve_input_order() {
    let dir = fresh_tmp_dir("order");
    let target_1 = dir.join("1.txt").to_string_lossy().into_owned();
    let target_2 = dir.join("2.txt").to_string_lossy().into_owned();
    let target_3 = dir.join("3.txt").to_string_lossy().into_owned();

    let provider = Mock::new(move |req| {
        let tools_already_ran = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .any(|block| matches!(block, Content::ToolResult { .. }));

        let response = if tools_already_ran {
            Response {
                content: vec![Content::text("done")],
                stop_reason: StopReason::EndTurn,
                usage: Usage::default(),
            }
        } else {
            Response {
                content: vec![
                    Content::ToolUse {
                        id: "first".into(),
                        name: "slow_write".into(),
                        input: json!({ "file_path": target_1, "content": "1" }),
                    },
                    Content::ToolUse {
                        id: "second".into(),
                        name: "slow_write".into(),
                        input: json!({ "file_path": target_2, "content": "2" }),
                    },
                    Content::ToolUse {
                        id: "third".into(),
                        name: "slow_write".into(),
                        input: json!({ "file_path": target_3, "content": "3" }),
                    },
                ],
                stop_reason: StopReason::ToolUse,
                usage: Usage::default(),
            }
        };
        Ok(response)
    });

    let agent = Agent::builder()
        .provider(provider)
        .model("mock")
        .tool(SlowWriter { delay_ms: 150 })
        .tool_concurrency("slow_write", ToolConcurrency::on())
        .build()
        .unwrap();

    let result = agent
        .run(
            vec![Message::user_text("three ordered writes")],
            CancellationToken::new(),
        )
        .await
        .expect("agent run");

    // First assistant message produced by the run: tool_use blocks in order.
    let assistant_msg = result
        .new_messages
        .iter()
        .find(|msg| matches!(msg.role, tkach::Role::Assistant))
        .expect("assistant message");
    let mut tool_use_ids: Vec<&str> = Vec::new();
    for block in assistant_msg.content.iter() {
        if let Content::ToolUse { id, .. } = block {
            tool_use_ids.push(id.as_str());
        }
    }
    assert_eq!(tool_use_ids, vec!["first", "second", "third"]);

    // First user message produced by the run: tool_result blocks in order.
    let user_msg = result
        .new_messages
        .iter()
        .find(|msg| matches!(msg.role, tkach::Role::User))
        .expect("user tool_result message");
    let mut tool_result_ids: Vec<&str> = Vec::new();
    for block in user_msg.content.iter() {
        if let Content::ToolResult { tool_use_id, .. } = block {
            tool_result_ids.push(tool_use_id.as_str());
        }
    }
    assert_eq!(
        tool_result_ids,
        vec!["first", "second", "third"],
        "tool_result order must match tool_use order"
    );

    // Best-effort cleanup; failures are ignored.
    let _ = tokio::fs::remove_dir_all(&dir).await;
}
/// Two promoted `slow_write` calls (500 ms each) are issued with the tool's
/// concurrency configured as `ToolConcurrency::on().max(1)`. The token is
/// cancelled 50 ms into the run; the run must return
/// `AgentError::Cancelled` and the second file must never be created.
#[tokio::test]
async fn cancel_during_parallel_batch_short_circuits_pending_calls() {
    let dir = fresh_tmp_dir("cancel");
    let (pa, pb) = (dir.join("a.txt"), dir.join("b.txt"));

    let provider = writes_mock(
        pa.to_string_lossy().into_owned(),
        pb.to_string_lossy().into_owned(),
    );
    let agent = Agent::builder()
        .provider(provider)
        .model("mock")
        .tool(SlowWriter { delay_ms: 500 })
        .tool_concurrency("slow_write", ToolConcurrency::on().max(1))
        .build()
        .unwrap();

    // Fire the cancellation from a background task well before the first
    // write's 500 ms sleep could finish.
    let cancel = CancellationToken::new();
    let trigger = cancel.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        trigger.cancel();
    });

    let result = agent
        .run(vec![Message::user_text("two writes")], cancel)
        .await;

    assert!(
        matches!(result, Err(tkach::AgentError::Cancelled { .. })),
        "expected Cancelled, got {result:?}"
    );
    let second_file_written = pb.exists();
    assert!(
        !second_file_written,
        "second file should not have been written — cancel must short-circuit \
         the queued call before its execute() body"
    );

    // Best-effort cleanup; failures are ignored.
    let _ = tokio::fs::remove_dir_all(&dir).await;
}
/// One assistant turn issues a promoted `slow_write` followed by a read-only
/// `read_file` on the same path. The read's tool result must contain the
/// content the write produced.
#[tokio::test]
async fn read_after_write_observes_the_write() {
    /// Read-only tool that returns the file's contents, or an empty string
    /// when the file cannot be read.
    struct ReadFile;

    #[async_trait::async_trait]
    impl Tool for ReadFile {
        fn name(&self) -> &str {
            "read_file"
        }

        fn description(&self) -> &str {
            "read a file"
        }

        fn input_schema(&self) -> Value {
            json!({
                "type": "object",
                "properties": { "file_path": { "type": "string" } },
                "required": ["file_path"]
            })
        }

        fn class(&self) -> ToolClass {
            ToolClass::ReadOnly
        }

        async fn execute(&self, input: Value, _ctx: &ToolContext) -> Result<ToolOutput, ToolError> {
            let target = match input["file_path"].as_str() {
                Some(raw) => raw,
                None => return Err(ToolError::InvalidInput("file_path required".into())),
            };
            // Read failures are deliberately mapped to an empty string.
            let content = match tokio::fs::read_to_string(target).await {
                Ok(text) => text,
                Err(_) => String::new(),
            };
            Ok(ToolOutput::text(content))
        }
    }

    let dir = fresh_tmp_dir("ordering");
    let path_str = dir.join("a.txt").to_string_lossy().into_owned();

    let provider = Mock::new(move |req| {
        let tools_already_ran = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .any(|block| matches!(block, Content::ToolResult { .. }));

        let response = if tools_already_ran {
            Response {
                content: vec![Content::text("done")],
                stop_reason: StopReason::EndTurn,
                usage: Usage::default(),
            }
        } else {
            // Write first, read second — both against the same path.
            let write_call = Content::ToolUse {
                id: "w".into(),
                name: "slow_write".into(),
                input: json!({ "file_path": path_str, "content": "expected" }),
            };
            let read_call = Content::ToolUse {
                id: "r".into(),
                name: "read_file".into(),
                input: json!({ "file_path": path_str }),
            };
            Response {
                content: vec![write_call, read_call],
                stop_reason: StopReason::ToolUse,
                usage: Usage::default(),
            }
        };
        Ok(response)
    });

    let agent = Agent::builder()
        .provider(provider)
        .model("mock")
        .tool(SlowWriter { delay_ms: 100 })
        .tool(ReadFile)
        .tool_concurrency("slow_write", ToolConcurrency::on())
        .build()
        .unwrap();

    let result = agent
        .run(
            vec![Message::user_text("write then read")],
            CancellationToken::new(),
        )
        .await
        .expect("agent run");

    // Pull the result for tool call "r" out of the first user message.
    let user_msg = result
        .new_messages
        .iter()
        .find(|msg| matches!(msg.role, tkach::Role::User))
        .expect("tool_result message");
    let read_result = user_msg
        .content
        .iter()
        .find_map(|block| {
            if let Content::ToolResult {
                tool_use_id,
                content,
                ..
            } = block
            {
                if tool_use_id == "r" {
                    return Some(content.clone());
                }
            }
            None
        })
        .expect("read tool_result");
    assert_eq!(
        read_result, "expected",
        "read after write must observe the write — got {read_result:?}"
    );

    // Best-effort cleanup; failures are ignored.
    let _ = tokio::fs::remove_dir_all(&dir).await;
}
/// Two promoted sub-agents each issue two promoted `slow_write` calls while
/// the agent is built with `max_concurrent_mutations(2)`. The run must finish
/// within 10 seconds (no deadlock), report "all done", and leave exactly four
/// files in the scratch directory.
#[tokio::test]
async fn nested_promoted_fanout_does_not_deadlock_when_parent_saturates_pool() {
    let dir = Arc::new(fresh_tmp_dir("nested-deadlock"));
    let dir_for_sub = Arc::clone(&dir);

    // Sub-agent script: two `slow_write` calls named after the first text
    // block in the conversation, then "sub done" once a tool result is present.
    let sub_provider: Arc<dyn LlmProvider> = Arc::new(Mock::new(move |req| {
        let tools_already_ran = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .any(|block| matches!(block, Content::ToolResult { .. }));
        if tools_already_ran {
            return Ok(Response {
                content: vec![Content::text("sub done")],
                stop_reason: StopReason::EndTurn,
                usage: Usage::default(),
            });
        }

        let first_text = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .find_map(|block| {
                if let Content::Text { text, .. } = block {
                    Some(text.clone())
                } else {
                    None
                }
            });
        let label = match first_text {
            Some(text) => text,
            None => "anon".into(),
        };
        let p1 = dir_for_sub.join(format!("{label}-1.txt"));
        let p2 = dir_for_sub.join(format!("{label}-2.txt"));

        let first_write = Content::ToolUse {
            id: "c1".into(),
            name: "slow_write".into(),
            input: json!({
                "file_path": p1.to_string_lossy().into_owned(),
                "content": "child-write-1"
            }),
        };
        let second_write = Content::ToolUse {
            id: "c2".into(),
            name: "slow_write".into(),
            input: json!({
                "file_path": p2.to_string_lossy().into_owned(),
                "content": "child-write-2"
            }),
        };
        Ok(Response {
            content: vec![first_write, second_write],
            stop_reason: StopReason::ToolUse,
            usage: Usage::default(),
        })
    }));

    let delegate = SubAgent::new(Arc::clone(&sub_provider), "mock-sub").max_turns(3);
    let agent = Agent::builder()
        .provider(fanout_mock(vec!["topic-A", "topic-B"]))
        .model("mock-parent")
        .tool(SlowWriter { delay_ms: 50 })
        .tool(delegate)
        .max_concurrent_mutations(2)
        .tool_concurrency("agent", ToolConcurrency::on())
        .tool_concurrency("slow_write", ToolConcurrency::on())
        .build()
        .unwrap();

    // Bound the run so a deadlock surfaces as a test failure, not a hang.
    let runner = agent.run(
        vec![Message::user_text("delegate")],
        CancellationToken::new(),
    );
    let deadline = Duration::from_secs(10);
    let result = tokio::time::timeout(deadline, runner)
        .await
        .expect("nested fan-out should not deadlock — Codex P1 review on PR #41")
        .expect("agent run");
    assert_eq!(result.text, "all done");

    // Every readable directory entry, by file name.
    let mut entries: Vec<String> = Vec::new();
    for entry in std::fs::read_dir(dir.as_path()).unwrap() {
        if let Ok(entry) = entry {
            entries.push(entry.file_name().to_string_lossy().into_owned());
        }
    }
    assert_eq!(
        entries.len(),
        4,
        "expected 4 files (2 sub-agents × 2 writes each), got {entries:?}"
    );

    // Best-effort cleanup; failures are ignored.
    let _ = std::fs::remove_dir_all(dir.as_path());
}
/// An agent built without any concurrency-related builder calls must still
/// run two read-only tool calls (150 ms each) from one turn in under 225 ms —
/// i.e. read-only calls overlap by default.
#[tokio::test]
async fn agent_built_without_concurrency_methods_matches_pre_feature_behaviour() {
    /// Read-only tool that sleeps for `delay_ms` milliseconds and returns "ok".
    struct SlowReadOnly {
        delay_ms: u64,
    }

    #[async_trait::async_trait]
    impl Tool for SlowReadOnly {
        fn name(&self) -> &str {
            "slow_ro"
        }

        fn description(&self) -> &str {
            "ro"
        }

        fn input_schema(&self) -> Value {
            json!({})
        }

        fn class(&self) -> ToolClass {
            ToolClass::ReadOnly
        }

        async fn execute(
            &self,
            _input: Value,
            _ctx: &ToolContext,
        ) -> Result<ToolOutput, ToolError> {
            let pause = Duration::from_millis(self.delay_ms);
            tokio::time::sleep(pause).await;
            Ok(ToolOutput::text("ok"))
        }
    }

    // Script: two `slow_ro` calls first, then "done" once results are present.
    let provider = Mock::new(|req| {
        let tools_already_ran = req
            .messages
            .iter()
            .flat_map(|msg| msg.content.iter())
            .any(|block| matches!(block, Content::ToolResult { .. }));

        let response = if tools_already_ran {
            Response {
                content: vec![Content::text("done")],
                stop_reason: StopReason::EndTurn,
                usage: Usage::default(),
            }
        } else {
            let first_read = Content::ToolUse {
                id: "r1".into(),
                name: "slow_ro".into(),
                input: json!({}),
            };
            let second_read = Content::ToolUse {
                id: "r2".into(),
                name: "slow_ro".into(),
                input: json!({}),
            };
            Response {
                content: vec![first_read, second_read],
                stop_reason: StopReason::ToolUse,
                usage: Usage::default(),
            }
        };
        Ok(response)
    });

    let agent = Agent::builder()
        .provider(provider)
        .model("mock")
        .tool(SlowReadOnly { delay_ms: 150 })
        .build()
        .unwrap();

    let started = Instant::now();
    let result = agent
        .run(
            vec![Message::user_text("read both")],
            CancellationToken::new(),
        )
        .await
        .expect("agent run");
    let elapsed = started.elapsed();

    assert_eq!(result.text, "done");

    // 1.5x a single read's delay.
    let ceiling = Duration::from_millis(225);
    assert!(
        elapsed < ceiling,
        "default-built RO calls should parallelise, took {elapsed:?}"
    );
}