use std::time::{Duration, Instant};
use serde_json::{Value, json};
use tkach::message::{Content, StopReason, Usage};
use tkach::provider::Response;
use tkach::providers::Mock;
use tkach::{
Agent, CancellationToken, Message, Tool, ToolConcurrency, ToolContext, ToolError, ToolOutput,
};
struct SlowWriter {
delay_ms: u64,
}
#[async_trait::async_trait]
impl Tool for SlowWriter {
fn name(&self) -> &str {
"slow_write"
}
fn description(&self) -> &str {
"Mutating tool that sleeps then writes content to a file"
}
fn input_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"file_path": { "type": "string" },
"content": { "type": "string" }
},
"required": ["file_path", "content"]
})
}
async fn execute(&self, input: Value, _ctx: &ToolContext) -> Result<ToolOutput, ToolError> {
let path = input["file_path"]
.as_str()
.ok_or_else(|| ToolError::InvalidInput("file_path required".into()))?
.to_string();
let content = input["content"]
.as_str()
.ok_or_else(|| ToolError::InvalidInput("content required".into()))?
.to_string();
tokio::time::sleep(Duration::from_millis(self.delay_ms)).await;
tokio::fs::write(&path, &content)
.await
.map_err(ToolError::Io)?;
Ok(ToolOutput::text(format!("wrote {path}")))
}
}
#[tokio::main]
async fn main() {
let dir = std::env::temp_dir().join(format!("tkach-parallel-writes-{}", std::process::id()));
tokio::fs::create_dir_all(&dir).await.expect("mkdir tmp");
let path_a = dir.join("a.txt");
let path_b = dir.join("b.txt");
let pa = path_a.to_string_lossy().to_string();
let pb = path_b.to_string_lossy().to_string();
let mock = Mock::new(move |req| {
let has_results = req.messages.iter().any(|m| {
m.content
.iter()
.any(|c| matches!(c, Content::ToolResult { .. }))
});
if has_results {
return Ok(Response {
content: vec![Content::text("both writes done")],
stop_reason: StopReason::EndTurn,
usage: Usage::default(),
});
}
Ok(Response {
content: vec![
Content::ToolUse {
id: "w1".into(),
name: "slow_write".into(),
input: json!({ "file_path": pa, "content": "alpha" }),
},
Content::ToolUse {
id: "w2".into(),
name: "slow_write".into(),
input: json!({ "file_path": pb, "content": "beta" }),
},
],
stop_reason: StopReason::ToolUse,
usage: Usage::default(),
})
});
let delay_ms: u64 = 200;
let agent = Agent::builder()
.provider(mock)
.model("mock")
.tool(SlowWriter { delay_ms })
.tool_concurrency("slow_write", ToolConcurrency::on())
.build()
.unwrap();
let started = Instant::now();
let result = agent
.run(
vec![Message::user_text("write two files")],
CancellationToken::new(),
)
.await
.expect("agent run");
let elapsed = started.elapsed();
let a_content = tokio::fs::read_to_string(&path_a).await.expect("read a");
let b_content = tokio::fs::read_to_string(&path_b).await.expect("read b");
assert_eq!(a_content, "alpha", "a.txt content mismatch");
assert_eq!(b_content, "beta", "b.txt content mismatch");
let parallel_threshold = Duration::from_millis(delay_ms + delay_ms / 2);
assert!(
elapsed < parallel_threshold,
"expected parallel wall-time (< {parallel_threshold:?}), got {elapsed:?} \
— looks like writes serialised through the width-1 pool"
);
println!("result: {}", result.text);
println!("a.txt: {a_content}");
println!("b.txt: {b_content}");
println!("wall time: {elapsed:?} (single-call delay: {delay_ms}ms)");
println!();
println!("Without tool_concurrency promotion, this batch would take ≈ 2× delay.");
println!("With ToolConcurrency::on(), both writes overlap in the concurrent-mutator pool.");
let _ = tokio::fs::remove_dir_all(&dir).await;
}