use std::sync::Arc;
use bob_core::{
error::AgentError,
journal::{JournalEntry, ToolJournalPort},
normalize_tool_list,
ports::{
ApprovalPort, ArtifactStorePort, ContextCompactorPort, CostMeterPort, EventSink, LlmPort,
SessionStore, ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
},
types::{
AgentAction, AgentEvent, AgentEventStream, AgentRequest, AgentResponse, AgentRunResult,
AgentStreamEvent, ApprovalContext, ApprovalDecision, ArtifactRecord, FinishReason,
GuardReason, Message, Role, TokenUsage, ToolCall, ToolResult, TurnCheckpoint, TurnPolicy,
},
};
use futures_util::StreamExt;
use tokio::time::Instant;
/// Bounded capacity of the flume channel that carries `AgentStreamEvent`s to
/// stream consumers; producers back-pressure once this many events are queued.
const STREAM_CHANNEL_CAPACITY: usize = 256;
/// Runtime budget tracker for a single agent turn.
///
/// Counts LLM steps, tool invocations, and consecutive errors against the
/// limits in the supplied [`TurnPolicy`], and tracks wall-clock time from
/// construction against `turn_timeout_ms`.
#[derive(Debug)]
pub struct LoopGuard {
    policy: TurnPolicy,
    steps: u32,
    tool_calls: u32,
    consecutive_errors: u32,
    start: Instant,
}

impl LoopGuard {
    /// Creates a guard with every counter at zero and the clock started now.
    #[must_use]
    pub fn new(policy: TurnPolicy) -> Self {
        Self {
            policy,
            steps: 0,
            tool_calls: 0,
            consecutive_errors: 0,
            start: Instant::now(),
        }
    }

    /// `true` while every budget (steps, tool calls, consecutive errors,
    /// wall clock) still has headroom.
    #[must_use]
    pub fn can_continue(&self) -> bool {
        if self.steps >= self.policy.max_steps {
            return false;
        }
        if self.tool_calls >= self.policy.max_tool_calls {
            return false;
        }
        if self.consecutive_errors >= self.policy.max_consecutive_errors {
            return false;
        }
        !self.timed_out()
    }

    /// Records one completed LLM step.
    pub fn record_step(&mut self) {
        self.steps += 1;
    }

    /// Records one tool invocation.
    pub fn record_tool_call(&mut self) {
        self.tool_calls += 1;
    }

    /// Records one failure; the streak is cleared by [`Self::reset_errors`].
    pub fn record_error(&mut self) {
        self.consecutive_errors += 1;
    }

    /// Clears the consecutive-error streak after a successful operation.
    pub fn reset_errors(&mut self) {
        self.consecutive_errors = 0;
    }

    /// Explains which limit tripped, checked in the same priority order as
    /// [`Self::can_continue`]. Falls back to `Cancelled` when no limit has
    /// actually been exceeded.
    #[must_use]
    pub fn reason(&self) -> GuardReason {
        if self.steps >= self.policy.max_steps {
            GuardReason::MaxSteps
        } else if self.tool_calls >= self.policy.max_tool_calls {
            GuardReason::MaxToolCalls
        } else if self.consecutive_errors >= self.policy.max_consecutive_errors {
            GuardReason::MaxConsecutiveErrors
        } else if self.timed_out() {
            GuardReason::TurnTimeout
        } else {
            GuardReason::Cancelled
        }
    }

    /// Whether the wall-clock budget (`turn_timeout_ms`) has elapsed.
    #[must_use]
    pub fn timed_out(&self) -> bool {
        u128::from(self.policy.turn_timeout_ms) <= self.start.elapsed().as_millis()
    }
}
/// Baseline system prompt used for every turn; a request's skill prompt, when
/// present and non-empty, is appended after it (see `resolve_system_instructions`).
const DEFAULT_SYSTEM_INSTRUCTIONS: &str = "\
You are a helpful AI assistant. \
Think step by step before answering. \
When you need external information, use the available tools.";
/// How many identical (same name + same serialized arguments) tool calls may
/// occur back-to-back before the loop short-circuits them with an error result.
const MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS: u32 = 3;
/// Builds the effective system prompt for a request: the default instructions,
/// optionally followed by the request's skill prompt when it is non-blank.
fn resolve_system_instructions(req: &AgentRequest) -> String {
    match req.context.system_prompt.as_deref() {
        Some(skills_prompt) if !skills_prompt.trim().is_empty() => {
            format!("{DEFAULT_SYSTEM_INSTRUCTIONS}\n\n{skills_prompt}")
        }
        _ => DEFAULT_SYSTEM_INSTRUCTIONS.to_string(),
    }
}
/// Like [`resolve_system_instructions`], but additionally injects the
/// session's memory summary (when present) via the memory-context helper.
fn resolve_system_instructions_with_memory(
    req: &AgentRequest,
    session: &bob_core::types::SessionState,
) -> String {
    let base = resolve_system_instructions(req);
    let memory = session.memory_summary.as_deref();
    crate::memory_context::inject_memory_prompt(&base, memory)
}
/// Returns an owned copy of the skill names selected for this request.
fn resolve_selected_skills(req: &AgentRequest) -> Vec<String> {
    req.context.selected_skills.to_vec()
}
/// Normalized per-request tool access lists, passed to the policy port.
#[derive(Debug, Clone, Default)]
struct ToolCallPolicy {
    // Tool names handed to the policy port as the deny list.
    deny_tools: Vec<String>,
    // When `Some`, the allow list handed to the policy port; `None` means
    // no allow list was configured on the request.
    allow_tools: Option<Vec<String>>,
}
/// Extracts and normalizes the deny/allow tool lists from the request context.
fn resolve_tool_call_policy(req: &AgentRequest) -> ToolCallPolicy {
    let source = &req.context.tool_policy;
    ToolCallPolicy {
        deny_tools: normalize_tool_list(source.deny_tools.iter().map(String::as_str)),
        allow_tools: source
            .allow_tools
            .as_ref()
            .map(|tools| normalize_tool_list(tools.iter().map(String::as_str))),
    }
}
/// Chooses prompt-building options for the given dispatch mode.
///
/// In native-preferred mode with a model that supports native tool calling,
/// the JSON action/tool schemas are omitted from the prompt; every other
/// combination uses the prompt-guided defaults. `output_schema` is forwarded
/// unconditionally as the structured-output constraint.
fn prompt_options_for_mode(
    dispatch_mode: crate::DispatchMode,
    llm: &dyn LlmPort,
    output_schema: Option<serde_json::Value>,
) -> crate::prompt::PromptBuildOptions {
    let native = matches!(dispatch_mode, crate::DispatchMode::NativePreferred) &&
        llm.capabilities().native_tool_calling;
    let mut opts = if native {
        crate::prompt::PromptBuildOptions {
            include_action_schema: false,
            include_tool_schema: false,
            ..Default::default()
        }
    } else {
        crate::prompt::PromptBuildOptions::default()
    };
    opts.structured_output = output_schema;
    opts
}
/// Turns an LLM response into an [`AgentAction`].
///
/// In native-preferred mode with a native-tool-calling model, the first
/// structured tool call (if any) wins; in every other case the response text
/// is parsed as a JSON action.
fn parse_action_for_mode(
    dispatch_mode: crate::DispatchMode,
    llm: &dyn LlmPort,
    response: &bob_core::types::LlmResponse,
) -> Result<AgentAction, crate::action::ActionParseError> {
    let native = matches!(dispatch_mode, crate::DispatchMode::NativePreferred) &&
        llm.capabilities().native_tool_calling;
    if native {
        if let Some(tool_call) = response.tool_calls.first() {
            return Ok(AgentAction::ToolCall {
                name: tool_call.name.clone(),
                arguments: tool_call.arguments.clone(),
            });
        }
    }
    crate::action::parse_action(&response.content)
}
/// Runs one tool call through policy, approval, and timeout gates.
///
/// Checks run in order: (1) the policy port's allow/deny decision, (2) the
/// approval port, (3) the tool itself bounded by `timeout_ms`. Rejections and
/// failures come back as an `is_error` [`ToolResult`] (never an `Err`) and
/// bump the guard's consecutive-error streak; a successful call resets it.
#[expect(
    clippy::too_many_arguments,
    reason = "tool execution needs explicit policy, approval, and timeout dependencies"
)]
async fn execute_tool_call(
    tools: &dyn ToolPort,
    guard: &mut LoopGuard,
    tool_call: ToolCall,
    policy: &ToolCallPolicy,
    tool_policy_port: &dyn ToolPolicyPort,
    approval_port: &dyn ApprovalPort,
    approval_context: &ApprovalContext,
    timeout_ms: u64,
) -> ToolResult {
    // Gate 1: static allow/deny policy.
    if !tool_policy_port.is_tool_allowed(
        &tool_call.name,
        &policy.deny_tools,
        policy.allow_tools.as_deref(),
    ) {
        guard.record_error();
        return ToolResult {
            name: tool_call.name.clone(),
            output: serde_json::json!({
                "error": format!("tool '{}' denied by policy", tool_call.name)
            }),
            is_error: true,
        };
    }
    // Gate 2: approval port decision.
    match approval_port.approve_tool_call(&tool_call, approval_context).await {
        Ok(ApprovalDecision::Approved) => {}
        Ok(ApprovalDecision::Denied { reason }) => {
            guard.record_error();
            return ToolResult {
                name: tool_call.name.clone(),
                output: serde_json::json!({"error": reason}),
                is_error: true,
            };
        }
        Err(err) => {
            // An approval-port failure is handled the same as a denial.
            guard.record_error();
            return ToolResult {
                name: tool_call.name.clone(),
                output: serde_json::json!({"error": err.to_string()}),
                is_error: true,
            };
        }
    }
    // Gate 3: execute under a hard wall-clock timeout.
    match tokio::time::timeout(
        std::time::Duration::from_millis(timeout_ms),
        tools.call_tool(tool_call.clone()),
    )
    .await
    {
        Ok(Ok(result)) => {
            // Success clears the consecutive-error streak.
            guard.reset_errors();
            result
        }
        Ok(Err(err)) => {
            guard.record_error();
            ToolResult {
                name: tool_call.name,
                output: serde_json::json!({"error": err.to_string()}),
                is_error: true,
            }
        }
        Err(_) => {
            // Timeout elapsed: the tool future was dropped mid-flight.
            guard.record_error();
            ToolResult {
                name: tool_call.name,
                output: serde_json::json!({"error": "tool call timed out"}),
                is_error: true,
            }
        }
    }
}
/// Convenience entrypoint: runs a non-streaming turn with every optional port
/// stubbed out (allow-all approvals, no-op checkpoints/artifacts/cost/journal,
/// window compaction) and native-preferred dispatch.
pub async fn run_turn(
    llm: &dyn LlmPort,
    tools: &dyn ToolPort,
    store: &dyn SessionStore,
    events: &dyn EventSink,
    req: AgentRequest,
    policy: &TurnPolicy,
    default_model: &str,
) -> Result<AgentRunResult, AgentError> {
    let compactor = crate::prompt::WindowContextCompactor::default();
    let policy_port = crate::DefaultToolPolicyPort;
    let approval_port = crate::AllowAllApprovalPort;
    let checkpoints = crate::NoOpCheckpointStorePort;
    let artifacts = crate::NoOpArtifactStorePort;
    let meter = crate::NoOpCostMeterPort;
    let tool_journal = crate::NoOpToolJournalPort;
    run_turn_with_extensions(
        llm,
        tools,
        store,
        events,
        req,
        policy,
        default_model,
        &policy_port,
        &approval_port,
        crate::DispatchMode::NativePreferred,
        &checkpoints,
        &artifacts,
        &meter,
        &compactor,
        &tool_journal,
    )
    .await
}
#[cfg_attr(
    not(test),
    expect(
        dead_code,
        reason = "reserved wrapper for partial control injection in external integrations"
    )
)]
#[expect(
    clippy::too_many_arguments,
    reason = "wrapper exposes explicit dependency ports for compatibility and testability"
)]
/// Runs a turn with caller-supplied tool-policy and approval ports,
/// prompt-guided dispatch, and no-op implementations for the remaining
/// extension ports.
pub(crate) async fn run_turn_with_controls(
    llm: &dyn LlmPort,
    tools: &dyn ToolPort,
    store: &dyn SessionStore,
    events: &dyn EventSink,
    req: AgentRequest,
    policy: &TurnPolicy,
    default_model: &str,
    tool_policy_port: &dyn ToolPolicyPort,
    approval_port: &dyn ApprovalPort,
) -> Result<AgentRunResult, AgentError> {
    let compactor = crate::prompt::WindowContextCompactor::default();
    let checkpoints = crate::NoOpCheckpointStorePort;
    let artifacts = crate::NoOpArtifactStorePort;
    let meter = crate::NoOpCostMeterPort;
    let tool_journal = crate::NoOpToolJournalPort;
    run_turn_with_extensions(
        llm,
        tools,
        store,
        events,
        req,
        policy,
        default_model,
        tool_policy_port,
        approval_port,
        crate::DispatchMode::PromptGuided,
        &checkpoints,
        &artifacts,
        &meter,
        &compactor,
        &tool_journal,
    )
    .await
}
/// Core non-streaming turn loop with every dependency port injected.
///
/// Repeats LLM call → action parse → tool dispatch until the model produces a
/// final answer or question, a guard limit trips, the cost budget is
/// exhausted, or the request's cancel token fires. Session state is persisted
/// through `finish_turn` on every normal exit path.
#[expect(
    clippy::too_many_arguments,
    reason = "core entrypoint exposes all ports explicitly for adapter injection"
)]
pub(crate) async fn run_turn_with_extensions(
    llm: &dyn LlmPort,
    tools: &dyn ToolPort,
    store: &dyn SessionStore,
    events: &dyn EventSink,
    req: AgentRequest,
    policy: &TurnPolicy,
    default_model: &str,
    tool_policy_port: &dyn ToolPolicyPort,
    approval_port: &dyn ApprovalPort,
    dispatch_mode: crate::DispatchMode,
    checkpoint_store: &dyn TurnCheckpointStorePort,
    artifact_store: &dyn ArtifactStorePort,
    cost_meter: &dyn CostMeterPort,
    context_compactor: &dyn ContextCompactorPort,
    journal: &dyn ToolJournalPort,
) -> Result<AgentRunResult, AgentError> {
    let model = req.model.as_deref().unwrap_or(default_model);
    let cancel_token = req.cancel_token.clone();
    let selected_skills = resolve_selected_skills(&req);
    let tool_call_policy = resolve_tool_call_policy(&req);
    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
    let system_instructions = resolve_system_instructions_with_memory(&req, &session);
    let tool_descriptors = tools.list_tools().await?;
    let mut guard = LoopGuard::new(policy.clone());
    // Progressive disclosure: only activated tools get full schemas in the prompt.
    let mut tool_view = crate::progressive_tools::ProgressiveToolView::new(tool_descriptors);
    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
    if !selected_skills.is_empty() {
        events.emit(AgentEvent::SkillsSelected {
            session_id: req.session_id.clone(),
            skill_names: selected_skills.clone(),
        });
    }
    session.messages.push(Message::text(Role::User, req.input.clone()));
    let mut tool_transcript: Vec<ToolResult> = Vec::new();
    let mut total_usage = TokenUsage::default();
    let mut consecutive_parse_failures: u32 = 0;
    let mut consecutive_validation_failures: u32 = 0;
    let max_output_retries = req.max_output_retries;
    // State for consecutive-duplicate tool call detection.
    let mut last_tool_call_signature: Option<String> = None;
    let mut same_tool_call_streak: u32 = 0;
    loop {
        // 1-based step number for event reporting; guard.steps is incremented
        // later in the iteration via record_step.
        let current_step = guard.steps.saturating_add(1);
        // Cooperative cancellation checkpoint at the top of each iteration.
        if let Some(ref token) = cancel_token &&
            token.is_cancelled()
        {
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: "Turn cancelled.",
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::Cancelled,
                },
            )
            .await;
        }
        // Budget violations propagate as hard errors (no finish_turn).
        cost_meter.check_budget(&req.session_id).await?;
        if !guard.can_continue() {
            let reason = guard.reason();
            let msg = format!("Turn stopped: {reason:?}");
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: &msg,
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::GuardExceeded,
                },
            )
            .await;
        }
        // Append the progressive tool summary so the model can discover
        // not-yet-activated tools.
        let mut augmented_instructions = system_instructions.clone();
        let tool_summary = tool_view.summary_prompt();
        if !tool_summary.is_empty() {
            augmented_instructions.push('\n');
            augmented_instructions.push('\n');
            augmented_instructions.push_str(&tool_summary);
        }
        let active_tools = tool_view.activated_tools();
        let llm_request = crate::prompt::build_llm_request_with_options(
            model,
            &session,
            &active_tools,
            &augmented_instructions,
            prompt_options_for_mode(dispatch_mode, llm, req.output_schema.clone()),
            context_compactor,
        )
        .await;
        events.emit(AgentEvent::LlmCallStarted {
            session_id: req.session_id.clone(),
            step: current_step,
            model: model.to_string(),
        });
        // Race the LLM call against cancellation when a token is present.
        let llm_response = if let Some(ref token) = cancel_token {
            tokio::select! {
                result = llm.complete(llm_request.clone()) => result?,
                () = token.cancelled() => {
                    return finish_turn(
                        store, events, &req.session_id, &session,
                        FinishResult { content: "Turn cancelled.", tool_transcript, usage: total_usage, finish_reason: FinishReason::Cancelled },
                    ).await;
                }
            }
        } else {
            llm.complete(llm_request).await?
        };
        guard.record_step();
        // Accumulate usage for both this turn and the session lifetime.
        total_usage.prompt_tokens += llm_response.usage.prompt_tokens;
        total_usage.completion_tokens += llm_response.usage.completion_tokens;
        session.total_usage.prompt_tokens =
            session.total_usage.prompt_tokens.saturating_add(llm_response.usage.prompt_tokens);
        session.total_usage.completion_tokens = session
            .total_usage
            .completion_tokens
            .saturating_add(llm_response.usage.completion_tokens);
        cost_meter.record_llm_usage(&req.session_id, model, &llm_response.usage).await?;
        events.emit(AgentEvent::LlmCallCompleted {
            session_id: req.session_id.clone(),
            step: current_step,
            model: model.to_string(),
            usage: llm_response.usage.clone(),
        });
        // Keep the first native tool call so its call_id can be attached to
        // the eventual tool-result message.
        let native_tool_call = if llm.capabilities().native_tool_calling {
            llm_response.tool_calls.first().cloned()
        } else {
            None
        };
        // Tool-name mentions in plain text can activate progressive tools.
        tool_view.activate_hints(&llm_response.content);
        let assistant_message = if llm_response.tool_calls.is_empty() {
            Message::text(Role::Assistant, llm_response.content.clone())
        } else {
            Message::assistant_tool_calls(
                llm_response.content.clone(),
                llm_response.tool_calls.clone(),
            )
        };
        session.messages.push(assistant_message);
        // Best-effort checkpoint; failures are deliberately ignored.
        let _ = checkpoint_store
            .save_checkpoint(&TurnCheckpoint {
                session_id: req.session_id.clone(),
                step: guard.steps,
                tool_calls: guard.tool_calls,
                usage: total_usage.clone(),
            })
            .await;
        match parse_action_for_mode(dispatch_mode, llm, &llm_response) {
            Ok(action) => {
                consecutive_parse_failures = 0;
                match action {
                    AgentAction::Final { content } => {
                        // Optional structured-output validation with bounded
                        // re-prompt retries.
                        if let Some(ref schema) = req.output_schema {
                            match crate::output_validation::validate_output_str(&content, schema) {
                                Ok(_) => {}
                                Err(validation_err) => {
                                    consecutive_validation_failures += 1;
                                    if consecutive_validation_failures > max_output_retries {
                                        // Retries exhausted: warn and fall
                                        // through, returning the
                                        // non-conforming content anyway.
                                        tracing::warn!(
                                            session_id = %req.session_id,
                                            "output schema validation failed after {} retries",
                                            max_output_retries,
                                        );
                                    } else {
                                        let prompt =
                                            crate::output_validation::validation_error_prompt(
                                                &content,
                                                &validation_err,
                                            );
                                        session.messages.push(Message::text(Role::User, prompt));
                                        continue;
                                    }
                                }
                            }
                        }
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &content,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::AskUser { question } => {
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &question,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::ToolCall { name, arguments } => {
                        // Reuse the provider call_id only when the parsed call
                        // matches the native one exactly.
                        let tool_call_id = native_tool_call
                            .as_ref()
                            .filter(|call| call.name == name && call.arguments == arguments)
                            .and_then(|call| call.call_id.clone());
                        tool_view.activate(&name);
                        // Duplicate detection key: name plus serialized args.
                        let call_signature = format!(
                            "{}:{}",
                            name,
                            serde_json::to_string(&arguments).unwrap_or_default()
                        );
                        if last_tool_call_signature.as_ref() == Some(&call_signature) {
                            same_tool_call_streak = same_tool_call_streak.saturating_add(1);
                        } else {
                            same_tool_call_streak = 1;
                            last_tool_call_signature = Some(call_signature);
                        }
                        if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
                            // Short-circuit the repeated call with a synthetic
                            // error result instead of executing it again.
                            events.emit(AgentEvent::ToolCallStarted {
                                session_id: req.session_id.clone(),
                                step: current_step,
                                name: name.clone(),
                            });
                            let dup_result = ToolResult {
                                name: name.clone(),
                                output: serde_json::json!({
                                    "error": format!(
                                        "consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
                                    )
                                }),
                                is_error: true,
                            };
                            // The skipped call still counts against the budget.
                            guard.record_tool_call();
                            let _ =
                                cost_meter.record_tool_result(&req.session_id, &dup_result).await;
                            let output_str =
                                serde_json::to_string(&dup_result.output).unwrap_or_default();
                            session.messages.push(Message::tool_result(
                                name.clone(),
                                tool_call_id.clone(),
                                output_str,
                            ));
                            events.emit(AgentEvent::ToolCallCompleted {
                                session_id: req.session_id.clone(),
                                step: current_step,
                                name: name.clone(),
                                is_error: true,
                            });
                            let _ = artifact_store
                                .put(ArtifactRecord {
                                    session_id: req.session_id.clone(),
                                    kind: "tool_result".to_string(),
                                    name: name.clone(),
                                    content: dup_result.output.clone(),
                                })
                                .await;
                            tool_transcript.push(dup_result);
                            continue;
                        }
                        events.emit(AgentEvent::ToolCallStarted {
                            session_id: req.session_id.clone(),
                            step: current_step,
                            name: name.clone(),
                        });
                        let approval_context = ApprovalContext {
                            session_id: req.session_id.clone(),
                            turn_step: guard.steps.max(1),
                            selected_skills: selected_skills.clone(),
                        };
                        // Journal replay: an identical prior call in this
                        // session returns its recorded result instead of
                        // re-executing the tool.
                        let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
                        let tool_result = if let Ok(Some(cached)) =
                            journal.lookup(&req.session_id, &call_fingerprint).await
                        {
                            tracing::debug!(
                                session_id = %req.session_id,
                                tool = %name,
                                "replaying tool result from journal"
                            );
                            ToolResult {
                                name: cached.tool_name,
                                output: cached.result,
                                is_error: cached.is_error,
                            }
                        } else {
                            let result = execute_tool_call(
                                tools,
                                &mut guard,
                                ToolCall::new(name.clone(), arguments.clone()),
                                &tool_call_policy,
                                tool_policy_port,
                                approval_port,
                                &approval_context,
                                policy.tool_timeout_ms,
                            )
                            .await;
                            // Best-effort journal append for future replay.
                            let _ = journal
                                .append(JournalEntry {
                                    session_id: req.session_id.clone(),
                                    call_fingerprint: call_fingerprint.clone(),
                                    tool_name: name.clone(),
                                    arguments: arguments.clone(),
                                    result: result.output.clone(),
                                    is_error: result.is_error,
                                    timestamp_ms: bob_core::tape::now_ms(),
                                })
                                .await;
                            result
                        };
                        guard.record_tool_call();
                        let _ = cost_meter.record_tool_result(&req.session_id, &tool_result).await;
                        let is_error = tool_result.is_error;
                        events.emit(AgentEvent::ToolCallCompleted {
                            session_id: req.session_id.clone(),
                            step: current_step,
                            name: name.clone(),
                            is_error,
                        });
                        let output_str =
                            serde_json::to_string(&tool_result.output).unwrap_or_default();
                        session.messages.push(Message::tool_result(
                            name.clone(),
                            tool_call_id,
                            output_str,
                        ));
                        let _ = artifact_store
                            .put(ArtifactRecord {
                                session_id: req.session_id.clone(),
                                kind: "tool_result".to_string(),
                                name: name.clone(),
                                content: tool_result.output.clone(),
                            })
                            .await;
                        tool_transcript.push(tool_result);
                    }
                }
            }
            Err(_parse_err) => {
                // Invalid JSON action: re-prompt once, then fail the turn.
                consecutive_parse_failures += 1;
                last_tool_call_signature = None;
                same_tool_call_streak = 0;
                if consecutive_parse_failures >= 2 {
                    let _ = store.save(&req.session_id, &session).await;
                    return Err(AgentError::Internal(
                        "LLM produced invalid JSON after re-prompt".into(),
                    ));
                }
                session.messages.push(Message::text(
                    Role::User,
                    "Your response was not valid JSON. \
                    Please respond with exactly one JSON object \
                    matching the required schema.",
                ));
            }
        }
    }
}
/// Bundle of everything `finish_turn` needs to persist and report a turn.
struct FinishResult<'a> {
    // Final assistant text returned to the caller.
    content: &'a str,
    // Every tool result produced during the turn, in order.
    tool_transcript: Vec<ToolResult>,
    // Aggregated token usage across all LLM calls in the turn.
    usage: TokenUsage,
    finish_reason: FinishReason,
}
async fn finish_turn(
store: &dyn SessionStore,
events: &dyn EventSink,
session_id: &bob_core::types::SessionId,
session: &bob_core::types::SessionState,
result: FinishResult<'_>,
) -> Result<AgentRunResult, AgentError> {
store.save(session_id, session).await?;
events.emit(AgentEvent::TurnCompleted {
session_id: session_id.clone(),
finish_reason: result.finish_reason,
usage: result.usage.clone(),
});
Ok(AgentRunResult::Finished(AgentResponse {
content: result.content.to_string(),
tool_transcript: result.tool_transcript,
usage: result.usage,
finish_reason: result.finish_reason,
}))
}
/// Streaming counterpart of [`run_turn`]: stubs out every optional port and
/// runs with native-preferred dispatch, returning an event stream.
pub async fn run_turn_stream(
    llm: Arc<dyn LlmPort>,
    tools: Arc<dyn ToolPort>,
    store: Arc<dyn SessionStore>,
    events: Arc<dyn EventSink>,
    req: AgentRequest,
    policy: TurnPolicy,
    default_model: String,
) -> Result<AgentEventStream, AgentError> {
    let policy_port: Arc<dyn ToolPolicyPort> = Arc::new(crate::DefaultToolPolicyPort);
    let approval_port: Arc<dyn ApprovalPort> = Arc::new(crate::AllowAllApprovalPort);
    let checkpoints: Arc<dyn TurnCheckpointStorePort> = Arc::new(crate::NoOpCheckpointStorePort);
    let artifacts: Arc<dyn ArtifactStorePort> = Arc::new(crate::NoOpArtifactStorePort);
    let meter: Arc<dyn CostMeterPort> = Arc::new(crate::NoOpCostMeterPort);
    let compactor: Arc<dyn ContextCompactorPort> =
        Arc::new(crate::prompt::WindowContextCompactor::default());
    let tool_journal: Arc<dyn ToolJournalPort> = Arc::new(crate::NoOpToolJournalPort);
    run_turn_stream_with_controls(
        llm,
        tools,
        store,
        events,
        req,
        policy,
        default_model,
        policy_port,
        approval_port,
        crate::DispatchMode::NativePreferred,
        checkpoints,
        artifacts,
        meter,
        compactor,
        tool_journal,
    )
    .await
}
#[expect(
clippy::too_many_arguments,
reason = "streaming entrypoint exposes all ports and controls explicitly for composition roots"
)]
pub(crate) async fn run_turn_stream_with_controls(
llm: Arc<dyn LlmPort>,
tools: Arc<dyn ToolPort>,
store: Arc<dyn SessionStore>,
events: Arc<dyn EventSink>,
req: AgentRequest,
policy: TurnPolicy,
default_model: String,
tool_policy: Arc<dyn ToolPolicyPort>,
approval: Arc<dyn ApprovalPort>,
dispatch_mode: crate::DispatchMode,
checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
artifact_store: Arc<dyn ArtifactStorePort>,
cost_meter: Arc<dyn CostMeterPort>,
context_compactor: Arc<dyn ContextCompactorPort>,
journal: Arc<dyn ToolJournalPort>,
) -> Result<AgentEventStream, AgentError> {
let (tx, rx) = flume::bounded::<AgentStreamEvent>(STREAM_CHANNEL_CAPACITY);
let config = StreamRunConfig {
policy,
default_model,
tool_policy,
approval,
dispatch_mode,
checkpoint_store,
artifact_store,
cost_meter,
context_compactor,
journal,
};
tokio::spawn(async move {
if let Err(err) = run_turn_stream_inner(llm, tools, store, events, req, &config, &tx).await
{
let _ = tx.send_async(AgentStreamEvent::Error { error: err.to_string() }).await;
}
});
Ok(Box::pin(rx.into_stream()))
}
/// Owned dependency bundle moved into the spawned streaming-turn task.
struct StreamRunConfig {
    policy: TurnPolicy,
    default_model: String,
    tool_policy: Arc<dyn ToolPolicyPort>,
    approval: Arc<dyn ApprovalPort>,
    dispatch_mode: crate::DispatchMode,
    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
    artifact_store: Arc<dyn ArtifactStorePort>,
    cost_meter: Arc<dyn CostMeterPort>,
    context_compactor: Arc<dyn ContextCompactorPort>,
    journal: Arc<dyn ToolJournalPort>,
}
async fn run_turn_stream_inner(
llm: Arc<dyn LlmPort>,
tools: Arc<dyn ToolPort>,
store: Arc<dyn SessionStore>,
events: Arc<dyn EventSink>,
req: AgentRequest,
config: &StreamRunConfig,
tx: &flume::Sender<AgentStreamEvent>,
) -> Result<(), AgentError> {
let model = req.model.as_deref().unwrap_or(&config.default_model);
let cancel_token = req.cancel_token.clone();
let selected_skills = resolve_selected_skills(&req);
let tool_call_policy = resolve_tool_call_policy(&req);
let mut session = store.load(&req.session_id).await?.unwrap_or_default();
let system_instructions = resolve_system_instructions_with_memory(&req, &session);
let tool_descriptors = tools.list_tools().await?;
let mut guard = LoopGuard::new(config.policy.clone());
let mut total_usage = TokenUsage::default();
let mut consecutive_parse_failures: u32 = 0;
let mut next_call_id: u64 = 0;
let mut last_tool_call_signature: Option<String> = None;
let mut same_tool_call_streak: u32 = 0;
events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
if !selected_skills.is_empty() {
events.emit(AgentEvent::SkillsSelected {
session_id: req.session_id.clone(),
skill_names: selected_skills.clone(),
});
}
session.messages.push(Message::text(Role::User, req.input.clone()));
loop {
let current_step = guard.steps.saturating_add(1);
if let Some(ref token) = cancel_token &&
token.is_cancelled()
{
events.emit(AgentEvent::Error {
session_id: req.session_id.clone(),
step: Some(current_step),
error: "turn cancelled".to_string(),
});
events.emit(AgentEvent::TurnCompleted {
session_id: req.session_id.clone(),
finish_reason: FinishReason::Cancelled,
usage: total_usage.clone(),
});
store.save(&req.session_id, &session).await?;
let _ = tx
.send_async(AgentStreamEvent::Error { error: "turn cancelled".to_string() })
.await;
let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
return Ok(());
}
config.cost_meter.check_budget(&req.session_id).await?;
if !guard.can_continue() {
let reason = guard.reason();
let msg = format!("Turn stopped: {reason:?}");
events.emit(AgentEvent::Error {
session_id: req.session_id.clone(),
step: Some(current_step),
error: msg.clone(),
});
events.emit(AgentEvent::TurnCompleted {
session_id: req.session_id.clone(),
finish_reason: FinishReason::GuardExceeded,
usage: total_usage.clone(),
});
store.save(&req.session_id, &session).await?;
let _ = tx.send_async(AgentStreamEvent::Error { error: msg }).await;
let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
return Ok(());
}
let llm_request = crate::prompt::build_llm_request_with_options(
model,
&session,
&tool_descriptors,
&system_instructions,
prompt_options_for_mode(config.dispatch_mode, llm.as_ref(), req.output_schema.clone()),
config.context_compactor.as_ref(),
);
events.emit(AgentEvent::LlmCallStarted {
session_id: req.session_id.clone(),
step: current_step,
model: model.to_string(),
});
let mut assistant_content = String::new();
let mut llm_usage = TokenUsage::default();
let mut llm_tool_calls: Vec<ToolCall> = Vec::new();
let mut llm_finish_reason = FinishReason::Stop;
let mut fallback_to_complete = false;
let llm_request = llm_request.await;
if llm.capabilities().native_tool_calling {
fallback_to_complete = true;
} else {
match llm.complete_stream(llm_request.clone()).await {
Ok(mut stream) => {
while let Some(item) = stream.next().await {
match item {
Ok(bob_core::types::LlmStreamChunk::TextDelta(delta)) => {
assistant_content.push_str(&delta);
let _ = tx
.send_async(AgentStreamEvent::TextDelta { content: delta })
.await;
}
Ok(bob_core::types::LlmStreamChunk::Done { usage }) => {
llm_usage = usage;
}
Err(err) => {
events.emit(AgentEvent::Error {
session_id: req.session_id.clone(),
step: Some(current_step),
error: err.to_string(),
});
return Err(AgentError::Llm(err));
}
}
}
}
Err(err) => {
fallback_to_complete = true;
let _ = err;
}
}
}
if fallback_to_complete {
let llm_response = llm.complete(llm_request).await?;
assistant_content = llm_response.content.clone();
llm_usage = llm_response.usage;
llm_finish_reason = llm_response.finish_reason;
llm_tool_calls = llm_response.tool_calls;
let _ =
tx.send_async(AgentStreamEvent::TextDelta { content: llm_response.content }).await;
}
guard.record_step();
total_usage.prompt_tokens += llm_usage.prompt_tokens;
total_usage.completion_tokens += llm_usage.completion_tokens;
session.total_usage.prompt_tokens =
session.total_usage.prompt_tokens.saturating_add(llm_usage.prompt_tokens);
session.total_usage.completion_tokens =
session.total_usage.completion_tokens.saturating_add(llm_usage.completion_tokens);
config.cost_meter.record_llm_usage(&req.session_id, model, &llm_usage).await?;
events.emit(AgentEvent::LlmCallCompleted {
session_id: req.session_id.clone(),
step: current_step,
model: model.to_string(),
usage: llm_usage.clone(),
});
let native_tool_call = if llm.capabilities().native_tool_calling {
llm_tool_calls.first().cloned()
} else {
None
};
let assistant_message = if llm_tool_calls.is_empty() {
Message::text(Role::Assistant, assistant_content.clone())
} else {
Message::assistant_tool_calls(assistant_content.clone(), llm_tool_calls.clone())
};
session.messages.push(assistant_message);
let _ = config
.checkpoint_store
.save_checkpoint(&TurnCheckpoint {
session_id: req.session_id.clone(),
step: guard.steps,
tool_calls: guard.tool_calls,
usage: total_usage.clone(),
})
.await;
let response_for_dispatch = bob_core::types::LlmResponse {
content: assistant_content.clone(),
usage: llm_usage.clone(),
finish_reason: llm_finish_reason,
tool_calls: llm_tool_calls,
};
if let Ok(action) =
parse_action_for_mode(config.dispatch_mode, llm.as_ref(), &response_for_dispatch)
{
consecutive_parse_failures = 0;
match action {
AgentAction::Final { .. } | AgentAction::AskUser { .. } => {
store.save(&req.session_id, &session).await?;
events.emit(AgentEvent::TurnCompleted {
session_id: req.session_id.clone(),
finish_reason: FinishReason::Stop,
usage: total_usage.clone(),
});
let _ = tx
.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() })
.await;
return Ok(());
}
AgentAction::ToolCall { name, arguments } => {
let tool_call_id = native_tool_call
.as_ref()
.filter(|call| call.name == name && call.arguments == arguments)
.and_then(|call| call.call_id.clone());
let call_signature = format!(
"{}:{}",
name,
serde_json::to_string(&arguments).unwrap_or_default()
);
if last_tool_call_signature.as_ref() == Some(&call_signature) {
same_tool_call_streak = same_tool_call_streak.saturating_add(1);
} else {
same_tool_call_streak = 1;
last_tool_call_signature = Some(call_signature);
}
if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
next_call_id += 1;
let call_id = format!("call-{next_call_id}");
events.emit(AgentEvent::ToolCallStarted {
session_id: req.session_id.clone(),
step: current_step,
name: name.clone(),
});
let _ = tx.send(AgentStreamEvent::ToolCallStarted {
name: name.clone(),
call_id: call_id.clone(),
});
guard.record_tool_call();
let duplicate_result = ToolResult {
name: name.clone(),
output: serde_json::json!({
"error": format!(
"consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
)
}),
is_error: true,
};
let _ = config
.cost_meter
.record_tool_result(&req.session_id, &duplicate_result)
.await;
events.emit(AgentEvent::ToolCallCompleted {
session_id: req.session_id.clone(),
step: current_step,
name: name.clone(),
is_error: true,
});
let output_str =
serde_json::to_string(&duplicate_result.output).unwrap_or_default();
session.messages.push(Message::tool_result(
name.clone(),
tool_call_id.clone(),
output_str,
));
let _ = config
.artifact_store
.put(ArtifactRecord {
session_id: req.session_id.clone(),
kind: "tool_result".to_string(),
name: name.clone(),
content: duplicate_result.output.clone(),
})
.await;
let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
call_id,
result: duplicate_result,
});
continue;
}
events.emit(AgentEvent::ToolCallStarted {
session_id: req.session_id.clone(),
step: current_step,
name: name.clone(),
});
next_call_id += 1;
let call_id = format!("call-{next_call_id}");
let _ = tx.send(AgentStreamEvent::ToolCallStarted {
name: name.clone(),
call_id: call_id.clone(),
});
let approval_context = ApprovalContext {
session_id: req.session_id.clone(),
turn_step: guard.steps.max(1),
selected_skills: selected_skills.clone(),
};
let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
let tool_result = if let Ok(Some(cached)) =
config.journal.lookup(&req.session_id, &call_fingerprint).await
{
tracing::debug!(
session_id = %req.session_id,
tool = %name,
"replaying tool result from journal"
);
ToolResult {
name: cached.tool_name,
output: cached.result,
is_error: cached.is_error,
}
} else {
let result = execute_tool_call(
tools.as_ref(),
&mut guard,
ToolCall::new(name.clone(), arguments.clone()),
&tool_call_policy,
config.tool_policy.as_ref(),
config.approval.as_ref(),
&approval_context,
config.policy.tool_timeout_ms,
)
.await;
let _ = config
.journal
.append(JournalEntry {
session_id: req.session_id.clone(),
call_fingerprint: call_fingerprint.clone(),
tool_name: name.clone(),
arguments: arguments.clone(),
result: result.output.clone(),
is_error: result.is_error,
timestamp_ms: bob_core::tape::now_ms(),
})
.await;
result
};
guard.record_tool_call();
let _ =
config.cost_meter.record_tool_result(&req.session_id, &tool_result).await;
let is_error = tool_result.is_error;
events.emit(AgentEvent::ToolCallCompleted {
session_id: req.session_id.clone(),
step: current_step,
name: name.clone(),
is_error,
});
let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
call_id,
result: tool_result.clone(),
});
let output_str = serde_json::to_string(&tool_result.output).unwrap_or_default();
session.messages.push(Message::tool_result(
name.clone(),
tool_call_id,
output_str,
));
let _ = config
.artifact_store
.put(ArtifactRecord {
session_id: req.session_id.clone(),
kind: "tool_result".to_string(),
name: name.clone(),
content: tool_result.output.clone(),
})
.await;
}
}
} else {
consecutive_parse_failures += 1;
last_tool_call_signature = None;
same_tool_call_streak = 0;
if consecutive_parse_failures >= 2 {
store.save(&req.session_id, &session).await?;
events.emit(AgentEvent::Error {
session_id: req.session_id.clone(),
step: Some(current_step),
error: "LLM produced invalid JSON after re-prompt".to_string(),
});
return Err(AgentError::Internal(
"LLM produced invalid JSON after re-prompt".into(),
));
}
session.messages.push(Message::text(
Role::User,
"Your response was not valid JSON. \
Please respond with exactly one JSON object \
matching the required schema.",
));
}
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Deliberately tight budgets for the `LoopGuard` unit tests: every
/// limit can be exhausted within a couple of iterations.
fn test_policy() -> TurnPolicy {
    TurnPolicy {
        turn_timeout_ms: 100,
        tool_timeout_ms: 50,
        max_steps: 3,
        max_tool_calls: 2,
        max_consecutive_errors: 2,
    }
}
// Each test below drains exactly one of the guard's budgets and then
// checks both can_continue() and the GuardReason it reports.
#[test]
fn trips_on_max_steps() {
    let mut guard = LoopGuard::new(test_policy());
    assert!(guard.can_continue());
    // Consume the full max_steps budget (3 under test_policy()).
    (0..3).for_each(|_| guard.record_step());
    assert!(!guard.can_continue(), "guard should trip after reaching max_steps");
    assert_eq!(guard.reason(), GuardReason::MaxSteps);
}
#[test]
fn trips_on_max_tool_calls() {
    let mut guard = LoopGuard::new(test_policy());
    assert!(guard.can_continue());
    // Consume the full max_tool_calls budget (2 under test_policy()).
    (0..2).for_each(|_| guard.record_tool_call());
    assert!(!guard.can_continue(), "guard should trip after reaching max_tool_calls");
    assert_eq!(guard.reason(), GuardReason::MaxToolCalls);
}
#[test]
fn trips_on_max_consecutive_errors() {
    let mut guard = LoopGuard::new(test_policy());
    assert!(guard.can_continue());
    // Consume the full max_consecutive_errors budget (2 under test_policy()).
    (0..2).for_each(|_| guard.record_error());
    assert!(!guard.can_continue(), "guard should trip after reaching max_consecutive_errors");
    assert_eq!(guard.reason(), GuardReason::MaxConsecutiveErrors);
}
/// The guard must trip once `turn_timeout_ms` (100ms in `test_policy()`)
/// has elapsed, and report `GuardReason::TurnTimeout`.
///
/// Runs on tokio's paused test clock so the 150ms "sleep" advances
/// virtual time instantly: the test is fast and immune to scheduler
/// jitter. `LoopGuard` measures time via `tokio::time::Instant`, which
/// honors the paused clock.
// NOTE(review): `start_paused` needs tokio's `test-util` feature —
// confirm it is enabled in dev-dependencies.
#[tokio::test(start_paused = true)]
async fn trips_on_timeout() {
    let guard = LoopGuard::new(test_policy());
    assert!(guard.can_continue());
    assert!(!guard.timed_out());
    // With the clock paused this advances virtual time by 150ms (past
    // the 100ms budget) without any wall-clock delay.
    tokio::time::sleep(std::time::Duration::from_millis(150)).await;
    assert!(!guard.can_continue(), "guard should trip after timeout");
    assert!(guard.timed_out());
    assert_eq!(guard.reason(), GuardReason::TurnTimeout);
}
/// `reset_errors` zeroes the consecutive-error count, so a single error
/// recorded afterwards stays below max_consecutive_errors (2).
#[test]
fn reset_errors_clears_counter() {
    let mut g = LoopGuard::new(test_policy());
    g.record_error();
    // The reset wipes the first error; only one error is outstanding now.
    g.reset_errors();
    g.record_error();
    assert!(g.can_continue(), "single error after reset should not trip guard");
}
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, Mutex},
};
use bob_core::{
error::{CostError, LlmError, StoreError, ToolError},
ports::{
ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
},
types::{
AgentEvent, AgentRequest, AgentRunResult, AgentStreamEvent, ApprovalContext,
ApprovalDecision, ArtifactRecord, CancelToken, LlmRequest, LlmResponse, LlmStream,
LlmStreamChunk, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
TurnCheckpoint,
},
};
use futures_util::StreamExt;
/// LLM stub that replays a scripted queue of responses, one per
/// `complete()` call. Once the queue runs dry it serves a canned
/// "final" answer so an over-long turn cannot hang a test.
struct SequentialLlm {
    responses: Mutex<VecDeque<Result<LlmResponse, LlmError>>>,
}
impl SequentialLlm {
    /// Queue plain text payloads, each wrapped as a successful Stop response.
    fn from_contents(contents: Vec<&str>) -> Self {
        let queued: VecDeque<_> = contents
            .iter()
            .map(|content| {
                Ok(LlmResponse {
                    content: (*content).to_string(),
                    usage: TokenUsage::default(),
                    finish_reason: FinishReason::Stop,
                    tool_calls: Vec::new(),
                })
            })
            .collect();
        Self { responses: Mutex::new(queued) }
    }
    /// Queue fully-formed responses verbatim.
    fn from_responses(responses: Vec<LlmResponse>) -> Self {
        Self { responses: Mutex::new(responses.into_iter().map(Ok).collect()) }
    }
}
#[async_trait::async_trait]
impl LlmPort for SequentialLlm {
    async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
        // Recover from a poisoned lock so a panic in a sibling test
        // thread does not cascade into spurious failures here.
        let mut queue = self.responses.lock().unwrap_or_else(|p| p.into_inner());
        match queue.pop_front() {
            Some(next) => next,
            None => Ok(LlmResponse {
                content: r#"{"type": "final", "content": "fallback"}"#.to_string(),
                usage: TokenUsage::default(),
                finish_reason: FinishReason::Stop,
                tool_calls: Vec::new(),
            }),
        }
    }
    async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
        Err(LlmError::Provider("not implemented".into()))
    }
}
/// Tool port backed by a scripted queue of call outcomes; after the
/// queue is exhausted every call succeeds with a generic payload.
struct MockToolPort {
    tools: Vec<ToolDescriptor>,
    call_results: Mutex<VecDeque<Result<ToolResult, ToolError>>>,
}
impl MockToolPort {
    /// No tools advertised and no scripted results.
    fn empty() -> Self {
        Self { tools: Vec::new(), call_results: Mutex::new(VecDeque::new()) }
    }
    /// Advertise one tool and script its call outcomes in order.
    fn with_tool_and_results(
        tool_name: &str,
        results: Vec<Result<ToolResult, ToolError>>,
    ) -> Self {
        let descriptor = ToolDescriptor::new(tool_name, format!("{tool_name} tool"))
            .with_input_schema(serde_json::json!({"type": "object"}));
        Self { tools: vec![descriptor], call_results: Mutex::new(results.into()) }
    }
}
#[async_trait::async_trait]
impl ToolPort for MockToolPort {
    async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
        Ok(self.tools.clone())
    }
    async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
        // Poison recovery keeps sibling-test panics from cascading.
        let mut scripted = self.call_results.lock().unwrap_or_else(|p| p.into_inner());
        match scripted.pop_front() {
            Some(result) => result,
            None => Ok(ToolResult {
                name: call.name,
                output: serde_json::json!({"result": "default"}),
                is_error: false,
            }),
        }
    }
}
/// Tool port that advertises tools but fails loudly if any is actually
/// executed — proves that policy/approval gates stop the call before it
/// reaches the port.
struct NoCallToolPort {
    tools: Vec<ToolDescriptor>,
}
#[async_trait::async_trait]
impl ToolPort for NoCallToolPort {
    async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
        Ok(self.tools.clone())
    }
    async fn call_tool(&self, _call: ToolCall) -> Result<ToolResult, ToolError> {
        let message = "tool call should be blocked by policy before execution".to_string();
        Err(ToolError::Execution(message))
    }
}
/// Policy port that admits every tool unconditionally.
struct AllowAllPolicyPort;
impl ToolPolicyPort for AllowAllPolicyPort {
    fn is_tool_allowed(
        &self,
        _tool: &str,
        _deny_tools: &[String],
        _allow_tools: Option<&[String]>,
    ) -> bool {
        true
    }
}
/// Policy port that rejects exactly the "search" tool, ignoring the
/// request-supplied allow/deny lists.
struct DenySearchPolicyPort;
impl ToolPolicyPort for DenySearchPolicyPort {
    fn is_tool_allowed(
        &self,
        tool: &str,
        _deny_tools: &[String],
        _allow_tools: Option<&[String]>,
    ) -> bool {
        match tool {
            "search" => false,
            _ => true,
        }
    }
}
/// Approval port that waves every tool call through.
struct AlwaysApprovePort;
#[async_trait::async_trait]
impl ApprovalPort for AlwaysApprovePort {
    async fn approve_tool_call(
        &self,
        _call: &ToolCall,
        _context: &ApprovalContext,
    ) -> Result<ApprovalDecision, ToolError> {
        Ok(ApprovalDecision::Approved)
    }
}
/// Approval port that denies every tool call with a fixed reason.
struct AlwaysDenyApprovalPort;
#[async_trait::async_trait]
impl ApprovalPort for AlwaysDenyApprovalPort {
    async fn approve_tool_call(
        &self,
        _call: &ToolCall,
        _context: &ApprovalContext,
    ) -> Result<ApprovalDecision, ToolError> {
        let reason = "approval policy rejected tool call".to_string();
        Ok(ApprovalDecision::Denied { reason })
    }
}
/// Checkpoint store that retains every saved checkpoint for inspection;
/// `load_latest` deliberately reports no prior checkpoint.
struct CountingCheckpointPort {
    saved: Mutex<Vec<TurnCheckpoint>>,
}
impl CountingCheckpointPort {
    fn new() -> Self {
        Self { saved: Mutex::new(Vec::new()) }
    }
}
#[async_trait::async_trait]
impl TurnCheckpointStorePort for CountingCheckpointPort {
    async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError> {
        let mut saved = self.saved.lock().unwrap_or_else(|p| p.into_inner());
        saved.push(checkpoint.clone());
        Ok(())
    }
    async fn load_latest(
        &self,
        _session_id: &SessionId,
    ) -> Result<Option<TurnCheckpoint>, StoreError> {
        Ok(None)
    }
}
// Artifact store that discards every write and lists nothing; used when
// a test needs the port wired up but does not care about artifacts.
struct NoopArtifactStore;
#[async_trait::async_trait]
impl ArtifactStorePort for NoopArtifactStore {
    async fn put(&self, _artifact: ArtifactRecord) -> Result<(), StoreError> {
        Ok(())
    }
    async fn list_by_session(
        &self,
        _session_id: &SessionId,
    ) -> Result<Vec<ArtifactRecord>, StoreError> {
        Ok(Vec::new())
    }
}
/// Artifact store that keeps everything it is given and serves the
/// accumulated records back from `list_by_session` (session id ignored).
struct CountingArtifactStore {
    saved: Mutex<Vec<ArtifactRecord>>,
}
impl CountingArtifactStore {
    fn new() -> Self {
        Self { saved: Mutex::new(Vec::new()) }
    }
}
#[async_trait::async_trait]
impl ArtifactStorePort for CountingArtifactStore {
    async fn put(&self, artifact: ArtifactRecord) -> Result<(), StoreError> {
        let mut saved = self.saved.lock().unwrap_or_else(|p| p.into_inner());
        saved.push(artifact);
        Ok(())
    }
    async fn list_by_session(
        &self,
        _session_id: &SessionId,
    ) -> Result<Vec<ArtifactRecord>, StoreError> {
        let saved = self.saved.lock().unwrap_or_else(|p| p.into_inner());
        Ok(saved.clone())
    }
}
/// Cost meter that never rejects a budget check but counts how often
/// the usage-recording hooks fire; tests read the counters directly.
struct CountingCostMeter {
    llm_calls: Mutex<u32>,
    tool_results: Mutex<u32>,
}
impl CountingCostMeter {
    fn new() -> Self {
        Self { llm_calls: Mutex::new(0), tool_results: Mutex::new(0) }
    }
}
#[async_trait::async_trait]
impl CostMeterPort for CountingCostMeter {
    async fn check_budget(&self, _session_id: &SessionId) -> Result<(), CostError> {
        Ok(())
    }
    async fn record_llm_usage(
        &self,
        _session_id: &SessionId,
        _model: &str,
        _usage: &TokenUsage,
    ) -> Result<(), CostError> {
        *self.llm_calls.lock().unwrap_or_else(|p| p.into_inner()) += 1;
        Ok(())
    }
    async fn record_tool_result(
        &self,
        _session_id: &SessionId,
        _tool_result: &ToolResult,
    ) -> Result<(), CostError> {
        *self.tool_results.lock().unwrap_or_else(|p| p.into_inner()) += 1;
        Ok(())
    }
}
/// In-memory `SessionStore` used to observe what the turn loop persists.
// Consistency fix: the `SessionStore` impl for `MemoryStore` previously
// sat after `FailingSaveStore`, away from its struct; definitions are
// now grouped per type.
struct MemoryStore {
    data: Mutex<HashMap<SessionId, SessionState>>,
}
impl MemoryStore {
    fn new() -> Self {
        Self { data: Mutex::new(HashMap::new()) }
    }
}
#[async_trait::async_trait]
impl SessionStore for MemoryStore {
    async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
        let map = self.data.lock().unwrap_or_else(|p| p.into_inner());
        Ok(map.get(id).cloned())
    }
    async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
        let mut map = self.data.lock().unwrap_or_else(|p| p.into_inner());
        map.insert(id.clone(), state.clone());
        Ok(())
    }
}
/// Store whose `save` always fails, for exercising persistence-error paths.
struct FailingSaveStore;
#[async_trait::async_trait]
impl SessionStore for FailingSaveStore {
    async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
        Ok(None)
    }
    async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
        Err(StoreError::Backend("simulated save failure".into()))
    }
}
/// Event sink that records every emitted `AgentEvent` for later inspection.
struct CollectingSink {
    events: Mutex<Vec<AgentEvent>>,
}
impl CollectingSink {
    fn new() -> Self {
        Self { events: Mutex::new(Vec::new()) }
    }
    /// Number of events emitted so far.
    fn event_count(&self) -> usize {
        self.all_events().len()
    }
    /// Snapshot (clone) of every event emitted so far, in order.
    fn all_events(&self) -> Vec<AgentEvent> {
        self.events.lock().unwrap_or_else(|p| p.into_inner()).clone()
    }
}
impl EventSink for CollectingSink {
    fn emit(&self, event: AgentEvent) {
        self.events.lock().unwrap_or_else(|p| p.into_inner()).push(event);
    }
}
/// Builds a minimal `AgentRequest` against the fixed "test-session"
/// session, with default context and no cancellation/schema/retries.
fn make_request(input: &str) -> AgentRequest {
    AgentRequest {
        session_id: "test-session".into(),
        input: input.into(),
        model: None,
        context: bob_core::types::RequestContext::default(),
        cancel_token: None,
        output_schema: None,
        max_output_retries: 0,
    }
}
/// Budgets loose enough that an ordinary test turn never hits a guard limit.
fn generous_policy() -> TurnPolicy {
    TurnPolicy {
        turn_timeout_ms: 30_000,
        tool_timeout_ms: 5_000,
        max_steps: 20,
        max_tool_calls: 10,
        max_consecutive_errors: 3,
    }
}
/// LLM whose `complete_stream` yields a pre-scripted chunk sequence;
/// the blocking `complete()` path is expected never to run.
struct StreamLlm {
    chunks: Mutex<VecDeque<Result<LlmStreamChunk, LlmError>>>,
}
impl StreamLlm {
    fn new(chunks: Vec<Result<LlmStreamChunk, LlmError>>) -> Self {
        Self { chunks: Mutex::new(chunks.into()) }
    }
}
#[async_trait::async_trait]
impl LlmPort for StreamLlm {
    async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
        Err(LlmError::Provider("complete() should not be called in stream test".into()))
    }
    async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
        // Drain the scripted chunks by swapping in an empty queue, then
        // wrap them in a ready stream.
        let mut guarded = self.chunks.lock().unwrap_or_else(|p| p.into_inner());
        let scripted = std::mem::take(&mut *guarded);
        Ok(Box::pin(futures_util::stream::iter(scripted)))
    }
}
/// LLM that asserts, via an error, that the system message it receives
/// contains a required substring — used to verify prompt injection.
struct InspectingLlm {
    expected_substring: String,
}
#[async_trait::async_trait]
impl LlmPort for InspectingLlm {
    async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
        // Pull the content of the first system-role message, or empty
        // when no system message is present.
        let system = req
            .messages
            .iter()
            .find_map(|m| (m.role == Role::System).then(|| m.content.clone()))
            .unwrap_or_default();
        if system.contains(&self.expected_substring) {
            Ok(LlmResponse {
                content: r#"{"type": "final", "content": "ok"}"#.to_string(),
                usage: TokenUsage::default(),
                finish_reason: FinishReason::Stop,
                tool_calls: Vec::new(),
            })
        } else {
            Err(LlmError::Provider(format!(
                "expected system prompt to include '{}', got: {}",
                self.expected_substring, system
            )))
        }
    }
    async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
        Err(LlmError::Provider("not used".into()))
    }
}
// tc01: a single well-formed "final" response ends the turn immediately
// with Stop, no tool activity, and at least the lifecycle events.
#[tokio::test]
async fn tc01_simple_final_response() {
    let llm =
        SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "Hello there!"}"#]);
    let tools = MockToolPort::empty();
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let result = run_turn(
        &llm,
        &tools,
        &store,
        &sink,
        make_request("Hi"),
        &generous_policy(),
        "test-model",
    )
    .await;
    assert!(
        matches!(&result, Ok(AgentRunResult::Finished(_))),
        "expected Finished, got {result:?}"
    );
    // The assert above guarantees this match succeeds; the `return` arm
    // only satisfies exhaustiveness.
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.content, "Hello there!");
    assert_eq!(resp.finish_reason, FinishReason::Stop);
    assert!(resp.tool_transcript.is_empty());
    assert!(sink.event_count() >= 3, "should emit TurnStarted, LlmCall*, TurnCompleted");
}
// tc02: a tool_call response followed by a final response produces one
// successful transcript entry and the final content.
#[tokio::test]
async fn tc02_tool_call_then_final() {
    let llm = SequentialLlm::from_contents(vec![
        r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
        r#"{"type": "final", "content": "Found results."}"#,
    ]);
    // One scripted success for the single expected "search" call.
    let tools = MockToolPort::with_tool_and_results(
        "search",
        vec![Ok(ToolResult {
            name: "search".into(),
            output: serde_json::json!({"hits": 42}),
            is_error: false,
        })],
    );
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let result = run_turn(
        &llm,
        &tools,
        &store,
        &sink,
        make_request("Search for rust"),
        &generous_policy(),
        "test-model",
    )
    .await;
    assert!(
        matches!(&result, Ok(AgentRunResult::Finished(_))),
        "expected Finished, got {result:?}"
    );
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.content, "Found results.");
    assert_eq!(resp.finish_reason, FinishReason::Stop);
    assert_eq!(resp.tool_transcript.len(), 1);
    assert_eq!(resp.tool_transcript[0].name, "search");
    assert!(!resp.tool_transcript[0].is_error);
}
// tc03: a single invalid-JSON response triggers a re-prompt; the valid
// second response then finishes the turn normally.
#[tokio::test]
async fn tc03_parse_error_reprompt_success() {
    let llm = SequentialLlm::from_contents(vec![
        "This is not JSON at all.",
        r#"{"type": "final", "content": "Recovered"}"#,
    ]);
    let tools = MockToolPort::empty();
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let result = run_turn(
        &llm,
        &tools,
        &store,
        &sink,
        make_request("Hi"),
        &generous_policy(),
        "test-model",
    )
    .await;
    assert!(
        matches!(&result, Ok(AgentRunResult::Finished(_))),
        "expected Finished after re-prompt, got {result:?}"
    );
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.content, "Recovered");
    assert_eq!(resp.finish_reason, FinishReason::Stop);
}
// tc04: two consecutive invalid-JSON responses abort the turn with an
// error mentioning the JSON failure (matches the loop's 2-strike rule).
#[tokio::test]
async fn tc04_double_parse_error() {
    let llm = SequentialLlm::from_contents(vec!["not json 1", "not json 2"]);
    let tools = MockToolPort::empty();
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let result = run_turn(
        &llm,
        &tools,
        &store,
        &sink,
        make_request("Hi"),
        &generous_policy(),
        "test-model",
    )
    .await;
    assert!(result.is_err(), "should return error after two parse failures");
    let msg = match result {
        Err(err) => err.to_string(),
        Ok(value) => format!("unexpected success: {value:?}"),
    };
    assert!(msg.contains("invalid JSON"), "error message = {msg}");
}
// tc05: with max_steps = 2 the guard halts an LLM that keeps issuing
// tool calls; the turn still Finishes with GuardExceeded (not an Err)
// and names MaxSteps in the content.
#[tokio::test]
async fn tc05_max_steps_exhaustion() {
    // Four tool-call responses queued, but the guard stops after 2 steps.
    let llm = SequentialLlm::from_contents(vec![
        r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
        r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
        r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
        r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
    ]);
    let tools = MockToolPort::with_tool_and_results(
        "t1",
        vec![
            Ok(ToolResult {
                name: "t1".into(),
                output: serde_json::json!(null),
                is_error: false,
            }),
            Ok(ToolResult {
                name: "t1".into(),
                output: serde_json::json!(null),
                is_error: false,
            }),
            Ok(ToolResult {
                name: "t1".into(),
                output: serde_json::json!(null),
                is_error: false,
            }),
        ],
    );
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    // Only max_steps is tight; every other budget is generous so the
    // guard reason is unambiguous.
    let policy = TurnPolicy {
        max_steps: 2,
        max_tool_calls: 10,
        max_consecutive_errors: 5,
        turn_timeout_ms: 30_000,
        tool_timeout_ms: 5_000,
    };
    let result =
        run_turn(&llm, &tools, &store, &sink, make_request("do work"), &policy, "test-model")
            .await;
    assert!(
        matches!(&result, Ok(AgentRunResult::Finished(_))),
        "expected Finished with GuardExceeded, got {result:?}"
    );
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.finish_reason, FinishReason::GuardExceeded);
    assert!(resp.content.contains("MaxSteps"), "content = {}", resp.content);
}
// tc06: a token cancelled before the turn starts yields a Finished
// result with FinishReason::Cancelled instead of an error.
#[tokio::test]
async fn tc06_cancellation() {
    let llm = SequentialLlm::from_contents(vec![
        r#"{"type": "final", "content": "should not reach"}"#,
    ]);
    let tools = MockToolPort::empty();
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let token = CancelToken::new();
    // Cancel up front so the loop observes cancellation immediately.
    token.cancel();
    let mut req = make_request("Hi");
    req.cancel_token = Some(token);
    let result =
        run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
    assert!(
        matches!(&result, Ok(AgentRunResult::Finished(_))),
        "expected Finished with Cancelled, got {result:?}"
    );
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.finish_reason, FinishReason::Cancelled);
}
// tc07: a tool execution error is surfaced in the transcript as an
// is_error entry, and the LLM can still finish the turn afterwards.
#[tokio::test]
async fn tc07_tool_error_then_final() {
    let llm = SequentialLlm::from_contents(vec![
        r#"{"type": "tool_call", "name": "flaky_tool", "arguments": {}}"#,
        r#"{"type": "final", "content": "Recovered from tool error."}"#,
    ]);
    // The single scripted outcome is a failure.
    let tools = MockToolPort::with_tool_and_results(
        "flaky_tool",
        vec![Err(ToolError::Execution("connection refused".into()))],
    );
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let result = run_turn(
        &llm,
        &tools,
        &store,
        &sink,
        make_request("call flaky"),
        &generous_policy(),
        "test-model",
    )
    .await;
    assert!(
        matches!(&result, Ok(AgentRunResult::Finished(_))),
        "expected Finished, got {result:?}"
    );
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.content, "Recovered from tool error.");
    assert_eq!(resp.tool_transcript.len(), 1);
    assert!(resp.tool_transcript[0].is_error);
}
// tc08: a failing session save is propagated as AgentError::Store
// rather than being swallowed.
#[tokio::test]
async fn tc08_save_failure_is_propagated() {
    let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "done"}"#]);
    let tools = MockToolPort::empty();
    let store = FailingSaveStore;
    let sink = CollectingSink::new();
    let result = run_turn(
        &llm,
        &tools,
        &store,
        &sink,
        make_request("hello"),
        &generous_policy(),
        "test-model",
    )
    .await;
    assert!(matches!(result, Err(AgentError::Store(_))), "expected Store error to be returned");
}
// tc09: the streaming entry point forwards text deltas and ends with a
// Finished event carrying the usage reported by the Done chunk.
#[tokio::test]
async fn tc09_stream_turn_emits_text_and_finished() {
    // The final JSON payload is deliberately split across two deltas.
    let llm: Arc<dyn LlmPort> = Arc::new(StreamLlm::new(vec![
        Ok(LlmStreamChunk::TextDelta("{\"type\":\"final\",\"content\":\"he".into())),
        Ok(LlmStreamChunk::TextDelta("llo\"}".into())),
        Ok(LlmStreamChunk::Done {
            usage: TokenUsage { prompt_tokens: 3, completion_tokens: 4 },
        }),
    ]));
    let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
    let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
    let sink: Arc<dyn EventSink> = Arc::new(CollectingSink::new());
    let stream_result = run_turn_stream(
        llm,
        tools,
        store,
        sink,
        make_request("hello"),
        generous_policy(),
        "test-model".to_string(),
    )
    .await;
    assert!(stream_result.is_ok(), "run_turn_stream should produce a stream");
    let mut stream = match stream_result {
        Ok(stream) => stream,
        Err(_) => return,
    };
    let mut saw_text = false;
    let mut saw_finished = false;
    // Drain the stream, recording whether text and a Finished event with
    // the expected usage were observed.
    while let Some(event) = stream.next().await {
        match event {
            AgentStreamEvent::TextDelta { content } => {
                saw_text = saw_text || !content.is_empty();
            }
            AgentStreamEvent::Finished { usage } => {
                saw_finished = true;
                assert_eq!(usage.prompt_tokens, 3);
                assert_eq!(usage.completion_tokens, 4);
            }
            AgentStreamEvent::ToolCallStarted { .. } |
            AgentStreamEvent::ToolCallCompleted { .. } |
            AgentStreamEvent::Error { .. } => {}
        }
    }
    assert!(saw_text, "expected at least one text delta");
    assert!(saw_finished, "expected a finished event");
}
// tc10: a request-supplied system_prompt must reach the LLM's system
// message; InspectingLlm errors the turn if the substring is missing.
#[tokio::test]
async fn tc10_skills_prompt_context_is_injected() {
    let llm = InspectingLlm { expected_substring: "Skill: rust-review".to_string() };
    let tools = MockToolPort::empty();
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let mut req = make_request("review this code");
    req.context.system_prompt = Some("Skill: rust-review\nUse strict checks.".to_string());
    let result =
        run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
    assert!(result.is_ok(), "run should succeed when skills prompt is injected");
}
// tc11: selected_skills on the request context produces a SkillsSelected
// event carrying exactly those skill names.
#[tokio::test]
async fn tc11_selected_skills_context_emits_event() {
    let llm =
        SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "looks good"}"#]);
    let tools = MockToolPort::empty();
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let mut req = make_request("review code");
    req.context.selected_skills = vec!["rust-review".to_string(), "security-audit".to_string()];
    let result =
        run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
    assert!(result.is_ok(), "run should succeed");
    let events = sink.all_events();
    assert!(
        events.iter().any(|event| matches!(
            event,
            AgentEvent::SkillsSelected { skill_names, .. }
            if skill_names == &vec!["rust-review".to_string(), "security-audit".to_string()]
        )),
        "skills.selected event should be emitted with context skill names"
    );
}
// tc12: a tool named in the request's deny_tools list is blocked before
// execution (NoCallToolPort would error the turn if it were invoked);
// the denial appears as an is_error transcript entry.
#[tokio::test]
async fn tc12_policy_deny_tool_blocks_execution() {
    let llm = SequentialLlm::from_contents(vec![
        r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
        r#"{"type": "final", "content": "done"}"#,
    ]);
    let tools = NoCallToolPort {
        tools: vec![
            ToolDescriptor::new("search", "search tool")
                .with_input_schema(serde_json::json!({"type":"object"})),
        ],
    };
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let mut req = make_request("search rust");
    req.context.tool_policy.deny_tools =
        vec!["search".to_string(), "local/shell_exec".to_string()];
    let result =
        run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
    assert!(
        matches!(&result, Ok(AgentRunResult::Finished(_))),
        "expected finished response, got {result:?}"
    );
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.finish_reason, FinishReason::Stop);
    assert_eq!(resp.tool_transcript.len(), 1);
    assert!(resp.tool_transcript[0].is_error);
    assert!(
        resp.tool_transcript[0].output.to_string().contains("denied"),
        "tool error should explain policy denial"
    );
}
// tc13: an ApprovalPort denial also blocks execution, with the denial
// reason surfaced in the transcript entry's output.
#[tokio::test]
async fn tc13_approval_denied_blocks_execution() {
    let llm = SequentialLlm::from_contents(vec![
        r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
        r#"{"type": "final", "content": "done"}"#,
    ]);
    let tools = NoCallToolPort {
        tools: vec![
            ToolDescriptor::new("search", "search tool")
                .with_input_schema(serde_json::json!({"type":"object"})),
        ],
    };
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let req = make_request("search rust");
    // Policy allows the tool; only the approval step denies it.
    let tool_policy = AllowAllPolicyPort;
    let approval = AlwaysDenyApprovalPort;
    let result = run_turn_with_controls(
        &llm,
        &tools,
        &store,
        &sink,
        req,
        &generous_policy(),
        "test-model",
        &tool_policy,
        &approval,
    )
    .await;
    assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.tool_transcript.len(), 1);
    assert!(resp.tool_transcript[0].is_error);
    assert!(
        resp.tool_transcript[0].output.to_string().contains("approval policy rejected"),
        "tool error should explain approval denial"
    );
}
// tc14: a custom ToolPolicyPort (deny "search") blocks execution even
// when the approval port would approve.
#[tokio::test]
async fn tc14_custom_policy_port_blocks_execution() {
    let llm = SequentialLlm::from_contents(vec![
        r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
        r#"{"type": "final", "content": "done"}"#,
    ]);
    let tools = NoCallToolPort {
        tools: vec![
            ToolDescriptor::new("search", "search tool")
                .with_input_schema(serde_json::json!({"type":"object"})),
        ],
    };
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let req = make_request("search rust");
    let tool_policy = DenySearchPolicyPort;
    let approval = AlwaysApprovePort;
    let result = run_turn_with_controls(
        &llm,
        &tools,
        &store,
        &sink,
        req,
        &generous_policy(),
        "test-model",
        &tool_policy,
        &approval,
    )
    .await;
    assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.tool_transcript.len(), 1);
    assert!(resp.tool_transcript[0].is_error);
    assert!(
        resp.tool_transcript[0].output.to_string().contains("denied"),
        "tool error should explain policy denial"
    );
}
// tc15: with DispatchMode::NativePreferred and an LLM that advertises
// native tool calling, structured tool_calls from the response (not the
// JSON content) drive execution, and the persisted session retains both
// the assistant tool call and the tool result with their call_id.
#[tokio::test]
async fn tc15_native_dispatch_mode_uses_llm_tool_calls() {
    // LLM that reports native tool-calling capability and replays a
    // scripted response queue.
    struct NativeToolLlm {
        responses: Mutex<VecDeque<LlmResponse>>,
    }
    #[async_trait::async_trait]
    impl LlmPort for NativeToolLlm {
        fn capabilities(&self) -> bob_core::types::LlmCapabilities {
            bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
        }
        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
            let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
            Ok(q.pop_front().unwrap_or(LlmResponse {
                content: r#"{"type":"final","content":"fallback"}"#.to_string(),
                usage: TokenUsage::default(),
                finish_reason: FinishReason::Stop,
                tool_calls: Vec::new(),
            }))
        }
        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
            Err(LlmError::Provider("not used".into()))
        }
    }
    // First response carries a structured tool call (content "ignored"
    // proves the JSON protocol is not what drives dispatch here).
    let llm = NativeToolLlm {
        responses: Mutex::new(VecDeque::from(vec![
            LlmResponse {
                content: "ignored".to_string(),
                usage: TokenUsage::default(),
                finish_reason: FinishReason::Stop,
                tool_calls: vec![
                    ToolCall::new("search", serde_json::json!({"q":"rust"}))
                        .with_call_id("call-search-1"),
                ],
            },
            LlmResponse {
                content: r#"{"type":"final","content":"done"}"#.to_string(),
                usage: TokenUsage::default(),
                finish_reason: FinishReason::Stop,
                tool_calls: Vec::new(),
            },
        ])),
    };
    let tools = MockToolPort::with_tool_and_results(
        "search",
        vec![Ok(ToolResult {
            name: "search".to_string(),
            output: serde_json::json!({"hits": 2}),
            is_error: false,
        })],
    );
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let checkpoint = CountingCheckpointPort::new();
    let artifacts = NoopArtifactStore;
    let cost = CountingCostMeter::new();
    let policy = AllowAllPolicyPort;
    let approval = AlwaysApprovePort;
    let result = run_turn_with_extensions(
        &llm,
        &tools,
        &store,
        &sink,
        make_request("search rust"),
        &generous_policy(),
        "test-model",
        &policy,
        &approval,
        crate::DispatchMode::NativePreferred,
        &checkpoint,
        &artifacts,
        &cost,
        &crate::prompt::WindowContextCompactor::default(),
        &crate::NoOpToolJournalPort,
    )
    .await;
    assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
    let resp = match result {
        Ok(AgentRunResult::Finished(r)) => r,
        _ => return,
    };
    assert_eq!(resp.tool_transcript.len(), 1);
    assert_eq!(resp.tool_transcript[0].name, "search");
    // Inspect what was persisted to the session store.
    let saved = store.load(&"test-session".to_string()).await;
    let saved = match saved {
        Ok(Some(state)) => state,
        other => panic!("expected saved state, got {other:?}"),
    };
    assert!(
        saved.messages.iter().any(|message| {
            message.role == Role::Assistant &&
            message.tool_calls.len() == 1 &&
            message.tool_calls[0].call_id.as_deref() == Some("call-search-1")
        }),
        "assistant tool call should be persisted structurally",
    );
    assert!(
        saved.messages.iter().any(|message| {
            message.role == Role::Tool &&
            message.tool_call_id.as_deref() == Some("call-search-1") &&
            message.tool_name.as_deref() == Some("search")
        }),
        "tool result should retain tool metadata",
    );
}
// tc16: the extension ports are exercised — at least one checkpoint is
// saved and the cost meter records at least one LLM call.
#[tokio::test]
async fn tc16_checkpoint_and_cost_ports_are_invoked() {
    let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "ok"}"#]);
    let tools = MockToolPort::empty();
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let checkpoint = CountingCheckpointPort::new();
    let artifacts = NoopArtifactStore;
    let cost = CountingCostMeter::new();
    let policy = AllowAllPolicyPort;
    let approval = AlwaysApprovePort;
    let result = run_turn_with_extensions(
        &llm,
        &tools,
        &store,
        &sink,
        make_request("hello"),
        &generous_policy(),
        "test-model",
        &policy,
        &approval,
        crate::DispatchMode::PromptGuided,
        &checkpoint,
        &artifacts,
        &cost,
        &crate::prompt::WindowContextCompactor::default(),
        &crate::NoOpToolJournalPort,
    )
    .await;
    assert!(result.is_ok(), "turn should succeed");
    // Read the counting fixtures directly.
    let checkpoints = checkpoint.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
    let llm_calls = *cost.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
    assert!(checkpoints >= 1, "checkpoint port should be invoked at least once");
    assert!(llm_calls >= 1, "cost meter should record llm usage");
}
// tc17: token usage from two consecutive turns on the same session is
// summed (10+3 prompt, 5+2 completion) in the persisted state.
#[tokio::test]
async fn tc17_session_usage_accumulates_and_persists() {
    let llm_first = SequentialLlm::from_responses(vec![LlmResponse {
        content: r#"{"type":"final","content":"first"}"#.to_string(),
        usage: TokenUsage { prompt_tokens: 10, completion_tokens: 5 },
        finish_reason: FinishReason::Stop,
        tool_calls: Vec::new(),
    }]);
    let llm_second = SequentialLlm::from_responses(vec![LlmResponse {
        content: r#"{"type":"final","content":"second"}"#.to_string(),
        usage: TokenUsage { prompt_tokens: 3, completion_tokens: 2 },
        finish_reason: FinishReason::Stop,
        tool_calls: Vec::new(),
    }]);
    let tools = MockToolPort::empty();
    // Both turns share the same store and the fixed "test-session" id.
    let store = MemoryStore::new();
    let sink = CollectingSink::new();
    let first = run_turn(
        &llm_first,
        &tools,
        &store,
        &sink,
        make_request("hello"),
        &generous_policy(),
        "test-model",
    )
    .await;
    assert!(first.is_ok(), "first run should succeed");
    let second = run_turn(
        &llm_second,
        &tools,
        &store,
        &sink,
        make_request("again"),
        &generous_policy(),
        "test-model",
    )
    .await;
    assert!(second.is_ok(), "second run should succeed");
    let loaded = store.load(&"test-session".to_string()).await;
    assert!(loaded.is_ok(), "session should be persisted");
    let state = loaded.ok().flatten();
    assert!(state.is_some(), "session state should exist");
    let state = state.unwrap_or_default();
    assert_eq!(state.total_usage.prompt_tokens, 13);
    assert_eq!(state.total_usage.completion_tokens, 7);
}
/// LLM whose streaming entry point always fails, forcing the runner to
/// fall back to the blocking `complete()` path.
struct FallbackOnlyLlm {
    response: LlmResponse,
}
#[async_trait::async_trait]
impl LlmPort for FallbackOnlyLlm {
    async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
        Ok(self.response.clone())
    }
    async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
        let reason = "streaming not available".to_string();
        Err(LlmError::Provider(reason))
    }
}
// tc18: when complete_stream() fails but complete() succeeds, the
// stream path falls back silently — no AgentEvent::Error is emitted.
#[tokio::test]
async fn tc18_stream_fallback_does_not_emit_error_event() {
    let llm: Arc<dyn LlmPort> = Arc::new(FallbackOnlyLlm {
        response: LlmResponse {
            content: r#"{"type":"final","content":"done"}"#.to_string(),
            usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
            finish_reason: FinishReason::Stop,
            tool_calls: Vec::new(),
        },
    });
    let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
    let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
    // Keep a concrete handle so the collected events can be read after
    // the dyn-trait clone is handed to the runner.
    let sink = Arc::new(CollectingSink::new());
    let sink_dyn: Arc<dyn EventSink> = sink.clone();
    let stream_result = run_turn_stream(
        llm,
        tools,
        store,
        sink_dyn,
        make_request("hello"),
        generous_policy(),
        "test-model".to_string(),
    )
    .await;
    assert!(stream_result.is_ok(), "stream run should succeed with fallback");
    let mut stream = match stream_result {
        Ok(stream) => stream,
        Err(_) => return,
    };
    // Drain the stream; only the collected sink events matter here.
    while let Some(_event) = stream.next().await {}
    let events = sink.all_events();
    assert!(
        !events.iter().any(|event| matches!(event, AgentEvent::Error { .. })),
        "fallback should not emit AgentEvent::Error when complete() succeeds"
    );
}
/// LLM advertising native tool calling whose `complete_stream` errors;
/// atomic counters record which entry point the runner actually used.
struct NativeStreamingBypassLlm {
    complete_calls: std::sync::atomic::AtomicUsize,
    complete_stream_calls: std::sync::atomic::AtomicUsize,
    response: LlmResponse,
}
#[async_trait::async_trait]
impl LlmPort for NativeStreamingBypassLlm {
    fn capabilities(&self) -> bob_core::types::LlmCapabilities {
        bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
    }
    async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
        self.complete_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        Ok(self.response.clone())
    }
    async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
        self.complete_stream_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let reason =
            "complete_stream() should be bypassed for native tool calling".to_string();
        Err(LlmError::Provider(reason))
    }
}
// tc19: for an LLM with native tool calling, the stream path goes
// straight to complete() — complete_stream() is never invoked and no
// error event is emitted.
#[tokio::test]
async fn tc19_stream_native_tool_calling_bypasses_complete_stream() {
    // Keep the concrete Arc so the call counters remain readable after
    // the dyn clone is handed to the runner.
    let llm_impl = Arc::new(NativeStreamingBypassLlm {
        complete_calls: std::sync::atomic::AtomicUsize::new(0),
        complete_stream_calls: std::sync::atomic::AtomicUsize::new(0),
        response: LlmResponse {
            content: r#"{"type":"final","content":"done"}"#.to_string(),
            usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
            finish_reason: FinishReason::Stop,
            tool_calls: Vec::new(),
        },
    });
    let llm: Arc<dyn LlmPort> = llm_impl.clone();
    let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
    let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
    let sink = Arc::new(CollectingSink::new());
    let sink_dyn: Arc<dyn EventSink> = sink.clone();
    let stream_result = run_turn_stream(
        llm,
        tools,
        store,
        sink_dyn,
        make_request("hello"),
        generous_policy(),
        "test-model".to_string(),
    )
    .await;
    assert!(
        stream_result.is_ok(),
        "stream run should succeed through complete() for native tool calling"
    );
    let mut stream = match stream_result {
        Ok(stream) => stream,
        Err(_) => return,
    };
    // Drain the stream; the assertions below check the counters.
    while let Some(_event) = stream.next().await {}
    assert_eq!(
        llm_impl.complete_calls.load(std::sync::atomic::Ordering::SeqCst),
        1,
        "native tool-calling stream path should use complete()"
    );
    assert_eq!(
        llm_impl.complete_stream_calls.load(std::sync::atomic::Ordering::SeqCst),
        0,
        "native tool-calling stream path should bypass complete_stream()"
    );
    assert!(
        !sink.all_events().iter().any(|event| matches!(event, AgentEvent::Error { .. })),
        "native-tool fallback should stay on the success path"
    );
}
/// TC20: repeating an identical tool call later in the turn is fine as long
/// as the repeats are not back-to-back — only consecutive duplicates are
/// subject to the loop-protection limit.
#[tokio::test]
async fn tc20_non_consecutive_duplicate_tool_calls_are_allowed() {
    // Script: tool_a, tool_b, then tool_a again with the same arguments.
    let scripted_llm = SequentialLlm::from_contents(vec![
        r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
        r#"{"type":"tool_call","name":"tool_b","arguments":{"q":"docs"}}"#,
        r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
        r#"{"type":"final","content":"done"}"#,
    ]);
    // Small factory for the queued successful tool results.
    let queued_ok = |name: &str, seq: i32| {
        Ok(ToolResult {
            name: name.to_string(),
            output: serde_json::json!({"ok": seq}),
            is_error: false,
        })
    };
    let tool_port = MockToolPort::with_tool_and_results(
        "tool_a",
        vec![queued_ok("tool_a", 1), queued_ok("tool_b", 2), queued_ok("tool_a", 3)],
    );
    let session_store = MemoryStore::new();
    let collector = CollectingSink::new();

    let outcome = run_turn(
        &scripted_llm,
        &tool_port,
        &session_store,
        &collector,
        make_request("repeat searches"),
        &generous_policy(),
        "test-model",
    )
    .await;

    assert!(matches!(&outcome, Ok(AgentRunResult::Finished(_))), "unexpected {outcome:?}");
    let Ok(AgentRunResult::Finished(resp)) = outcome else { return };
    // All three calls executed, none were blocked as duplicates.
    assert_eq!(resp.tool_transcript.len(), 3);
    assert!(resp.tool_transcript.iter().all(|entry| !entry.is_error));
}
/// TC21: four back-to-back identical tool calls exceed the consecutive
/// duplicate limit — the first three execute, the fourth is blocked with an
/// error entry in the transcript instead of reaching the tool port.
#[tokio::test]
async fn tc21_excessive_consecutive_duplicate_tool_calls_are_blocked() {
    // Script the same tool_a call four times in a row, then finish.
    let duplicate_call = r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#;
    let mut script = vec![duplicate_call; 4];
    script.push(r#"{"type":"final","content":"done"}"#);
    let scripted_llm = SequentialLlm::from_contents(script);

    // Only three results queued: the fourth call must never hit the tool.
    let queued_ok = |seq: i32| {
        Ok(ToolResult {
            name: "tool_a".to_string(),
            output: serde_json::json!({"ok": seq}),
            is_error: false,
        })
    };
    let tool_port = MockToolPort::with_tool_and_results(
        "tool_a",
        vec![queued_ok(1), queued_ok(2), queued_ok(3)],
    );
    let session_store = MemoryStore::new();
    let collector = CollectingSink::new();

    let outcome = run_turn(
        &scripted_llm,
        &tool_port,
        &session_store,
        &collector,
        make_request("poll repeatedly"),
        &generous_policy(),
        "test-model",
    )
    .await;

    assert!(matches!(&outcome, Ok(AgentRunResult::Finished(_))), "unexpected {outcome:?}");
    let Ok(AgentRunResult::Finished(resp)) = outcome else { return };
    assert_eq!(resp.tool_transcript.len(), 4);
    // Third call still succeeds; only the fourth trips the limit.
    assert!(!resp.tool_transcript[2].is_error);
    assert!(resp.tool_transcript[3].is_error);
    let blocked_output = resp.tool_transcript[3].output.to_string();
    assert!(
        blocked_output.contains("consecutive duplicate tool call limit reached"),
        "expected duplicate-call protection error in transcript"
    );
}
/// TC22: even when the fourth duplicate call is blocked, the synthetic error
/// outcome still flows through the cost meter and artifact store like any
/// other tool result (4 recorded outcomes: 3 successes + 1 blocked).
#[tokio::test]
async fn tc22_duplicate_block_path_records_artifact_and_cost() {
    // Same four-in-a-row duplicate script as tc21.
    let duplicate_call = r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#;
    let mut script = vec![duplicate_call; 4];
    script.push(r#"{"type":"final","content":"done"}"#);
    let scripted_llm = SequentialLlm::from_contents(script);

    let queued_ok = |seq: i32| {
        Ok(ToolResult {
            name: "tool_a".to_string(),
            output: serde_json::json!({"ok": seq}),
            is_error: false,
        })
    };
    let tool_port = MockToolPort::with_tool_and_results(
        "tool_a",
        vec![queued_ok(1), queued_ok(2), queued_ok(3)],
    );
    let session_store = MemoryStore::new();
    let collector = CollectingSink::new();
    let checkpoint_port = CountingCheckpointPort::new();
    let artifact_store = CountingArtifactStore::new();
    let cost_meter = CountingCostMeter::new();
    let tool_policy = AllowAllPolicyPort;
    let approver = AlwaysApprovePort;

    let outcome = run_turn_with_extensions(
        &scripted_llm,
        &tool_port,
        &session_store,
        &collector,
        make_request("poll repeatedly"),
        &generous_policy(),
        "test-model",
        &tool_policy,
        &approver,
        crate::DispatchMode::PromptGuided,
        &checkpoint_port,
        &artifact_store,
        &cost_meter,
        &crate::prompt::WindowContextCompactor::default(),
        &crate::NoOpToolJournalPort,
    )
    .await;

    assert!(matches!(&outcome, Ok(AgentRunResult::Finished(_))), "unexpected {outcome:?}");
    // Recover from a poisoned mutex rather than double-panicking the test.
    let recorded_tool_results =
        *cost_meter.tool_results.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    assert_eq!(recorded_tool_results, 4, "cost meter should record all tool outcomes");
    let saved_count =
        artifact_store.saved.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).len();
    assert_eq!(saved_count, 4, "artifact store should record all tool outcomes");
}
}