use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::StreamExt;
use tkach::message::Content;
use tkach::{
Agent, AgentError, AgentResult, CancellationToken, Message, StopReason, StreamEvent,
providers::Anthropic,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = dotenvy::dotenv_override();
let provider = Arc::new(Anthropic::from_env());
eprintln!("=== phase 1: tool failure mid-stream ===");
phase1_tool_failure(&provider).await?;
eprintln!();
eprintln!("=== phase 2: cancel during tool execution ===");
phase2_cancel_during_tool(&provider).await?;
eprintln!();
eprintln!("=== phase 3: multi-block assistant turn ===");
phase3_multi_block(&provider).await?;
eprintln!();
eprintln!("✓ all three resilience scenarios passed");
Ok(())
}
async fn phase1_tool_failure(provider: &Arc<Anthropic>) -> Result<(), Box<dyn std::error::Error>> {
let dir = std::env::temp_dir().join("tkach_streaming_resilience_phase1");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir)?;
let bogus = dir.join("does_not_exist_xyz_123.txt");
let agent = Agent::builder()
.provider_arc(provider.clone() as Arc<dyn tkach::LlmProvider>)
.model(tkach::model::claude::HAIKU_20251001)
.system(
"You are a concise assistant. Use tools when helpful. \
If a tool fails, explain the failure briefly to the user \
instead of retrying blindly.",
)
.tools(tkach::tools::defaults())
.max_turns(5)
.max_tokens(512)
.working_dir(&dir)
.build()
.unwrap();
let mut stream = agent.stream(
vec![Message::user_text(format!(
"Read the file at {} and tell me what it contains.",
bogus.display()
))],
CancellationToken::new(),
);
let mut tools_called = Vec::new();
while let Some(event) = stream.next().await {
match event? {
StreamEvent::ToolUse { name, .. } => tools_called.push(name),
StreamEvent::ContentDelta(_) => {}
_ => {}
}
}
let result = stream.into_result().await?;
eprintln!(
" tools called : {tools_called:?}\n \
turns : {} delta msgs\n \
stop reason : {:?}\n \
text len : {}",
result.new_messages.len(),
result.stop_reason,
result.text.len()
);
assert!(
tools_called.iter().any(|t| t == "read"),
"phase 1: agent should have called `read`, got: {tools_called:?}"
);
assert_eq!(
result.stop_reason,
StopReason::EndTurn,
"phase 1: should have ended cleanly after the tool failure"
);
assert!(
!result.text.is_empty(),
"phase 1: agent should have explained the failure"
);
let saw_error_tool_result = result.new_messages.iter().any(|m| {
m.content
.iter()
.any(|c| matches!(c, Content::ToolResult { is_error, .. } if *is_error))
});
assert!(
saw_error_tool_result,
"phase 1: history should contain a tool_result with is_error: true"
);
eprintln!(" ✓ tool failure surfaced and recovered cleanly");
Ok(())
}
async fn phase2_cancel_during_tool(
provider: &Arc<Anthropic>,
) -> Result<(), Box<dyn std::error::Error>> {
let dir = std::env::temp_dir().join("tkach_streaming_resilience_phase2");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir)?;
let agent = Agent::builder()
.provider_arc(provider.clone() as Arc<dyn tkach::LlmProvider>)
.model(tkach::model::claude::HAIKU_20251001)
.system("You are concise. Run shell commands as asked, exactly.")
.tools(tkach::tools::defaults())
.max_turns(2)
.max_tokens(256)
.working_dir(&dir)
.build()
.unwrap();
let cancel = CancellationToken::new();
let cancel_arm = cancel.clone();
let started = Instant::now();
let mut stream = agent.stream(
vec![Message::user_text(
"Use the bash tool to run exactly: `sleep 10` (no other arguments).",
)],
cancel,
);
let mut bash_seen = false;
while let Some(event) = stream.next().await {
match event? {
StreamEvent::ToolUse { name, .. } if name == "bash" => {
bash_seen = true;
let arm = cancel_arm.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
arm.cancel();
});
}
_ => {}
}
}
let outcome = stream.into_result().await;
let elapsed = started.elapsed();
eprintln!(
" bash event seen : {bash_seen}\n \
wall time : {elapsed:?}\n \
outcome : {}",
match &outcome {
Ok(_) => "Ok (unexpected!)".to_string(),
Err(AgentError::Cancelled { .. }) => "Cancelled".to_string(),
Err(other) => format!("Err: {other:?}"),
}
);
assert!(
bash_seen,
"phase 2: should have seen ToolUse{{name:'bash'}} on the stream"
);
let err = outcome.expect_err("phase 2: expected Cancelled, got Ok");
let AgentError::Cancelled { partial } = &err else {
panic!("phase 2: expected Cancelled, got {err:?}");
};
let _ = partial; assert!(
elapsed < Duration::from_secs(5),
"phase 2: cancel did not abort bash promptly — took {elapsed:?}"
);
eprintln!(" ✓ bash killed mid-sleep, agent returned Cancelled");
Ok(())
}
async fn phase3_multi_block(provider: &Arc<Anthropic>) -> Result<(), Box<dyn std::error::Error>> {
let dir = std::env::temp_dir().join("tkach_streaming_resilience_phase3");
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(&dir)?;
std::fs::write(dir.join("note.txt"), "The codeword is BANANA.")?;
let agent = Agent::builder()
.provider_arc(provider.clone() as Arc<dyn tkach::LlmProvider>)
.model(tkach::model::claude::SONNET)
.system(
"You are a thoughtful assistant. Before calling a tool, \
briefly state what you're about to do in one sentence \
of plain text, THEN call the tool in the same response.",
)
.tools(tkach::tools::defaults())
.max_turns(3)
.max_tokens(1024)
.working_dir(&dir)
.build()
.unwrap();
let mut stream = agent.stream(
vec![Message::user_text(
"There's a file called note.txt. Tell me out loud what \
you're about to do, then read it, then report what you \
found.",
)],
CancellationToken::new(),
);
while let Some(event) = stream.next().await {
let _ = event?;
}
let result: AgentResult = stream.into_result().await?;
let multi_block_assistant_turn = result.new_messages.iter().find(|m| {
let has_text = m.content.iter().any(|c| matches!(c, Content::Text { .. }));
let has_tool = m
.content
.iter()
.any(|c| matches!(c, Content::ToolUse { .. }));
has_text && has_tool
});
eprintln!(
" delta msgs : {}\n \
multi-block turn found : {}",
result.new_messages.len(),
multi_block_assistant_turn.is_some(),
);
match multi_block_assistant_turn {
Some(m) => {
let blocks: Vec<&'static str> = m
.content
.iter()
.map(|c| match c {
Content::Text { .. } => "Text",
Content::Thinking { .. } => "Thinking",
Content::ToolUse { .. } => "ToolUse",
Content::ToolResult { .. } => "ToolResult",
})
.collect();
eprintln!(" block sequence : {blocks:?}");
eprintln!(
" ✓ multi-block assistant turn correctly reconstructed \
(Text + ToolUse in same Message)"
);
}
None => {
eprintln!(
" ! model skipped pre-tool narration on this run; \
multi-block PATH not exercised this time, but \
streaming-with-tool still verified by content"
);
assert!(
result.text.contains("BANANA"),
"phase 3 fallback: agent should have read the codeword. got: {:?}",
result.text
);
}
}
Ok(())
}