use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use serde_json::Value;
use crate::error::{AgenticError, Result};
use crate::persistence::session::{SessionStore, TranscriptEntry, TranscriptEntryType};
use crate::provider::retry::compute_delay;
use crate::provider::types::{
CompletionResponse, ContentBlock, Message, ResponseStatus, StreamEvent, TokenUsage,
};
use crate::provider::{CompletionRequest, Provider, ProviderError};
use crate::tools::{ToolCall, ToolContext, ToolRegistry, ToolResult};
use crate::util::{format_current_date, now_millis};
use super::compact;
use super::event::{Event, EventKind};
use super::output::{AgentOutput, AgentStatistics, AgentStatus, OutputSchema};
use super::prompts::{self as prompts};
use super::queue::{CommandQueue, QueuePriority};
use super::spec::AgentSpec;
/// Shared, read-mostly context handed to every iteration of [`run_loop`].
///
/// It is held behind an `Arc` by `run_loop` and is also passed into tool
/// execution through `ToolContext::runtime`.
pub(crate) struct LoopRuntime {
/// Model backend; `prewarm` is called once per run and `complete_streaming` once per request attempt.
pub provider: Arc<dyn Provider>,
/// Sink for every lifecycle/stream event emitted by the loop (see `emit`).
pub event_handler: Arc<dyn Fn(Event) + Send + Sync>,
/// Cooperative cancellation flag, polled by the guards, retry back-off and idle wait.
pub cancel_signal: Arc<AtomicBool>,
/// Directory passed to `ToolContext::new` for tool execution.
pub working_directory: PathBuf,
/// Optional queue of out-of-band commands; non-`Later` items addressed to this agent are drained into the conversation.
pub command_queue: Option<Arc<CommandQueue>>,
/// Optional transcript sink; when `None`, `record_transcript` is a no-op.
pub session_store: Option<Arc<Mutex<SessionStore>>>,
/// Free-form metadata. Not read by the loop functions in this file; presumably consumed elsewhere — confirm.
pub metadata: Option<String>,
/// Names of tools discovered so far; passed to `ToolRegistry::definitions` when building each request.
pub discovered_tools: Arc<Mutex<HashSet<String>>>,
/// Registry used both to advertise tool definitions and to execute tool calls.
pub tools: Arc<ToolRegistry>,
/// Values substituted into the system prompt via `AgentSpec::system_prompt`.
pub template_variables: HashMap<String, Value>,
}
impl LoopRuntime {
    /// Renders the `<environment>` block that describes the host to the model.
    ///
    /// The block lists the working directory, `std::env::consts::OS`, the kernel
    /// release reported by `uname -r` (empty when the command cannot be spawned),
    /// and the current date as formatted by `format_current_date`.
    pub(crate) fn environment(working_directory: &Path) -> String {
        let kernel_release = match std::process::Command::new("uname").arg("-r").output() {
            Ok(out) => String::from_utf8_lossy(&out.stdout).trim().to_string(),
            Err(_) => String::new(),
        };
        let today = format_current_date();
        format!(
            "<environment>\nWorking directory: {}\nPlatform: {}\nOS version: {}\nDate: {}\n</environment>",
            working_directory.display(),
            std::env::consts::OS,
            kernel_release,
            today,
        )
    }
}
/// Mutable per-run state threaded through [`run_loop`].
///
/// `Default` yields an empty conversation with every counter at zero and
/// `is_idle == false`.
#[derive(Default)]
pub(crate) struct LoopState {
/// Conversation history; cloned into every `CompletionRequest`.
pub messages: Vec<Message>,
/// Sum of the `usage` reported by every provider response.
pub total_usage: TokenUsage,
/// Number of provider responses recorded (retried failures are not counted).
pub request_count: u64,
/// Number of tool calls dispatched, summed across batches.
pub tool_call_count: u64,
/// Current turn; incremented at the start of each loop iteration and compared against `max_turns`.
pub turn: u32,
/// Number of structured-output validation failures so far.
pub schema_retries: u32,
/// `true` while a keep-alive agent is waiting for a queued message.
pub is_idle: bool,
}
impl LoopState {
pub(crate) fn initial(context_prompt: Option<String>, instruction: String) -> Self {
let mut messages = Vec::new();
if let Some(cp) = context_prompt {
messages.push(Message::user(cp));
}
messages.push(Message::user(instruction));
Self {
messages,
..Self::default()
}
}
}
/// Drives one agent run until it completes, a guard trips, or an error occurs.
///
/// Each iteration is one "turn": check guards, call the provider (with retry),
/// record usage/transcript, compact if needed, then either execute tools,
/// continue a truncated answer, absorb queued messages, idle (keep-alive), or
/// validate and return the final answer.
///
/// The future is boxed and pinned, presumably so it can be awaited recursively
/// (e.g. by tools that spawn sub-agents through the `ToolContext`) — confirm.
///
/// # Errors
/// Propagates provider errors (unless the cancel flag is set, which yields a
/// `Cancelled` output instead), compaction errors, and
/// `AgenticError::SchemaRetryExhausted`. These error paths return without
/// emitting `AgentFinished`.
///
/// # Panics
/// Panics if `state.messages` is empty on entry (`last().unwrap()` below);
/// `LoopState::initial` always pushes the instruction, so states built with it are safe.
pub(crate) fn run_loop(
runtime: Arc<LoopRuntime>,
spec: Arc<AgentSpec>,
mut state: LoopState,
description: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<AgentOutput>> + Send>> {
Box::pin(async move {
runtime.provider.prewarm().await;
// Record the newest incoming message (the instruction for a fresh state).
record_transcript(
&runtime,
TranscriptEntryType::UserMessage,
state.messages.last().unwrap(),
None,
);
emit_agent_started(&runtime, &spec, description);
loop {
// Cancellation / turn limit / token budgets end the run with a non-Completed status.
if let Some(status) = check_guards(&runtime, &spec, &state) {
return Ok(finish_early(&runtime, &spec, &mut state, status));
}
state.turn += 1;
let turn = state.turn;
emit_turn_started(&runtime, &spec, turn);
emit_request_started(&runtime, &spec);
let response = match call_provider_with_retry(&runtime, &spec, &mut state, turn).await {
Ok(r) => r,
Err(e) => {
// A failure caused by cancellation (e.g. aborted back-off) is reported
// as a normal Cancelled finish rather than an error.
if runtime.cancel_signal.load(Ordering::Relaxed) {
return Ok(finish_early(
&runtime,
&spec,
&mut state,
AgentStatus::Cancelled,
));
}
return Err(e);
}
};
emit_request_finished(&runtime, &spec);
record_usage(&runtime, &spec, &mut state, &response);
let (text, tool_calls) = parse_response(&response);
state.messages.push(Message::Assistant {
content: response.content.clone(),
});
record_transcript(
&runtime,
TranscriptEntryType::AssistantMessage,
state.messages.last().unwrap(),
Some((&response.usage, &response.model)),
);
// Reactive compaction only when the model has a known context window.
// NOTE(review): the iteration does not `continue` after reactive compaction;
// if the response carried no tool calls it falls through to the completion
// path below with whatever `text` it had. Confirm this is intended.
if response.status == ResponseStatus::ContextWindowExceeded
&& spec.model().context_window_size.is_some()
{
compact::trigger_reactive(&runtime, &spec, &mut state, turn).await?;
}
compact::trigger_if_over_threshold(&runtime, &spec, &mut state).await?;
let tool_use_ready =
response.status == ResponseStatus::ToolUse && !tool_calls.is_empty();
// NOTE(review): an OutputTruncated response that *does* contain tool calls
// matches neither branch below and falls through to completion.
let response_truncated =
response.status == ResponseStatus::OutputTruncated && tool_calls.is_empty();
if tool_use_ready {
let results = execute_tools(&runtime, &spec, &mut state, &tool_calls).await;
// Tool results go back to the model as a user message.
state.messages.push(Message::User { content: results });
record_transcript(
&runtime,
TranscriptEntryType::ToolResult,
state.messages.last().unwrap(),
None,
);
drain_command_queue(&runtime, &spec, &mut state);
emit_turn_finished(&runtime, &spec, turn);
continue;
}
if response_truncated {
// Ask the model to continue where its output was cut off.
emit_output_truncated(&runtime, &spec, turn);
state
.messages
.push(Message::user(prompts::MAX_TOKENS_CONTINUATION));
emit_turn_finished(&runtime, &spec, turn);
continue;
}
// New queued input arrived while the model was answering: run another turn.
let drain_found_messages = drain_pending_messages(&runtime, &spec, &mut state);
if drain_found_messages {
emit_turn_finished(&runtime, &spec, turn);
continue;
}
// Keep-alive agents wait for new input instead of finishing.
// NOTE(review): `idle_until_message` returns false only when the cancel flag
// is set, so a cancelled keep-alive agent reaches the Completed path below.
let idle_found_message =
spec.keep_alive && idle_until_message(&runtime, &spec, &mut state).await;
if idle_found_message {
emit_turn_finished(&runtime, &spec, turn);
continue;
}
let output_validation = match &spec.output_schema {
None => Ok(None),
Some(schema) => schema.validate(&text).map(Some),
};
if let Err(detail) = output_validation.as_ref() {
// Failed validation: retry with a corrective prompt until the optional
// limit is exceeded (the error reports the configured limit).
state.schema_retries += 1;
let retry_limit_exceeded = spec
.max_schema_retries
.filter(|&limit| state.schema_retries > limit);
if let Some(limit) = retry_limit_exceeded {
return Err(AgenticError::SchemaRetryExhausted { retries: limit });
}
let retry_prompt = OutputSchema::retry_message(detail);
state.messages.push(Message::user(retry_prompt));
emit_turn_finished(&runtime, &spec, turn);
continue;
}
let validated = output_validation.expect("Err handled above");
// AgentFinished is emitted before the final TurnFinished.
let agent_end = EventKind::AgentFinished {
turns: state.turn,
status: AgentStatus::Completed,
};
emit(&runtime, &spec, agent_end);
emit_turn_finished(&runtime, &spec, turn);
return Ok(build_output(
&spec,
&state,
text,
validated,
AgentStatus::Completed,
));
}
})
}
/// Ends the run without a fresh model answer (a guard tripped or the run was cancelled).
///
/// Budget-exhaustion statuses first emit their dedicated budget event. Every
/// status then emits `AgentFinished`. The most recent assistant text becomes
/// the raw response, and no structured response is attached.
fn finish_early(
    runtime: &LoopRuntime,
    spec: &AgentSpec,
    state: &mut LoopState,
    status: AgentStatus,
) -> AgentOutput {
    let last_text = last_assistant_text(&state.messages);

    let budget_event = match &status {
        AgentStatus::InputBudgetExhausted { usage, limit } => {
            Some(EventKind::InputBudgetExhausted {
                usage: *usage,
                limit: *limit,
            })
        }
        AgentStatus::OutputBudgetExhausted { usage, limit } => {
            Some(EventKind::OutputBudgetExhausted {
                usage: *usage,
                limit: *limit,
            })
        }
        _ => None,
    };
    if let Some(kind) = budget_event {
        emit(runtime, spec, kind);
    }

    let finished = EventKind::AgentFinished {
        turns: state.turn,
        status: status.clone(),
    };
    emit(runtime, spec, finished);

    build_output(spec, state, last_text, None, status)
}
/// Returns the status that must end the run before the next turn, if any.
///
/// Checks run in priority order: cancellation, then the turn limit, the input
/// token budget, and the output token budget. Limits are inclusive: reaching a
/// limit exactly trips the guard.
fn check_guards(runtime: &LoopRuntime, spec: &AgentSpec, state: &LoopState) -> Option<AgentStatus> {
    if runtime.cancel_signal.load(Ordering::Relaxed) {
        return Some(AgentStatus::Cancelled);
    }
    let totals = &state.total_usage;
    spec.max_turns
        .filter(|&limit| state.turn >= limit)
        .map(|limit| AgentStatus::TurnLimitReached { limit })
        .or_else(|| {
            spec.max_input_tokens
                .filter(|&limit| totals.input_tokens >= limit)
                .map(|limit| AgentStatus::InputBudgetExhausted {
                    usage: totals.input_tokens,
                    limit,
                })
        })
        .or_else(|| {
            spec.max_output_tokens
                .filter(|&limit| totals.output_tokens >= limit)
                .map(|limit| AgentStatus::OutputBudgetExhausted {
                    usage: totals.output_tokens,
                    limit,
                })
        })
}
async fn cancellable_sleep(duration: std::time::Duration, cancel: &Arc<AtomicBool>) -> bool {
let deadline = tokio::time::Instant::now() + duration;
loop {
if cancel.load(Ordering::Relaxed) {
return false;
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return true;
}
let tick = std::cmp::min(remaining, std::time::Duration::from_millis(100));
tokio::time::sleep(tick).await;
}
}
/// Sends one streaming completion request built from the current conversation.
///
/// The request carries the spec's model, the interpolated system prompt, a
/// clone of the message history and the currently advertised tool definitions.
/// Every streamed text delta is forwarded to the event handler as a
/// `TextChunkReceived` event attributed to this agent.
async fn call_provider(
    runtime: &LoopRuntime,
    spec: &AgentSpec,
    state: &LoopState,
) -> Result<CompletionResponse> {
    // The guard lives only inside this block, so it is released before any `.await`.
    let tools = {
        let discovered = runtime.discovered_tools.lock().unwrap();
        runtime.tools.definitions(&discovered)
    };
    let request = CompletionRequest {
        model: spec.model().name.clone(),
        system_prompt: spec.system_prompt(&runtime.template_variables),
        messages: state.messages.clone(),
        tools,
        max_request_tokens: spec.max_request_tokens,
        tool_choice: None,
    };

    let handler = Arc::clone(&runtime.event_handler);
    let source = spec.name.clone();
    let on_event = Arc::new(move |event: StreamEvent| {
        let StreamEvent::TextDelta { text, .. } = &event else {
            return;
        };
        handler(Event::new(
            source.clone(),
            EventKind::TextChunkReceived {
                content: text.clone(),
            },
        ));
    });

    let outcome = runtime.provider.complete_streaming(request, on_event).await;
    outcome.map_err(AgenticError::from)
}
async fn call_provider_with_retry(
runtime: &LoopRuntime,
spec: &AgentSpec,
state: &mut LoopState,
turn: u32,
) -> Result<CompletionResponse> {
let mut last_err = None;
for attempt in 0..=spec.max_request_retries {
match call_provider(runtime, spec, state).await {
Ok(response) => return Ok(response),
Err(AgenticError::Provider(ProviderError::ContextWindowExceeded {
provider_message,
})) if spec.model().context_window_size.is_some() => {
compact::trigger_reactive(runtime, spec, state, turn).await?;
return Err(AgenticError::Provider(
ProviderError::ContextWindowExceeded { provider_message },
));
}
Err(e) if e.is_retryable() && attempt < spec.max_request_retries => {
let delay_ms = compute_delay(spec.request_retry_delay, attempt, e.retry_after_ms());
if !cancellable_sleep(
std::time::Duration::from_millis(delay_ms),
&runtime.cancel_signal,
)
.await
{
return Err(AgenticError::Aborted);
}
emit_request_retried(
runtime,
spec,
attempt + 1,
spec.max_request_retries,
format!("{e}"),
);
last_err = Some(e);
}
Err(e) => {
emit_request_error(runtime, spec, format!("{e}"));
return Err(e);
}
}
}
let e = last_err.unwrap_or_else(|| AgenticError::Other("retry loop ended unexpectedly".into()));
emit_request_error(runtime, spec, format!("{e}"));
Err(e)
}
/// Adds one response's token usage to the run totals and reports it.
///
/// Increments `request_count` and emits a `TokensReported` event carrying the
/// responding model's name and its usage.
fn record_usage(
    runtime: &LoopRuntime,
    spec: &AgentSpec,
    state: &mut LoopState,
    response: &CompletionResponse,
) {
    let usage = &response.usage;
    state.total_usage += usage;
    state.request_count += 1;
    let report = EventKind::TokensReported {
        model: response.model.clone(),
        usage: usage.clone(),
    };
    emit(runtime, spec, report);
}
/// Splits a response into its concatenated text and its tool calls.
///
/// Text blocks are joined in order with no separator, and tool-use blocks
/// become `ToolCall`s in order. All other block kinds are ignored.
fn parse_response(response: &CompletionResponse) -> (String, Vec<ToolCall>) {
    let blocks = &response.content;
    let joined: String = blocks
        .iter()
        .filter_map(|block| match block {
            ContentBlock::Text { text } => Some(text.as_str()),
            _ => None,
        })
        .collect();
    let calls: Vec<ToolCall> = blocks
        .iter()
        .filter_map(|block| match block {
            ContentBlock::ToolUse { id, name, input } => Some(ToolCall {
                id: id.clone(),
                name: name.clone(),
                input: input.clone(),
            }),
            _ => None,
        })
        .collect();
    (joined, calls)
}
/// Executes a batch of tool calls and returns the result blocks in registry order.
///
/// It emits `ToolCallStarted` for every call before execution. After execution,
/// each tool-result block emits either `ToolCallFinished` or `ToolCallError`.
/// The tool name is looked up by call id and is empty if no call matches.
async fn execute_tools(
    runtime: &Arc<LoopRuntime>,
    spec: &Arc<AgentSpec>,
    state: &mut LoopState,
    calls: &[ToolCall],
) -> Vec<ContentBlock> {
    state.tool_call_count += calls.len() as u64;
    calls.iter().for_each(|call| {
        let started = EventKind::ToolCallStarted {
            tool_name: call.name.clone(),
            call_id: call.id.clone(),
            input: call.input.clone(),
        };
        emit(runtime, spec, started);
    });

    let ctx = ToolContext::new(runtime.working_directory.clone())
        .registry(Arc::clone(&runtime.tools))
        .runtime(Arc::clone(runtime))
        .caller_spec(Arc::clone(spec));
    let executed = runtime.tools.execute(calls, &ctx).await;

    executed
        .into_iter()
        .map(|(block, result)| {
            if let ContentBlock::ToolResult { tool_use_id, .. } = &block {
                let tool_name = calls
                    .iter()
                    .find(|c| c.id == *tool_use_id)
                    .map(|c| c.name.clone())
                    .unwrap_or_default();
                let call_id = tool_use_id.clone();
                let outcome = match result {
                    ToolResult::Success(output) => EventKind::ToolCallFinished {
                        tool_name,
                        call_id,
                        output,
                    },
                    ToolResult::Failure(error) => EventKind::ToolCallError {
                        tool_name,
                        call_id,
                        error,
                    },
                };
                emit(runtime, spec, outcome);
            }
            block
        })
        .collect()
}
/// Moves every queued command for this agent whose priority is not `Later`
/// into the conversation as user messages. `Later` commands stay queued.
/// Does nothing when the runtime has no command queue.
fn drain_command_queue(runtime: &LoopRuntime, spec: &AgentSpec, state: &mut LoopState) {
    if let Some(queue) = &runtime.command_queue {
        let urgent = std::iter::from_fn(|| {
            queue.dequeue_if(Some(&spec.name), |c| c.priority != QueuePriority::Later)
        });
        state
            .messages
            .extend(urgent.map(|cmd| Message::user(cmd.as_user_message())));
    }
}
/// Drains urgent queued commands into the conversation and reports whether any arrived.
fn drain_pending_messages(runtime: &LoopRuntime, spec: &AgentSpec, state: &mut LoopState) -> bool {
    let count_before = state.messages.len();
    drain_command_queue(runtime, spec, state);
    count_before < state.messages.len()
}
/// Parks a keep-alive agent until new input arrives or the run is cancelled.
///
/// While waiting, `is_idle` is set and the wait is bracketed by `AgentPaused` /
/// `AgentResumed` events. Returns `true` if a message was received.
async fn idle_until_message(
    runtime: &LoopRuntime,
    spec: &AgentSpec,
    state: &mut LoopState,
) -> bool {
    state.is_idle = true;
    emit_agent_paused(runtime, spec);

    let got_message = wait_for_message(runtime, spec, state).await;

    state.is_idle = false;
    emit_agent_resumed(runtime, spec);
    got_message
}
async fn wait_for_message(runtime: &LoopRuntime, spec: &AgentSpec, state: &mut LoopState) -> bool {
const POLL_INTERVAL: Duration = Duration::from_millis(100);
loop {
if runtime.cancel_signal.load(Ordering::Relaxed) {
return false;
}
let before = state.messages.len();
drain_command_queue(runtime, spec, state);
if state.messages.len() > before {
return true;
}
tokio::time::sleep(POLL_INTERVAL).await;
}
}
/// Wraps `kind` in an [`Event`] attributed to this agent and passes it to the runtime's handler.
fn emit(runtime: &LoopRuntime, spec: &AgentSpec, kind: EventKind) {
    let event = Event::new(spec.name.clone(), kind);
    (runtime.event_handler)(event);
}
/// Announces the start of a run, with the caller-supplied description if any.
fn emit_agent_started(runtime: &LoopRuntime, spec: &AgentSpec, description: Option<String>) {
    let started = EventKind::AgentStarted { description };
    emit(runtime, spec, started);
}
/// Marks the beginning of turn number `turn`.
fn emit_turn_started(runtime: &LoopRuntime, spec: &AgentSpec, turn: u32) {
    let kind = EventKind::TurnStarted { turn };
    emit(runtime, spec, kind);
}
/// Marks the end of turn number `turn`.
fn emit_turn_finished(runtime: &LoopRuntime, spec: &AgentSpec, turn: u32) {
    let kind = EventKind::TurnFinished { turn };
    emit(runtime, spec, kind);
}
/// Signals that a provider request for the spec's model is about to be sent.
fn emit_request_started(runtime: &LoopRuntime, spec: &AgentSpec) {
    let model = spec.model().name.clone();
    emit(runtime, spec, EventKind::RequestStarted { model });
}
/// Signals that the provider request for the spec's model returned successfully.
fn emit_request_finished(runtime: &LoopRuntime, spec: &AgentSpec) {
    let model = spec.model().name.clone();
    emit(runtime, spec, EventKind::RequestFinished { model });
}
/// Reports that a failed request is being retried (1-based `attempt` out of `max_attempts`).
fn emit_request_retried(
    runtime: &LoopRuntime,
    spec: &AgentSpec,
    attempt: u32,
    max_attempts: u32,
    error: String,
) {
    let retried = EventKind::RequestRetried {
        attempt,
        max_attempts,
        error,
    };
    emit(runtime, spec, retried);
}
fn emit_request_error(runtime: &LoopRuntime, spec: &AgentSpec, error: String) {
emit(runtime, spec, EventKind::RequestError { error });
}
/// Reports that the model's output in turn `turn` was cut off.
fn emit_output_truncated(runtime: &LoopRuntime, spec: &AgentSpec, turn: u32) {
    let truncated = EventKind::OutputTruncated { turn };
    emit(runtime, spec, truncated);
}
/// Reports that a keep-alive agent has started idling.
fn emit_agent_paused(runtime: &LoopRuntime, spec: &AgentSpec) {
    let paused = EventKind::AgentPaused;
    emit(runtime, spec, paused);
}
/// Reports that a keep-alive agent has stopped idling.
fn emit_agent_resumed(runtime: &LoopRuntime, spec: &AgentSpec) {
    let resumed = EventKind::AgentResumed;
    emit(runtime, spec, resumed);
}
fn record_transcript(
runtime: &LoopRuntime,
entry_type: TranscriptEntryType,
message: &Message,
usage_and_model: Option<(&TokenUsage, &str)>,
) {
let Some(ref store) = runtime.session_store else {
return;
};
store
.lock()
.unwrap()
.record(TranscriptEntry {
recorded_at: now_millis(),
entry_type,
message: message.clone(),
usage: usage_and_model.map(|(u, _)| u.clone()),
model: usage_and_model.map(|(_, m)| m.to_string()),
})
.ok();
}
/// Assembles the final [`AgentOutput`] from the run state.
///
/// `text` becomes the raw response. `response` is the validated structured
/// output, if any. The statistics are copied from the accumulated counters in `state`.
fn build_output(
    spec: &AgentSpec,
    state: &LoopState,
    text: String,
    response: Option<Value>,
    status: AgentStatus,
) -> AgentOutput {
    let totals = &state.total_usage;
    let statistics = AgentStatistics {
        input_tokens: totals.input_tokens,
        output_tokens: totals.output_tokens,
        requests: state.request_count,
        tool_calls: state.tool_call_count,
        turns: state.turn,
    };
    AgentOutput {
        name: spec.name.clone(),
        response,
        response_raw: text,
        statistics,
        status,
    }
}
/// Returns the concatenated text blocks of the most recent assistant message.
///
/// Only the latest assistant message is considered, even if it has no text
/// blocks. Returns an empty string when there is no assistant message at all.
fn last_assistant_text(messages: &[Message]) -> String {
    for message in messages.iter().rev() {
        if let Message::Assistant { content } = message {
            return content
                .iter()
                .filter_map(|block| match block {
                    ContentBlock::Text { text } => Some(text.as_str()),
                    _ => None,
                })
                .collect();
        }
    }
    String::new()
}
#[cfg(test)]
mod tests {
use super::super::agent::Agent;
use super::*;
use crate::agent::queue::{CommandSource, QueuedCommand};
use crate::error::AgenticError;
use crate::provider::types::ContentBlock;
use crate::testutil::*;
fn simple_agent() -> Agent {
Agent::new()
.name("test-agent")
.model_name("mock-model")
.identity_prompt("You are a test assistant.")
}
fn assert_lifecycle_events(harness: &TestHarness, output: &AgentOutput) {
let events = harness.events().all();
let agent_end_status = events.iter().find_map(|e| match &e.kind {
EventKind::AgentFinished { status, .. } => Some(status.clone()),
_ => None,
});
assert_eq!(
agent_end_status.as_ref(),
Some(&output.status),
"AgentFinished status must match output.status"
);
let last_significant = events
.iter()
.rev()
.find(|e| !matches!(e.kind, EventKind::TurnFinished { .. }));
assert!(
matches!(
last_significant.map(|e| &e.kind),
Some(EventKind::AgentFinished { .. })
),
"AgentFinished must be the last significant event"
);
for (i, event) in events.iter().enumerate() {
if matches!(event.kind, EventKind::OutputTruncated { .. }) {
let after_agent_end = events[..i]
.iter()
.any(|e| matches!(e.kind, EventKind::AgentFinished { .. }));
assert!(
!after_agent_end,
"OutputTruncated at {i} after AgentFinished"
);
}
}
}
#[tokio::test]
async fn simple_text_response() {
let harness = TestHarness::new(MockProvider::text("Hello, world!"));
let output = harness.run_agent(&simple_agent(), "Hi").await.unwrap();
assert_eq!(output.response_raw, "Hello, world!");
assert!(output.response.is_none());
assert_eq!(harness.provider().request_count(), 1);
}
#[tokio::test]
async fn failing_tool_emits_tool_call_error() {
let provider = MockProvider::tool_then_text("boom", serde_json::json!({}), "acknowledged");
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.tool(MockTool::error("boom", false, "disk full"));
let harness = TestHarness::new(provider);
harness.run_agent(&agent, "go").await.unwrap();
let events = harness.events().all();
let saw_error = events.iter().any(|e| {
matches!(
&e.kind,
EventKind::ToolCallError { tool_name, error, .. }
if tool_name == "boom" && error == "disk full"
)
});
let saw_end = events
.iter()
.any(|e| matches!(e.kind, EventKind::ToolCallFinished { .. }));
assert!(saw_error, "a failing tool must emit ToolCallError");
assert!(!saw_end, "a failing tool must not also emit ToolCallEnd");
}
#[tokio::test]
async fn simple_tool_execution() {
let provider =
MockProvider::tool_then_text("echo_tool", serde_json::json!({"text": "ping"}), "Done!");
let agent = Agent::new()
.name("test-agent")
.model_name("mock-model")
.identity_prompt("You are helpful.")
.tool(MockTool::new("echo_tool", false, "pong"));
let harness = TestHarness::new(provider);
let output = harness.run_agent(&agent, "Echo test").await.unwrap();
assert_eq!(output.response_raw, "Done!");
assert_eq!(harness.provider().request_count(), 2);
}
#[tokio::test]
async fn guard_max_turns() {
let provider = MockProvider::new(vec![
tool_response("t", "c1", serde_json::json!({})),
tool_response("t", "c2", serde_json::json!({})),
tool_response("t", "c3", serde_json::json!({})),
]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.max_turns(2)
.tool(MockTool::new("t", false, "ok"));
let harness = TestHarness::new(provider);
let output = harness.run_agent(&agent, "go").await.unwrap();
assert_eq!(output.status, AgentStatus::TurnLimitReached { limit: 2 });
assert_eq!(output.statistics.turns, 2);
assert_lifecycle_events(&harness, &output);
}
#[tokio::test]
async fn guard_cancellation() {
let provider = MockProvider::new(vec![
tool_response("t", "c1", serde_json::json!({})),
text_response("done"),
]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.tool(MockTool::new("t", false, "ok"));
let harness = TestHarness::new(provider);
harness.cancel();
let output = harness.run_agent(&agent, "go").await.unwrap();
assert_eq!(output.status, AgentStatus::Cancelled);
assert_lifecycle_events(&harness, &output);
}
#[tokio::test]
async fn template_variable_interpolates_in_system_prompt() {
let provider = MockProvider::text("Answer about rust");
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("You are an expert on {topic}.");
let harness = TestHarness::new(provider).with_state("topic", serde_json::json!("rust"));
harness.run_agent(&agent, "Tell me").await.unwrap();
let prompts = harness.provider().system_prompts();
assert!(prompts[0].contains("expert on rust"));
}
#[tokio::test]
async fn events_emitted() {
let provider = MockProvider::tool_then_text("read", serde_json::json!({}), "Done");
let agent = Agent::new()
.name("assistant")
.model_name("mock")
.identity_prompt("")
.tool(MockTool::new("read", true, "file contents"));
let harness = TestHarness::new(provider);
harness.run_agent(&agent, "read it").await.unwrap();
let events = harness.events();
assert_eq!(events.agent_starts(), vec!["assistant"]);
assert!(!events.tool_starts().is_empty());
assert!(events.texts().contains(&"Done".to_string()));
assert_eq!(events.agent_ends().len(), 1);
}
#[tokio::test]
async fn command_queue_drains_next_priority() {
use std::sync::Arc;
let provider = MockProvider::new(vec![
tool_response("t", "c1", serde_json::json!({})),
text_response("final"),
]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.tool(MockTool::new("t", false, "ok"));
let queue = Arc::new(CommandQueue::new());
queue.enqueue(QueuedCommand {
content: "extra instruction".into(),
priority: QueuePriority::Next,
source: CommandSource::UserInput,
agent_name: Some("test".into()),
});
let harness = TestHarness::with_provider_and_queue(Arc::new(provider), queue);
let output = harness.run_agent(&agent, "start").await.unwrap();
assert_eq!(output.response_raw, "final");
let requests = harness.provider().requests.lock().unwrap();
let has_extra = requests[1].messages.iter().any(|m| match m {
Message::User { content } => content.iter().any(|b| match b {
ContentBlock::Text { text } => text.contains("extra instruction"),
_ => false,
}),
_ => false,
});
assert!(has_extra);
}
#[tokio::test]
async fn command_queue_requeues_later_priority() {
use std::sync::Arc;
let provider = MockProvider::new(vec![
tool_response("t", "c1", serde_json::json!({})),
text_response("final"),
]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.tool(MockTool::new("t", false, "ok"));
let queue = Arc::new(CommandQueue::new());
queue.enqueue(QueuedCommand {
content: "later task".into(),
priority: QueuePriority::Later,
source: CommandSource::TaskNotification {
task_id: "42".into(),
},
agent_name: Some("test".into()),
});
let harness = TestHarness::with_provider_and_queue(Arc::new(provider), queue.clone());
harness.run_agent(&agent, "start").await.unwrap();
let cmd = queue.dequeue_if(Some("test"), |_| true);
assert!(cmd.is_some());
assert_eq!(cmd.unwrap().content, "later task");
}
#[tokio::test]
async fn deferred_tool_filtering() {
let provider = MockProvider::text("ok");
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.tool(MockTool::new("always", true, "ok"))
.tool(DeferredMockTool::new("deferred"));
let harness = TestHarness::new(provider);
harness.run_agent(&agent, "go").await.unwrap();
let req = harness.provider().last_request().unwrap();
let deferred_def = req.tools.iter().find(|t| t.name == "deferred").unwrap();
assert!(deferred_def.description.is_empty());
}
#[tokio::test]
async fn structured_output_extracted() {
let schema_input = serde_json::json!({"category": "billing", "priority": "high"});
let provider = MockProvider::new(vec![text_response(&schema_input.to_string())]);
let agent = Agent::new()
.name("classifier")
.model_name("mock")
.identity_prompt("Classify.")
.output_schema(serde_json::json!({
"type": "object",
"properties": { "category": {"type": "string"}, "priority": {"type": "string"} },
"required": ["category", "priority"]
}));
let harness = TestHarness::new(provider);
let output = harness.run_agent(&agent, "ticket").await.unwrap();
let so = output.response.unwrap();
assert_eq!(so["category"], "billing");
assert_eq!(so["priority"], "high");
}
#[tokio::test]
async fn structured_output_retry_exhausted() {
let provider = MockProvider::new(vec![
text_response("nope"),
text_response("still nope"),
text_response("nope again"),
text_response("last nope"),
]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.output_schema(serde_json::json!({
"type": "object",
"properties": {"x": {"type": "string"}},
"required": ["x"]
}))
.max_schema_retries(3);
let harness = TestHarness::new(provider);
let err = harness.run_agent(&agent, "go").await.unwrap_err();
assert!(matches!(
err,
AgenticError::SchemaRetryExhausted { retries: 3 }
));
}
#[tokio::test]
async fn sub_agents_auto_wire_spawn_tool() {
let sub = Agent::new()
.name("helper")
.model_name("mock")
.identity_prompt("I help.");
let provider = MockProvider::text("ok");
let agent = Agent::new()
.name("parent")
.model_name("mock")
.identity_prompt("I coordinate.")
.sub_agents([sub]);
let harness = TestHarness::new(provider);
harness.run_agent(&agent, "go").await.unwrap();
let req = harness.provider().last_request().unwrap();
assert!(
req.tools.iter().any(|t| t.name == "spawn_agent"),
".sub_agents() should register spawn_agent automatically"
);
}
#[tokio::test]
async fn missing_provider_fails_run() {
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("x")
.instruction_prompt("do");
let err = agent.run().await.unwrap_err();
match err {
AgenticError::Other(msg) => assert!(msg.contains("provider"), "got: {msg}"),
other => panic!("expected Other, got {other:?}"),
}
}
#[allow(dead_code)]
fn runtime_with_metadata(meta: &str) -> LoopRuntime {
LoopRuntime {
provider: Arc::new(MockProvider::text("ok")),
event_handler: Arc::new(|_| {}),
cancel_signal: Arc::new(AtomicBool::new(false)),
working_directory: PathBuf::from("/tmp"),
command_queue: None,
session_store: None,
metadata: Some(meta.to_string()),
discovered_tools: Arc::new(Mutex::new(HashSet::new())),
tools: Arc::new(ToolRegistry::new()),
template_variables: HashMap::new(),
}
}
#[tokio::test]
async fn simple_text_response_status_completed() {
let harness = TestHarness::new(MockProvider::text("Hello!"));
let output = harness.run_agent(&simple_agent(), "Hi").await.unwrap();
assert_eq!(output.status, AgentStatus::Completed);
assert_lifecycle_events(&harness, &output);
}
#[tokio::test]
async fn max_tokens_auto_continuation() {
let provider = MockProvider::new(vec![
truncated_response("partial response..."),
text_response("...completed response"),
]);
let harness = TestHarness::new(provider);
let output = harness
.run_agent(&simple_agent(), "write a long essay")
.await
.unwrap();
assert_eq!(output.status, AgentStatus::Completed);
assert_eq!(output.response_raw, "...completed response");
assert_eq!(harness.provider().request_count(), 2);
assert_lifecycle_events(&harness, &output);
let req = &harness.provider().requests.lock().unwrap()[1];
let has_continuation = req.messages.iter().any(|m| match m {
Message::User { content } => content.iter().any(|b| match b {
ContentBlock::Text { text } => text.contains("cut off"),
_ => false,
}),
_ => false,
});
assert!(has_continuation);
}
#[tokio::test]
async fn max_tokens_continuation_events() {
let provider =
MockProvider::new(vec![truncated_response("partial"), text_response("done")]);
let harness = TestHarness::new(provider);
let output = harness.run_agent(&simple_agent(), "go").await.unwrap();
assert_lifecycle_events(&harness, &output);
let truncated: Vec<u32> = harness
.events()
.all()
.iter()
.filter_map(|e| match &e.kind {
EventKind::OutputTruncated { turn } => Some(*turn),
_ => None,
})
.collect();
assert_eq!(truncated, vec![1]);
}
#[tokio::test]
async fn token_budget_guard() {
let mut response = tool_response("t", "c1", serde_json::json!({}));
response.usage = TokenUsage {
input_tokens: 5000,
output_tokens: 100,
..Default::default()
};
let provider = MockProvider::new(vec![response, text_response("done")]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.max_input_tokens(4000)
.tool(MockTool::new("t", false, "ok"));
let harness = TestHarness::new(provider);
let output = harness.run_agent(&agent, "go").await.unwrap();
assert_eq!(
output.status,
AgentStatus::InputBudgetExhausted {
usage: 5000,
limit: 4000
}
);
assert_lifecycle_events(&harness, &output);
let events = harness.events().all();
let budget_index = events
.iter()
.position(|e| matches!(e.kind, EventKind::InputBudgetExhausted { .. }))
.expect("InputBudgetExhausted must fire before AgentFinished");
let finished_index = events
.iter()
.position(|e| matches!(e.kind, EventKind::AgentFinished { .. }))
.expect("AgentFinished must fire");
assert!(budget_index < finished_index);
assert!(matches!(
events[budget_index].kind,
EventKind::InputBudgetExhausted {
usage: 5000,
limit: 4000,
}
));
}
#[tokio::test]
async fn max_request_tokens_propagates_into_request() {
let provider = MockProvider::text("done");
let agent = simple_agent().max_request_tokens(512);
let harness = TestHarness::new(provider);
harness.run_agent(&agent, "go").await.unwrap();
let req = harness.provider().last_request().unwrap();
assert_eq!(req.max_request_tokens, Some(512));
}
#[tokio::test]
async fn input_budget_guard_fires_at_exact_limit() {
let mut first = tool_response("t", "c1", serde_json::json!({}));
first.usage = TokenUsage {
input_tokens: 4000,
output_tokens: 100,
..Default::default()
};
let provider = MockProvider::new(vec![first, text_response("unused")]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.max_input_tokens(4000)
.tool(MockTool::new("t", false, "ok"));
let harness = TestHarness::new(provider);
let output = harness.run_agent(&agent, "go").await.unwrap();
assert_eq!(
output.status,
AgentStatus::InputBudgetExhausted {
usage: 4000,
limit: 4000
}
);
}
#[tokio::test]
async fn output_budget_guard_fires_at_exact_limit() {
let mut first = tool_response("t", "c1", serde_json::json!({}));
first.usage = TokenUsage {
input_tokens: 100,
output_tokens: 4000,
..Default::default()
};
let provider = MockProvider::new(vec![first, text_response("unused")]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.max_output_tokens(4000)
.tool(MockTool::new("t", false, "ok"));
let harness = TestHarness::new(provider);
let output = harness.run_agent(&agent, "go").await.unwrap();
assert_eq!(
output.status,
AgentStatus::OutputBudgetExhausted {
usage: 4000,
limit: 4000
}
);
}
#[tokio::test]
async fn input_budget_event_not_emitted_when_budget_unset() {
let mut response = text_response("done");
response.usage = TokenUsage {
input_tokens: 9_999_999,
output_tokens: 9_999_999,
..Default::default()
};
let provider = MockProvider::new(vec![response]);
let harness = TestHarness::new(provider);
harness.run_agent(&simple_agent(), "go").await.unwrap();
let saw_budget = harness.events().all().iter().any(|e| {
matches!(
e.kind,
EventKind::InputBudgetExhausted { .. } | EventKind::OutputBudgetExhausted { .. }
)
});
assert!(
!saw_budget,
"budget events must not fire when no budget is configured"
);
}
#[tokio::test]
async fn output_budget_trips_before_input_budget() {
let mut response = tool_response("t", "c1", serde_json::json!({}));
response.usage = TokenUsage {
input_tokens: 100,
output_tokens: 5000,
..Default::default()
};
let provider = MockProvider::new(vec![response, text_response("unused")]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.max_input_tokens(10_000)
.max_output_tokens(4000)
.tool(MockTool::new("t", false, "ok"));
let harness = TestHarness::new(provider);
let output = harness.run_agent(&agent, "go").await.unwrap();
assert_eq!(
output.status,
AgentStatus::OutputBudgetExhausted {
usage: 5000,
limit: 4000
}
);
}
#[tokio::test]
async fn output_token_budget_guard() {
let mut response = tool_response("t", "c1", serde_json::json!({}));
response.usage = TokenUsage {
input_tokens: 100,
output_tokens: 5000,
..Default::default()
};
let provider = MockProvider::new(vec![response, text_response("done")]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.max_output_tokens(4000)
.tool(MockTool::new("t", false, "ok"));
let harness = TestHarness::new(provider);
let output = harness.run_agent(&agent, "go").await.unwrap();
assert_eq!(
output.status,
AgentStatus::OutputBudgetExhausted {
usage: 5000,
limit: 4000
}
);
assert_lifecycle_events(&harness, &output);
let events = harness.events().all();
let budget_index = events
.iter()
.position(|e| matches!(e.kind, EventKind::OutputBudgetExhausted { .. }))
.expect("OutputBudgetExhausted must fire before AgentFinished");
let finished_index = events
.iter()
.position(|e| matches!(e.kind, EventKind::AgentFinished { .. }))
.expect("AgentFinished must fire");
assert!(budget_index < finished_index);
assert!(matches!(
events[budget_index].kind,
EventKind::OutputBudgetExhausted {
usage: 5000,
limit: 4000,
}
));
}
use crate::agent::queue::CommandQueue;
/// Agent name the listener tests pass as the explicit `target` of queued commands.
/// NOTE(review): presumably matches the name `simple_agent()` assigns, since the
/// "targeted at me" wake tests expect those messages to reach that agent — confirm.
const AGENT_NAME: &str = "test-agent";
/// Builds a `QueuePriority::Next` peer message sent by `from` with no summary.
/// `target` becomes `agent_name`; `None` leaves the command untargeted.
fn peer_msg(target: Option<&str>, from: &str, content: &str) -> QueuedCommand {
    let source = CommandSource::PeerMessage {
        from: from.into(),
        summary: None,
    };
    QueuedCommand {
        agent_name: target.map(Into::into),
        source,
        priority: QueuePriority::Next,
        content: content.into(),
    }
}
/// Builds a `QueuePriority::Next` task notification for the fixed task id `"task-1"`.
/// `target` becomes `agent_name`; `None` leaves the command untargeted.
fn task_notification(target: Option<&str>, content: &str) -> QueuedCommand {
    let source = CommandSource::TaskNotification {
        task_id: "task-1".into(),
    };
    QueuedCommand {
        agent_name: target.map(Into::into),
        source,
        priority: QueuePriority::Next,
        content: content.into(),
    }
}
/// Builds a `QueuePriority::Next` user-input command.
/// `target` becomes `agent_name`; `None` leaves the command untargeted.
fn user_input(target: Option<&str>, content: &str) -> QueuedCommand {
    let agent_name = target.map(Into::into);
    QueuedCommand {
        content: content.into(),
        source: CommandSource::UserInput,
        priority: QueuePriority::Next,
        agent_name,
    }
}
/// Creates a harness wired to a fresh `CommandQueue` and returns it together with a handle
/// to that same queue, so a test can enqueue commands for the running agent.
fn listener_harness(provider: Arc<MockProvider>) -> (TestHarness, Arc<CommandQueue>) {
    let shared_queue = Arc::new(CommandQueue::new());
    (
        TestHarness::with_provider_and_queue(provider, Arc::clone(&shared_queue)),
        shared_queue,
    )
}
/// Spawns a detached tokio task that enqueues `cmd` on `queue` after `delay_ms` milliseconds.
fn enqueue_after(queue: &Arc<CommandQueue>, delay_ms: u64, cmd: QueuedCommand) {
    let queue = Arc::clone(queue);
    let delay = std::time::Duration::from_millis(delay_ms);
    tokio::spawn(async move {
        tokio::time::sleep(delay).await;
        queue.enqueue(cmd);
    });
}
/// Spawns a detached tokio task that sets `cancel` to `true` after `delay_ms` milliseconds.
fn cancel_after(cancel: Arc<AtomicBool>, delay_ms: u64) {
    let delay = std::time::Duration::from_millis(delay_ms);
    tokio::spawn(async move {
        tokio::time::sleep(delay).await;
        cancel.store(true, Ordering::Relaxed);
    });
}
/// A mock provider scripted with two plain-text replies: `"first"`, then `"second"`.
fn two_text_responses() -> Arc<MockProvider> {
    let replies = vec![text_response("first"), text_response("second")];
    Arc::new(MockProvider::new(replies))
}
/// A keep-alive agent must take a second turn when a peer message targeted at it is enqueued
/// ~120ms after start. The cancel signal is set at ~400ms so the run can finish.
#[tokio::test]
async fn wake_on_peer_message_targeted_at_me() {
    let (harness, queue) = listener_harness(two_text_responses());
    let wake_msg = peer_msg(Some(AGENT_NAME), "peer", "hi");
    enqueue_after(&queue, 120, wake_msg);
    cancel_after(harness.cancel_signal_for_test(), 400);
    let listener = simple_agent().keep_alive();
    let output = harness.run_agent(&listener, "hi").await.unwrap();
    let turns = output.statistics.turns;
    assert_eq!(turns, 2, "peer message should wake the listener");
}
/// A keep-alive agent must take a second turn when an untargeted (broadcast) task
/// notification is enqueued ~120ms after start. Cancel is set at ~400ms.
#[tokio::test]
async fn wake_on_task_notification_broadcast() {
    let (harness, queue) = listener_harness(two_text_responses());
    let wake_msg = task_notification(None, "Task foo completed");
    enqueue_after(&queue, 120, wake_msg);
    cancel_after(harness.cancel_signal_for_test(), 400);
    let listener = simple_agent().keep_alive();
    let output = harness.run_agent(&listener, "hi").await.unwrap();
    let turns = output.statistics.turns;
    assert_eq!(
        turns, 2,
        "broadcast task notification should wake the listener"
    );
}
/// A keep-alive agent must take a second turn when user input targeted at it is enqueued
/// ~120ms after start. Cancel is set at ~400ms.
#[tokio::test]
async fn wake_on_user_input_targeted_at_me() {
    let (harness, queue) = listener_harness(two_text_responses());
    let wake_msg = user_input(Some(AGENT_NAME), "hello");
    enqueue_after(&queue, 120, wake_msg);
    cancel_after(harness.cancel_signal_for_test(), 400);
    let listener = simple_agent().keep_alive();
    let output = harness.run_agent(&listener, "hi").await.unwrap();
    let turns = output.statistics.turns;
    assert_eq!(
        turns, 2,
        "user input (targeted) should wake the listener"
    );
}
/// A keep-alive agent must take a second turn when untargeted (broadcast) user input is
/// enqueued ~120ms after start. Cancel is set at ~400ms.
#[tokio::test]
async fn wake_on_user_input_broadcast() {
    let (harness, queue) = listener_harness(two_text_responses());
    let wake_msg = user_input(None, "anyone?");
    enqueue_after(&queue, 120, wake_msg);
    cancel_after(harness.cancel_signal_for_test(), 400);
    let listener = simple_agent().keep_alive();
    let output = harness.run_agent(&listener, "hi").await.unwrap();
    let turns = output.statistics.turns;
    assert_eq!(
        turns, 2,
        "user input (broadcast) should wake the listener"
    );
}
/// Without `keep_alive()`, the agent completes after exactly one turn.
#[tokio::test]
async fn one_shot_when_keep_alive_unset() {
    let harness = TestHarness::new(MockProvider::text("done"));
    let output = harness.run_agent(&simple_agent(), "hi").await.unwrap();
    assert_eq!(output.status, AgentStatus::Completed);
    assert_eq!(output.statistics.turns, 1);
}
#[tokio::test]
async fn cancel_interrupts_keep_alive() {
let (harness, _queue) = listener_harness(Arc::new(MockProvider::text("done")));
let cancel = harness.cancel_signal_for_test();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
cancel.store(true, Ordering::Relaxed);
});
let agent = simple_agent().keep_alive();
let t0 = std::time::Instant::now();
let _ = harness.run_agent(&agent, "hi").await.unwrap();
let elapsed = t0.elapsed();
assert!(
elapsed >= std::time::Duration::from_millis(100),
"should have actually waited, elapsed = {elapsed:?}"
);
assert!(
elapsed < std::time::Duration::from_millis(1_500),
"should have exited promptly on cancel, elapsed = {elapsed:?}"
);
}
/// When a keep-alive agent is woken by a targeted peer message, its first `AgentPaused`
/// event must come before its first `AgentResumed` event.
#[tokio::test]
async fn idle_and_resumed_events_fire_in_order() {
    let (harness, queue) = listener_harness(two_text_responses());
    enqueue_after(&queue, 120, peer_msg(Some(AGENT_NAME), "peer", "hi"));
    cancel_after(harness.cancel_signal_for_test(), 400);
    let agent = simple_agent().keep_alive();
    let _ = harness.run_agent(&agent, "hi").await.unwrap();

    // Reduce the event log to just the pause/resume markers, in emission order.
    let events = harness.events().all();
    let kinds: Vec<&'static str> = events
        .iter()
        .filter_map(|event| match &event.kind {
            EventKind::AgentPaused => Some("idle"),
            EventKind::AgentResumed => Some("resumed"),
            _ => None,
        })
        .collect();

    let position_of = |label: &str| kinds.iter().position(|k| *k == label);
    let first_idle = position_of("idle").expect("idle fired");
    let first_resumed = position_of("resumed").expect("resumed fired");
    assert!(
        first_idle < first_resumed,
        "idle must precede resumed: {kinds:?}"
    );
}
/// Even without keep-alive, a message already waiting in the queue must be drained before
/// the agent exits, which forces a second turn.
#[tokio::test]
async fn drain_before_exit_picks_up_preloaded_message() {
    let (harness, queue) = listener_harness(two_text_responses());
    let preloaded = peer_msg(Some(AGENT_NAME), "peer", "pre-loaded");
    queue.enqueue(preloaded);
    let output = harness.run_agent(&simple_agent(), "hi").await.unwrap();
    let turns = output.statistics.turns;
    assert_eq!(
        turns, 2,
        "drain-before-exit must inject the preloaded message and force a second turn"
    );
}
/// Two messages queued before the run starts must be delivered together in a single extra
/// turn, not one turn each. Cancel is set at ~300ms.
#[tokio::test]
async fn drains_batch_of_messages_into_one_turn() {
    let (harness, queue) = listener_harness(two_text_responses());
    for (sender, body) in [("alice", "first"), ("bob", "second")] {
        queue.enqueue(peer_msg(Some(AGENT_NAME), sender, body));
    }
    cancel_after(harness.cancel_signal_for_test(), 300);
    let listener = simple_agent().keep_alive();
    let output = harness.run_agent(&listener, "hi").await.unwrap();
    let turns = output.statistics.turns;
    assert_eq!(
        turns, 2,
        "two pending messages should drain into ONE additional turn, not two"
    );
}
/// `Agent::compile` must pass the caller-supplied command queue and cancel signal straight
/// through to the `LoopRuntime`. They must be the same `Arc` allocations, not new ones.
#[test]
fn compile_uses_externally_supplied_queue_and_cancel() {
    let external_queue = Arc::new(CommandQueue::new());
    let external_cancel = Arc::new(AtomicBool::new(false));
    let agent = Agent::new()
        .model_name("mock")
        .provider(Arc::new(MockProvider::text("x")))
        .instruction_prompt("")
        .cancel_signal(Arc::clone(&external_cancel))
        .command_queue(Arc::clone(&external_queue));
    let (_spec, runtime) = agent.compile(None).unwrap();

    let runtime_queue = runtime.command_queue.as_ref().unwrap();
    assert!(
        Arc::ptr_eq(runtime_queue, &external_queue),
        "LoopRuntime should reuse the externally supplied queue"
    );
    assert!(
        Arc::ptr_eq(&runtime.cancel_signal, &external_cancel),
        "LoopRuntime should reuse the externally supplied cancel signal"
    );
}
/// When the caller supplies no command queue, `Agent::compile` must still give the
/// `LoopRuntime` one.
#[test]
fn compile_allocates_default_queue_when_none_supplied() {
    let agent = Agent::new()
        .model_name("mock")
        .provider(Arc::new(MockProvider::text("x")))
        .instruction_prompt("");
    let (_spec, runtime) = agent.compile(None).unwrap();
    let has_queue = runtime.command_queue.is_some();
    assert!(
        has_queue,
        "default queue must be allocated so peer messaging still works"
    );
}
}
#[cfg(test)]
mod retry_and_events_tests {
use std::sync::Mutex as StdMutex;
use super::super::agent::Agent;
use super::*;
use crate::error::AgenticError;
use crate::provider::{Model, ProviderError};
use crate::testutil::*;
/// A `RateLimited` error with status 429 and no `retry_after_ms` hint. The tests below use
/// it as the standard retryable failure.
fn rate_limit_error() -> ProviderError {
    let message = "rate limited".into();
    ProviderError::RateLimited {
        retry_after_ms: None,
        status: 429,
        message,
    }
}
/// Returns `(attempt, max_attempts, error)` for every `RequestRetried` event, in order.
fn retries_in(events: &[Event]) -> Vec<(u32, u32, String)> {
    let mut retried = Vec::new();
    for event in events {
        if let EventKind::RequestRetried {
            attempt,
            max_attempts,
            error,
        } = &event.kind
        {
            retried.push((*attempt, *max_attempts, error.clone()));
        }
    }
    retried
}
fn failures_in(events: &[Event]) -> Vec<String> {
events
.iter()
.filter_map(|e| match &e.kind {
EventKind::RequestError { error } => Some(error.clone()),
_ => None,
})
.collect()
}
/// Two rate-limit errors followed by a success, with three retries allowed, must still return
/// the successful response. Exactly three provider requests are made.
#[tokio::test]
async fn retry_succeeds_after_rate_limit() {
    let scripted = vec![
        Err(rate_limit_error()),
        Err(rate_limit_error()),
        Ok(text_response("hello")),
    ];
    let harness = TestHarness::new(MockProvider::with_results(scripted));
    let agent = Agent::new()
        .name("test")
        .model_name("mock")
        .identity_prompt("")
        .max_request_retries(3)
        .request_retry_delay(10);
    let output = harness.run_agent(&agent, "go").await.unwrap();
    assert_eq!(output.response_raw, "hello");
    assert_eq!(harness.provider().request_count(), 3);
}
/// An authentication failure is not retried. It surfaces immediately as the run's error after
/// a single provider request.
#[tokio::test]
async fn no_retry_on_auth_error() {
    let auth_failure = ProviderError::AuthenticationFailed {
        provider_message: "unauthorized".into(),
    };
    let harness = TestHarness::new(MockProvider::with_results(vec![Err(auth_failure)]));
    let agent = Agent::new()
        .name("test")
        .model_name("mock")
        .identity_prompt("")
        .max_request_retries(3)
        .request_retry_delay(10);
    let err = harness.run_agent(&agent, "go").await.unwrap_err();
    let is_auth_failure = matches!(
        err,
        AgenticError::Provider(ProviderError::AuthenticationFailed { .. })
    );
    assert!(is_auth_failure);
    assert_eq!(harness.provider().request_count(), 1);
}
/// A tool call followed by a final text reply must emit exactly the expected event sequence.
/// The expected order places `AgentFinished` before the last `TurnFinished`.
#[tokio::test]
async fn event_sequence_complete() {
    let agent = Agent::new()
        .name("test")
        .model_name("mock")
        .identity_prompt("")
        .tool(MockTool::new("read", true, "file contents"));
    let provider = MockProvider::tool_then_text("read", serde_json::json!({}), "done");
    let harness = TestHarness::new(provider);
    harness.run_agent(&agent, "go").await.unwrap();

    let expected = vec![
        // Turn 1: the model requests the tool.
        "AgentStarted",
        "TurnStarted",
        "RequestStarted",
        "RequestFinished",
        "TokensReported",
        "ToolCallStarted",
        "ToolCallFinished",
        "TurnFinished",
        // Turn 2: the model answers with text and the run ends.
        "TurnStarted",
        "RequestStarted",
        "TextChunkReceived",
        "RequestFinished",
        "TokensReported",
        "AgentFinished",
        "TurnFinished",
    ];
    let events = harness.events().all();
    let observed: Vec<&str> = events.iter().map(event_name).collect();
    assert_eq!(observed, expected);
}
/// Maps an event to the bare name of its `EventKind` variant, for asserting event sequences.
///
/// The match is deliberately exhaustive, with no `_` arm, so adding a new `EventKind`
/// variant fails to compile until this helper is updated.
fn event_name(event: &Event) -> &'static str {
match &event.kind {
EventKind::AgentStarted { .. } => "AgentStarted",
EventKind::AgentFinished { .. } => "AgentFinished",
EventKind::TurnStarted { .. } => "TurnStarted",
EventKind::TurnFinished { .. } => "TurnFinished",
EventKind::RequestStarted { .. } => "RequestStarted",
EventKind::RequestFinished { .. } => "RequestFinished",
EventKind::RequestRetried { .. } => "RequestRetried",
EventKind::RequestError { .. } => "RequestError",
EventKind::TextChunkReceived { .. } => "TextChunkReceived",
EventKind::ToolCallStarted { .. } => "ToolCallStarted",
EventKind::ToolCallFinished { .. } => "ToolCallFinished",
EventKind::ToolCallError { .. } => "ToolCallError",
EventKind::TokensReported { .. } => "TokensReported",
EventKind::OutputTruncated { .. } => "OutputTruncated",
EventKind::ContextCompacted { .. } => "ContextCompacted",
EventKind::InputBudgetExhausted { .. } => "InputBudgetExhausted",
EventKind::OutputBudgetExhausted { .. } => "OutputBudgetExhausted",
// Unit variants carry no fields, so they are matched without `{ .. }`.
EventKind::AgentPaused => "AgentPaused",
EventKind::AgentResumed => "AgentResumed",
}
}
#[tokio::test]
async fn retry_emits_request_retried_with_attempt_numbers() {
let provider = MockProvider::with_results(vec![
Err(rate_limit_error()),
Err(rate_limit_error()),
Ok(text_response("hello")),
]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.max_request_retries(4)
.request_retry_delay(1);
let harness = TestHarness::new(provider);
harness.run_agent(&agent, "go").await.unwrap();
let retries: Vec<(u32, u32)> = harness
.events()
.all()
.iter()
.filter_map(|e| match &e.kind {
EventKind::RequestRetried {
attempt,
max_attempts,
..
} => Some((*attempt, *max_attempts)),
_ => None,
})
.collect();
assert_eq!(retries, vec![(1, 4), (2, 4)]);
let failed_count = harness
.events()
.all()
.iter()
.filter(|e| matches!(e.kind, EventKind::RequestError { .. }))
.count();
assert_eq!(failed_count, 0, "no terminal failure on eventual success");
}
#[tokio::test]
async fn terminal_error_emits_request_failed_once() {
let provider = MockProvider::with_results(vec![Err(ProviderError::AuthenticationFailed {
provider_message: "unauthorized".into(),
})]);
let agent = Agent::new()
.name("test")
.model_name("mock")
.identity_prompt("")
.max_request_retries(3)
.request_retry_delay(1);
let harness = TestHarness::new(provider);
let err = harness.run_agent(&agent, "go").await.unwrap_err();
assert!(matches!(
err,
AgenticError::Provider(ProviderError::AuthenticationFailed { .. })
));
let events = harness.events().all();
let failed: Vec<&str> = events
.iter()
.filter_map(|e| match &e.kind {
EventKind::RequestError { error } => Some(error.as_str()),
_ => None,
})
.collect();
assert_eq!(failed.len(), 1);
assert!(failed[0].contains("unauthorized"));
assert!(!events
.iter()
.any(|e| matches!(e.kind, EventKind::RequestRetried { .. })));
}
/// Three rate-limit failures with `max_request_retries(2)` must use up both retries, reported
/// as attempts (1, 2) and (2, 2), and then emit exactly one terminal `RequestError`.
#[tokio::test]
async fn retries_exhausted_emits_single_request_failed() {
    let results: Vec<_> = (0..3).map(|_| Err(rate_limit_error())).collect();
    let harness = TestHarness::new(MockProvider::with_results(results));
    let agent = Agent::new()
        .name("test")
        .model_name("mock")
        .identity_prompt("")
        .max_request_retries(2)
        .request_retry_delay(1);
    harness.run_agent(&agent, "go").await.unwrap_err();

    let events = harness.events().all();
    let mut attempt_pairs: Vec<(u32, u32)> = Vec::new();
    for (attempt, max_attempts, _) in retries_in(&events) {
        attempt_pairs.push((attempt, max_attempts));
    }
    assert_eq!(attempt_pairs, vec![(1, 2), (2, 2)]);
    assert_eq!(failures_in(&events).len(), 1);
}
/// A plain successful run emits neither `RequestRetried` nor `RequestError`.
#[tokio::test]
async fn happy_path_emits_no_request_failed() {
    let harness = TestHarness::new(MockProvider::text("done"));
    let agent = Agent::new()
        .name("test")
        .model_name("mock")
        .identity_prompt("");
    harness.run_agent(&agent, "go").await.unwrap();
    let events = harness.events().all();
    assert!(retries_in(&events).is_empty());
    assert!(failures_in(&events).is_empty());
}
/// For several `max_request_retries` settings, a provider that fails every attempt must
/// produce exactly that many `RequestRetried` events. They must be numbered 1, 2, …,
/// `max_retries`, and each must report `max_attempts == max_request_retries`.
#[tokio::test]
async fn max_retries_on_event_matches_spec_max_request_retries() {
    for max_retries in [0u32, 1, 3, 5] {
        // One initial attempt plus `max_retries` retries, all rate-limited.
        let results: Vec<_> = (0..=max_retries).map(|_| Err(rate_limit_error())).collect();
        let provider = MockProvider::with_results(results);
        let agent = Agent::new()
            .name("test")
            .model_name("mock")
            .identity_prompt("")
            .max_request_retries(max_retries)
            .request_retry_delay(1);
        let harness = TestHarness::new(provider);
        harness.run_agent(&agent, "go").await.unwrap_err();
        let events = harness.events().all();
        let retries = retries_in(&events);
        assert_eq!(
            retries.len() as u32,
            max_retries,
            "max_retries={max_retries}"
        );
        // Attempt numbers must be sequential, starting at 1.
        let attempts: Vec<u32> = retries.iter().map(|(attempt, _, _)| *attempt).collect();
        assert_eq!(
            attempts,
            (1..=max_retries).collect::<Vec<u32>>(),
            "max_retries={max_retries}"
        );
        for (_, evt_max_attempts, _) in &retries {
            // The event field is `max_attempts`; the message names it accurately.
            assert_eq!(
                *evt_max_attempts, max_retries,
                "RequestRetried.max_attempts must equal spec.max_request_retries (got {evt_max_attempts} for {max_retries})"
            );
        }
    }
}
/// With `max_request_retries(0)`, a single rate-limit error is terminal. No `RequestRetried`
/// event is emitted, and exactly one `RequestError` is.
#[tokio::test]
async fn max_request_retries_zero_goes_straight_to_request_failed() {
    let harness = TestHarness::new(MockProvider::with_results(vec![Err(rate_limit_error())]));
    let agent = Agent::new()
        .name("test")
        .model_name("mock")
        .identity_prompt("")
        .max_request_retries(0)
        .request_retry_delay(1);
    harness.run_agent(&agent, "go").await.unwrap_err();
    let events = harness.events().all();
    let (retried, failed) = (retries_in(&events), failures_in(&events));
    assert!(retried.is_empty());
    assert_eq!(failed.len(), 1);
}
/// After a retryable `ConnectionFailed` error, the `RequestRetried` event's error text must
/// include the provider's reason.
#[tokio::test]
async fn request_retried_carries_provider_error_display() {
    let connection_failure = ProviderError::ConnectionFailed {
        reason: "dns lookup failed: no such host".into(),
    };
    let provider =
        MockProvider::with_results(vec![Err(connection_failure), Ok(text_response("ok"))]);
    let agent = Agent::new()
        .name("test")
        .model_name("mock")
        .identity_prompt("")
        .max_request_retries(3)
        .request_retry_delay(1);
    let harness = TestHarness::new(provider);
    harness.run_agent(&agent, "go").await.unwrap();

    let events = harness.events().all();
    let retries = retries_in(&events);
    assert_eq!(retries.len(), 1);
    let (_, _, retry_error) = &retries[0];
    assert!(
        retry_error.contains("dns lookup failed"),
        "retry error must surface provider message, got: {}",
        retry_error
    );
}
/// For each non-retryable `ProviderError` variant, the run must emit exactly one
/// `RequestError` containing that error's detail text, and no `RequestRetried` events.
#[tokio::test]
async fn request_failed_carries_terminal_error_display_for_each_non_retryable_variant() {
    // (error to inject, substring the RequestError text must contain)
    let cases: Vec<(ProviderError, &'static str)> = vec![
        (
            ProviderError::AuthenticationFailed {
                provider_message: "bad key 401".into(),
            },
            "bad key 401",
        ),
        (
            ProviderError::PermissionDenied {
                provider_message: "no access 403".into(),
            },
            "no access 403",
        ),
        (
            ProviderError::ModelNotFound {
                provider_message: "unknown-model-xyz".into(),
            },
            "unknown-model-xyz",
        ),
        (
            ProviderError::SafetyFilterTriggered {
                provider_message: "blocked by safety-filter-7".into(),
            },
            "safety-filter-7",
        ),
        (
            ProviderError::InvalidResponse {
                reason: "malformed-json-token".into(),
            },
            "malformed-json-token",
        ),
    ];
    for (terminal_error, needle) in cases {
        let harness = TestHarness::new(MockProvider::with_results(vec![Err(terminal_error)]));
        let agent = Agent::new()
            .name("test")
            .model_name("mock")
            .identity_prompt("")
            .max_request_retries(3)
            .request_retry_delay(1);
        harness.run_agent(&agent, "go").await.unwrap_err();

        let events = harness.events().all();
        let failures = failures_in(&events);
        assert_eq!(failures.len(), 1, "{needle}");
        let only_failure = &failures[0];
        assert!(
            only_failure.contains(needle),
            "RequestError must carry error detail '{needle}', got: {}",
            only_failure
        );
        assert!(retries_in(&events).is_empty(), "{needle}");
    }
}
/// When the model's context window is known, a `ContextWindowExceeded` error must trigger
/// exactly one `ContextCompacted` event and no `RequestError`, even though the run still
/// ends in an error.
#[tokio::test]
async fn context_window_exceeded_with_known_window_does_not_emit_request_error() {
    let overflow = ProviderError::ContextWindowExceeded {
        provider_message: "context overflow".into(),
    };
    let harness = TestHarness::new(MockProvider::with_results(vec![Err(overflow)]));
    let agent = Agent::new()
        .name("test")
        .model(Model::from_name("mock").context_window_size(100_000))
        .identity_prompt("")
        .max_request_retries(3)
        .request_retry_delay(1);
    harness.run_agent(&agent, "go").await.unwrap_err();

    let events = harness.events().all();
    let mut compact_count = 0;
    for event in &events {
        if matches!(event.kind, EventKind::ContextCompacted { .. }) {
            compact_count += 1;
        }
    }
    assert_eq!(compact_count, 1);
    assert!(failures_in(&events).is_empty());
}
/// When the model's context window is unknown, `ContextWindowExceeded` is reported as a
/// single `RequestError` carrying the provider message.
#[tokio::test]
async fn context_window_exceeded_with_unknown_window_emits_request_failed() {
    let overflow = ProviderError::ContextWindowExceeded {
        provider_message: "context overflow 413".into(),
    };
    let harness = TestHarness::new(MockProvider::with_results(vec![Err(overflow)]));
    let agent = Agent::new()
        .name("test")
        .model_name("mock")
        .identity_prompt("")
        .max_request_retries(3)
        .request_retry_delay(1);
    harness.run_agent(&agent, "go").await.unwrap_err();

    let failures = failures_in(&harness.events().all());
    assert_eq!(failures.len(), 1);
    let only_failure = &failures[0];
    assert!(only_failure.contains("context overflow 413"));
}
/// With tokio's clock paused, a `RateLimited` error carrying `retry_after_ms: Some(1_000)`
/// must not produce a `RequestRetried` event until the backoff sleep has fully elapsed.
/// There is no event at the start, none after advancing 999ms, and exactly one after
/// advancing a further 2ms.
#[tokio::test(start_paused = true)]
async fn request_retried_fires_after_backoff_sleep_not_before() {
let provider = MockProvider::with_results(vec![
Err(ProviderError::RateLimited {
message: "rl".into(),
status: 429,
retry_after_ms: Some(1_000),
}),
Ok(text_response("ok")),
]);
// Record every event in a shared vector so the check future can inspect them mid-run.
let collected: Arc<StdMutex<Vec<Event>>> = Arc::new(StdMutex::new(Vec::new()));
let handler: Arc<dyn Fn(Event) + Send + Sync> = {
let c = collected.clone();
Arc::new(move |e| c.lock().unwrap().push(e))
};
let agent = Agent::new()
.name("test")
.model_name("mock")
.provider(Arc::new(provider))
.identity_prompt("")
.max_request_retries(3)
.request_retry_delay(1_000)
.event_handler(handler)
.instruction_prompt("go");
let run_fut = agent.run();
let check_fut = async {
// Yield repeatedly so the concurrently joined agent future can make progress up to its
// next await point. NOTE(review): 20 yields is assumed to be enough; confirm if the
// loop grows more await points before the backoff sleep.
for _ in 0..20 {
tokio::task::yield_now().await;
}
// Number of `RequestRetried` events recorded so far.
let retries = || {
collected
.lock()
.unwrap()
.iter()
.filter(|e| matches!(e.kind, EventKind::RequestRetried { .. }))
.count()
};
assert_eq!(retries(), 0, "no retry event before sleep");
// Move the paused clock to 1ms short of the 1s backoff.
tokio::time::advance(std::time::Duration::from_millis(999)).await;
for _ in 0..20 {
tokio::task::yield_now().await;
}
assert_eq!(retries(), 0, "no retry event at 999ms into backoff");
// Step past the backoff deadline (999ms + 2ms = 1001ms).
tokio::time::advance(std::time::Duration::from_millis(2)).await;
for _ in 0..20 {
tokio::task::yield_now().await;
}
assert_eq!(retries(), 1, "retry event fires once sleep completes");
};
// `tokio::join!` polls both futures on this task, so the checks interleave with the run.
let (run_result, _) = tokio::join!(run_fut, check_fut);
run_result.unwrap();
}
/// With tokio's clock paused and a 2s base retry delay, three successive rate-limit retries
/// must each fire only inside a progressively wider window, showing that the backoff grows
/// between consecutive retries. The fourth failure is terminal and produces one `RequestError`.
///
/// NOTE(review): the window edges assume the retry policy (presumably `compute_delay`,
/// including any jitter or cap) yields roughly 2s, 4s, 8s. They also assume each backoff
/// starts from the clock position reached by the previous `advance`. Re-check the slack
/// here if the retry policy changes.
#[tokio::test(start_paused = true)]
async fn backoff_between_consecutive_retries_grows_exponentially() {
// Initial attempt + 3 retries, all rate-limited, so the retries run out.
let provider = MockProvider::with_results(vec![
Err(rate_limit_error()),
Err(rate_limit_error()),
Err(rate_limit_error()),
Err(rate_limit_error()),
]);
// Record every event so the checks can observe retries as the clock advances.
let collected: Arc<StdMutex<Vec<Event>>> = Arc::new(StdMutex::new(Vec::new()));
let handler: Arc<dyn Fn(Event) + Send + Sync> = {
let c = collected.clone();
Arc::new(move |e| c.lock().unwrap().push(e))
};
let agent = Agent::new()
.name("test")
.model_name("mock")
.provider(Arc::new(provider))
.identity_prompt("")
.max_request_retries(3)
.request_retry_delay(2_000)
.event_handler(handler)
.instruction_prompt("go");
// Yield repeatedly so the joined agent future can run up to its next await point.
let drain = || async {
for _ in 0..20 {
tokio::task::yield_now().await;
}
};
// Number of `RequestRetried` events recorded so far.
let retries = || {
collected
.lock()
.unwrap()
.iter()
.filter(|e| matches!(e.kind, EventKind::RequestRetried { .. }))
.count()
};
let run_fut = agent.run();
// Paused-clock position after each advance: 2.5s, 5.999s, 7.5s, 13.999s, 17.5s.
let check_fut = async {
drain().await;
assert_eq!(retries(), 0, "T=0: no retries yet");
tokio::time::advance(std::time::Duration::from_millis(2_500)).await;
drain().await;
assert_eq!(retries(), 1, "T=2.5s: retry 1 fired");
tokio::time::advance(std::time::Duration::from_millis(3_499)).await;
drain().await;
assert_eq!(retries(), 1, "T≈6s-1ms: retry 2 has not fired yet");
tokio::time::advance(std::time::Duration::from_millis(1_501)).await;
drain().await;
assert_eq!(retries(), 2, "T=7.5s: retry 2 fired");
tokio::time::advance(std::time::Duration::from_millis(6_499)).await;
drain().await;
assert_eq!(retries(), 2, "T≈14s-1ms: retry 3 has not fired yet");
tokio::time::advance(std::time::Duration::from_millis(3_501)).await;
drain().await;
assert_eq!(retries(), 3, "T=17.5s: retry 3 fired");
};
// Both futures are polled on this task, so the checks interleave with the run.
let (run_result, _) = tokio::join!(run_fut, check_fut);
run_result.unwrap_err();
let failures = collected
.lock()
.unwrap()
.iter()
.filter(|e| matches!(e.kind, EventKind::RequestError { .. }))
.count();
assert_eq!(failures, 1, "one terminal failure after retries exhaust");
}
/// Setting the cancel signal while the agent waits out a 30s retry backoff must end the run
/// in under 2s of wall-clock time, with `AgentStatus::Cancelled` rather than an error.
#[tokio::test]
async fn cancel_during_backoff_exits_quickly_with_cancelled_status() {
    let failures: Vec<_> = (0..5).map(|_| Err(rate_limit_error())).collect();
    let provider = MockProvider::with_results(failures);
    let cancel = Arc::new(AtomicBool::new(false));
    let agent = Agent::new()
        .name("test")
        .model_name("mock")
        .provider(Arc::new(provider))
        .identity_prompt("")
        .max_request_retries(4)
        .request_retry_delay(30_000)
        .cancel_signal(Arc::clone(&cancel))
        .instruction_prompt("go");

    // Flip the cancel flag ~200ms in, well inside the first 30s backoff.
    let trip_cancel = {
        let flag = Arc::clone(&cancel);
        async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            flag.store(true, Ordering::Relaxed);
        }
    };
    let started = std::time::Instant::now();
    let (run_result, _) = tokio::join!(agent.run(), trip_cancel);
    let elapsed = started.elapsed();

    let output = run_result.unwrap();
    assert_eq!(output.status, AgentStatus::Cancelled);
    assert!(
        elapsed < std::time::Duration::from_secs(2),
        "cancel during a 30s backoff must exit within 2s (took {:?})",
        elapsed
    );
}
#[tokio::test]
async fn custom_event_handler_observes_retry_and_failure() {
let provider = MockProvider::with_results(vec![
Err(rate_limit_error()),
Err(rate_limit_error()),
Err(ProviderError::AuthenticationFailed {
provider_message: "terminal".into(),
}),
]);
let collected: Arc<StdMutex<Vec<Event>>> = Arc::new(StdMutex::new(Vec::new()));
let handler: Arc<dyn Fn(Event) + Send + Sync> = {
let c = collected.clone();
Arc::new(move |e| c.lock().unwrap().push(e))
};
let agent = Agent::new()
.name("test")
.model_name("mock")
.provider(Arc::new(provider))
.identity_prompt("")
.max_request_retries(3)
.request_retry_delay(1)
.event_handler(handler)
.instruction_prompt("go");
agent.run().await.unwrap_err();
let events = collected.lock().unwrap().clone();
assert_eq!(
retries_in(&events).len(),
2,
"custom handler must receive both retry events"
);
assert_eq!(
failures_in(&events).len(),
1,
"custom handler must receive the terminal failure"
);
}
}