use futures::stream::StreamExt;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio_util::sync::CancellationToken;
use crate::config::LoopConfig;
use crate::error::{LoopError, StreamError};
use crate::event::AgentEvent;
use crate::exec::{execute_tool_batch, ExecutedBatch};
use crate::plugin::TransformContext;
use crate::stream::{ReasoningEffort, StreamErrorKind, StreamEvent, StreamRequest, ToolSchema};
use crate::types::{
AgentContext, AgentMessage, AssistantContent, StopReason, ToolResultContent, Usage,
};
/// Upper bound on provider attempts (the first try included) when a stream
/// attempt fails with `StreamError::Empty`; the retry guard is
/// `attempts + 1 < EMPTY_STREAM_MAX_ATTEMPTS`.
const EMPTY_STREAM_MAX_ATTEMPTS: u8 = 3;
/// Base delay before retrying an empty stream. The actual delay is this value
/// multiplied by the number of empty attempts seen so far.
const EMPTY_STREAM_RETRY_INITIAL_DELAY: std::time::Duration = std::time::Duration::from_millis(250);
/// Upper bound on provider attempts (the first try included) when a stream
/// attempt fails with `StreamError::ZeroOutputTransport`.
const ZERO_OUTPUT_TRANSPORT_MAX_ATTEMPTS: u8 = 2;
/// Base delay before retrying after a zero-output transport failure; scaled by
/// the number of such failures seen so far.
const ZERO_OUTPUT_TRANSPORT_RETRY_INITIAL_DELAY: std::time::Duration =
std::time::Duration::from_millis(500);
/// System-message text appended to a cloned context when a zero-output
/// transport failure is retried. This is runtime text sent to the provider.
const ZERO_OUTPUT_TRANSPORT_RECOVERY_CONTEXT: &str = "\
[runtime context — transport recovery, not user instruction]\n\
The previous provider attempt produced no actionable output: no visible assistant text and no usable tool call reached the runtime. \
It may have produced private-only reasoning or an unusable burst of partial tool calls. \
Do not continue with private reasoning only. Re-read the latest observation and immediately choose exactly one next structured tool call; \
if the answer is ready, use the final response tool.";
/// Threshold for the eager plain-text nudge: nudging is only considered while
/// the count of consecutive tool-less assistant turns is `<=` this value.
const MAX_PLAIN_TEXT_NUDGE_RETRIES: usize = 2;
/// How a finished agent loop ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopOutcome {
    /// The loop finished without hitting the iteration cap or the grace signal.
    Done,
    /// The loop finished while the configured grace signal was set.
    WrappedUp,
    /// The loop stopped because the configured iteration cap was reached.
    HitMaxIterations,
}

impl LoopOutcome {
    /// Returns `true` for every outcome except [`LoopOutcome::HitMaxIterations`].
    pub fn is_complete(self) -> bool {
        match self {
            Self::Done | Self::WrappedUp => true,
            Self::HitMaxIterations => false,
        }
    }

    /// Stable snake_case name for this outcome.
    pub fn label(self) -> &'static str {
        match self {
            Self::HitMaxIterations => "hit_max_iterations",
            Self::WrappedUp => "wrapped_up",
            Self::Done => "done",
        }
    }
}
/// Value returned by `run` and `run_continue` on success.
#[derive(Debug, Clone)]
pub struct RunResult {
// Messages accumulated during this run. `run` seeds the list with the
// supplied prompts; `run_continue` starts from an empty list.
pub messages: Vec<AgentMessage>,
// How the loop ended.
pub outcome: LoopOutcome,
}
/// Starts a fresh agent run.
///
/// The prompts are appended to `context`, announced through
/// `MessageStart`/`MessageEnd` events (after `AgentStart`, an optional
/// `RunIdentified`, and `TurnStart`), and then the main loop is driven by
/// `inner_run`.
///
/// Returns the messages accumulated during the run (starting with the prompts)
/// together with the loop outcome, or the error raised by the loop.
pub async fn run(
    prompts: Vec<AgentMessage>,
    context: AgentContext,
    config: &LoopConfig,
    signal: CancellationToken,
) -> Result<RunResult, LoopError> {
    let mut working = context;
    let mut produced = prompts.clone();
    working.messages.extend(prompts.iter().cloned());

    emit(config, AgentEvent::AgentStart).await;
    if let Some(identity) = working.identity.clone() {
        emit(config, AgentEvent::RunIdentified { identity }).await;
    }
    emit(config, AgentEvent::TurnStart).await;

    // Each prompt is reported as an already-complete message.
    for prompt in &prompts {
        let started = AgentEvent::MessageStart {
            message: prompt.clone(),
        };
        emit(config, started).await;
        let ended = AgentEvent::MessageEnd {
            message: prompt.clone(),
        };
        emit(config, ended).await;
    }

    let outcome = inner_run(&mut working, &mut produced, config, &signal).await?;
    Ok(RunResult {
        messages: produced,
        outcome,
    })
}
pub async fn run_continue(
context: AgentContext,
config: &LoopConfig,
signal: CancellationToken,
) -> Result<RunResult, LoopError> {
let last = context
.messages
.last()
.ok_or_else(|| LoopError::InvalidContinuation("no messages in context".into()))?;
if matches!(last, AgentMessage::Assistant { .. }) {
return Err(LoopError::InvalidContinuation(
"trailing message is assistant".into(),
));
}
let mut current = context;
let mut new_messages = Vec::new();
emit(config, AgentEvent::AgentStart).await;
if let Some(identity) = current.identity.clone() {
emit(config, AgentEvent::RunIdentified { identity }).await;
}
emit(config, AgentEvent::TurnStart).await;
let outcome = inner_run(&mut current, &mut new_messages, config, &signal).await?;
Ok(RunResult {
messages: new_messages,
outcome,
})
}
/// Delivers `event` to the configured event sink first, then to every
/// event-observer plugin in order.
async fn emit(config: &LoopConfig, event: AgentEvent) {
    // The sink takes ownership, so it receives a copy; observers borrow the original.
    let for_sink = event.clone();
    config.event_sink.emit(for_sink).await;

    let observers = &config.plugins.event_observer;
    for observer in observers {
        observer.on_event(&event).await;
    }
}
async fn inner_run(
current: &mut AgentContext,
new_messages: &mut Vec<AgentMessage>,
config: &LoopConfig,
signal: &CancellationToken,
) -> Result<LoopOutcome, LoopError> {
let mut first_turn = true;
let mut iterations: usize = 0;
let mut empty_outcomes_seen: usize = 0;
let mut last_turn_stopped_without_tool = false;
let mut plain_text_terminal_fallback_candidate: Option<AgentMessage> = None;
let mut pending = collect_steering(config).await;
'outer: loop {
let mut has_more_tool_calls = true;
let mut last_batch_terminated = false;
while has_more_tool_calls || !pending.is_empty() {
if signal.is_cancelled() {
return Err(LoopError::Aborted);
}
if let Some(max) = config.max_iterations {
if iterations >= max {
break;
}
}
iterations += 1;
if !first_turn {
emit(config, AgentEvent::TurnStart).await;
} else {
first_turn = false;
}
if !pending.is_empty() {
for msg in pending.drain(..) {
emit(
config,
AgentEvent::MessageStart {
message: msg.clone(),
},
)
.await;
emit(
config,
AgentEvent::MessageEnd {
message: msg.clone(),
},
)
.await;
current.messages.push(msg.clone());
new_messages.push(msg);
}
}
let (assistant, turn_allowlist) =
stream_with_max_tokens_recovery(current, config, signal, iterations - 1).await?;
current.messages.push(assistant.clone());
new_messages.push(assistant.clone());
let stop_reason = match &assistant {
AgentMessage::Assistant { stop_reason, .. } => *stop_reason,
_ => StopReason::Other,
};
if matches!(stop_reason, StopReason::Error | StopReason::Aborted) {
let loop_error = match &assistant {
AgentMessage::Assistant {
stop_reason: StopReason::Aborted,
..
} => LoopError::Aborted,
AgentMessage::Assistant { error_message, .. } => LoopError::Stream(
StreamError::Transient(error_message.clone().unwrap_or_else(|| {
"assistant stream ended with error stop reason".into()
})),
),
_ => LoopError::Stream(StreamError::Transient(
"assistant stream ended with error stop reason".into(),
)),
};
emit(
config,
AgentEvent::TurnEnd {
message: assistant,
tool_results: Vec::new(),
},
)
.await;
emit(
config,
AgentEvent::AgentEnd {
messages: new_messages.clone(),
},
)
.await;
return Err(loop_error);
}
let tool_calls: Vec<_> = match &assistant {
AgentMessage::Assistant { content, .. } => {
content.tool_calls().into_iter().cloned().collect()
}
_ => Vec::new(),
};
last_turn_stopped_without_tool = tool_calls.is_empty();
if last_turn_stopped_without_tool {
empty_outcomes_seen = empty_outcomes_seen.saturating_add(1);
}
let mut tool_result_messages = Vec::new();
has_more_tool_calls = false;
if tool_calls.is_empty() {
if let Some(tool_name) = config.plain_text_terminal_fallback_tool.as_deref() {
let eager = config.plain_text_terminal_fallback_eager;
let terminal_tool_names = config.protocol.terminal_tool_names();
let narrowed_to_terminators = is_terminal_only_allowlist(
turn_allowlist.as_ref(),
tool_name,
&terminal_tool_names,
);
let preserve_plain_text_candidate = plain_assistant_text(&assistant)
.is_some_and(|text| should_preserve_plain_text_terminal_candidate(&text));
if plain_text_terminal_fallback_candidate.is_none()
&& preserve_plain_text_candidate
{
plain_text_terminal_fallback_candidate = Some(assistant.clone());
}
let nudge_mode = config.plain_text_terminal_fallback_eager_nudge
&& eager
&& !narrowed_to_terminators
&& empty_outcomes_seen <= MAX_PLAIN_TEXT_NUDGE_RETRIES;
if nudge_mode {
let available_tool_names: Vec<&str> =
config.tools.iter().map(|t| t.name()).collect();
let nudge_text = config
.protocol
.plain_text_recovery_prompt(crate::protocol::PlainTextRecoveryContext {
messages: ¤t.messages,
iteration: iterations - 1,
available_tool_names: &available_tool_names,
terminal_fallback_tool: Some(tool_name),
})
.unwrap_or_else(|| {
crate::protocol::DEFAULT_PLAIN_TEXT_RECOVERY_PROMPT.to_string()
});
let nudge = AgentMessage::System {
content: nudge_text,
timestamp: Some(now_ms()),
};
current.messages.push(nudge.clone());
new_messages.push(nudge);
has_more_tool_calls = true;
} else if let Some(result_msg) = synthesize_plain_text_terminal_result(
plain_text_terminal_fallback_candidate
.as_ref()
.unwrap_or(&assistant),
turn_allowlist.as_ref(),
tool_name,
eager,
&terminal_tool_names,
) {
plain_text_terminal_fallback_candidate = None;
last_turn_stopped_without_tool = false;
empty_outcomes_seen = 0;
last_batch_terminated = true;
current.messages.push(result_msg.clone());
new_messages.push(result_msg.clone());
tool_result_messages.push(result_msg);
}
}
} else {
let ExecutedBatch {
messages,
terminate,
} = execute_tool_batch(
&assistant,
tool_calls,
current,
config,
signal,
turn_allowlist.as_ref(),
)
.await?;
empty_outcomes_seen = 0;
plain_text_terminal_fallback_candidate = None;
tool_result_messages = messages;
has_more_tool_calls = !terminate;
last_batch_terminated = terminate;
for result_msg in &tool_result_messages {
current.messages.push(result_msg.clone());
new_messages.push(result_msg.clone());
}
}
emit(
config,
AgentEvent::TurnEnd {
message: assistant,
tool_results: tool_result_messages,
},
)
.await;
pending = if last_batch_terminated {
Vec::new()
} else {
collect_steering(config).await
};
}
let cap_hit = config.max_iterations.is_some_and(|max| iterations >= max);
let follow_up = if last_batch_terminated {
Vec::new()
} else {
collect_follow_up(config).await
};
if last_turn_stopped_without_tool {
if let Some(budget) = config.empty_outcome_retry_budget {
if empty_outcomes_seen > budget {
emit(
config,
AgentEvent::AgentEnd {
messages: new_messages.clone(),
},
)
.await;
return Err(LoopError::EmptyOutcomeBudgetExhausted {
budget,
observed: empty_outcomes_seen,
});
}
}
}
if !follow_up.is_empty() && !cap_hit {
pending = follow_up;
continue 'outer;
}
if cap_hit {
for msg in follow_up {
emit(
config,
AgentEvent::MessageStart {
message: msg.clone(),
},
)
.await;
emit(
config,
AgentEvent::MessageEnd {
message: msg.clone(),
},
)
.await;
current.messages.push(msg.clone());
new_messages.push(msg);
}
}
break;
}
emit(
config,
AgentEvent::AgentEnd {
messages: new_messages.clone(),
},
)
.await;
let cap_hit_final = config.max_iterations.is_some_and(|max| iterations >= max);
let wrap_up_fired = config
.grace_signal
.as_ref()
.is_some_and(|flag| flag.load(std::sync::atomic::Ordering::Relaxed));
let outcome = if cap_hit_final {
LoopOutcome::HitMaxIterations
} else if wrap_up_fired {
LoopOutcome::WrappedUp
} else {
LoopOutcome::Done
};
Ok(outcome)
}
/// Polls every steering plugin in order and concatenates the messages they
/// return.
async fn collect_steering(config: &LoopConfig) -> Vec<AgentMessage> {
    let mut gathered: Vec<AgentMessage> = Vec::new();
    let sources = &config.plugins.steering;
    for plugin in sources {
        let batch = plugin.next_steering_messages().await;
        gathered.extend(batch);
    }
    gathered
}
/// Polls every follow-up plugin in order and concatenates the messages they
/// return.
async fn collect_follow_up(config: &LoopConfig) -> Vec<AgentMessage> {
    let mut gathered: Vec<AgentMessage> = Vec::new();
    let sources = &config.plugins.follow_up;
    for plugin in sources {
        let batch = plugin.next_follow_up_messages().await;
        gathered.extend(batch);
    }
    gathered
}
/// Converts a plain-text assistant message into a synthetic, non-error
/// `ToolResult` attributed to `tool_name`.
///
/// Returns `None` when the conversion is not permitted (neither `eager` nor a
/// terminal-only allowlist for this turn) or when the assistant message has no
/// visible text.
fn synthesize_plain_text_terminal_result(
    assistant: &AgentMessage,
    turn_allowlist: Option<&std::collections::HashSet<String>>,
    tool_name: &str,
    eager: bool,
    terminal_tool_names: &std::collections::HashSet<String>,
) -> Option<AgentMessage> {
    // `eager` short-circuits the allowlist inspection.
    let permitted =
        eager || is_terminal_only_allowlist(turn_allowlist, tool_name, terminal_tool_names);
    if !permitted {
        return None;
    }

    plain_assistant_text(assistant).map(|text| AgentMessage::ToolResult {
        tool_call_id: format!("plain_text_terminal_fallback_{}", now_ms()),
        tool_name: tool_name.to_string(),
        content: ToolResultContent::text(text),
        is_error: false,
        narration: Some(
            "Converted plain assistant text into terminal delivery for an auto-tool-choice provider."
                .to_string(),
        ),
        details: None,
        timestamp: Some(now_ms()),
    })
}
/// Extracts the trimmed plain text of an assistant message after passing it
/// through `crate::strip_thinking_tags`.
///
/// Returns `None` for non-assistant messages and when nothing but whitespace
/// remains.
fn plain_assistant_text(assistant: &AgentMessage) -> Option<String> {
    match assistant {
        AgentMessage::Assistant { content, .. } => {
            let raw = content.plain_text();
            let stripped = crate::strip_thinking_tags(&raw);
            let visible = stripped.trim();
            if visible.is_empty() {
                None
            } else {
                Some(visible.to_string())
            }
        }
        _ => None,
    }
}
/// A plain-text answer is kept as a terminal fallback candidate unless it
/// looks like a permission or clarification question.
fn should_preserve_plain_text_terminal_candidate(text: &str) -> bool {
    let is_question = looks_like_permission_or_clarification_question(text);
    !is_question
}
/// Heuristic: does `text` read like the assistant asking the user for
/// permission or direction?
///
/// The text must contain a `?`. It then matches when its ASCII-lowercased,
/// trimmed form starts with one of the known question openers, or when it is
/// at most 500 bytes long and mentions "what" together with "next" or
/// "continue".
fn looks_like_permission_or_clarification_question(text: &str) -> bool {
    const QUESTION_OPENERS: [&str; 9] = [
        "would you like",
        "shall i",
        "should i",
        "do you want",
        "what would you like",
        "what do you need",
        "what's your next move",
        "what is your next move",
        "continue what",
    ];
    const SHORT_MESSAGE_MAX_BYTES: usize = 500;

    let body = text.trim();
    if body.find('?').is_none() {
        return false;
    }

    let lowered = body.to_ascii_lowercase();
    for opener in QUESTION_OPENERS.iter() {
        if lowered.starts_with(*opener) {
            return true;
        }
    }

    // Keyword fallback applies to short messages only.
    if body.len() > SHORT_MESSAGE_MAX_BYTES {
        return false;
    }
    if !lowered.contains("what") {
        return false;
    }
    lowered.contains("next") || lowered.contains("continue")
}
/// Returns `true` when the turn's allowlist exists, includes `terminal_tool`,
/// and contains nothing other than `terminal_tool` and members of
/// `terminal_tool_names`.
///
/// A missing or empty allowlist yields `false`.
fn is_terminal_only_allowlist(
    turn_allowlist: Option<&std::collections::HashSet<String>>,
    terminal_tool: &str,
    terminal_tool_names: &std::collections::HashSet<String>,
) -> bool {
    match turn_allowlist {
        None => false,
        Some(allowed) => {
            // An empty set cannot contain the terminal tool, so this also
            // rejects empty allowlists.
            if !allowed.contains(terminal_tool) {
                return false;
            }
            for name in allowed {
                let is_terminal = name == terminal_tool || terminal_tool_names.contains(name);
                if !is_terminal {
                    return false;
                }
            }
            true
        }
    }
}
/// Streams one assistant message, retrying around three recoverable
/// situations:
///
/// * `StreamError::Empty` — retried with a linearly growing delay while the
///   attempt count stays below `EMPTY_STREAM_MAX_ATTEMPTS`.
/// * `StreamError::ZeroOutputTransport` — retried against a context carrying
///   an extra recovery system message and with possibly lowered reasoning
///   effort, while below `ZERO_OUTPUT_TRANSPORT_MAX_ATTEMPTS`.
/// * `StopReason::MaxTokens` — retried with a larger output cap as dictated by
///   `config.max_output_tokens_recovery`, emitting `OutputTokensEscalation`.
///
/// Cancellation during a retry delay yields `LoopError::Aborted`. Any other
/// error is returned unchanged. When no further escalation applies the
/// truncated assistant message is returned as-is.
async fn stream_with_max_tokens_recovery(
    context: &AgentContext,
    config: &LoopConfig,
    signal: &CancellationToken,
    iteration: usize,
) -> Result<(AgentMessage, Option<std::collections::HashSet<String>>), LoopError> {
    let mut output_cap = config.max_output_tokens;
    let mut reasoning_effort = config.reasoning;
    let mut escalations: u8 = 0;
    let mut empty_retries: u8 = 0;
    let mut transport_retries: u8 = 0;
    let mut recovery_context: Option<AgentContext> = None;

    loop {
        // Once a transport recovery happened, keep using the augmented context.
        let request_context = match recovery_context.as_ref() {
            Some(recovered) => recovered,
            None => context,
        };
        let streamed = stream_assistant_response(
            request_context,
            config,
            signal,
            iteration,
            output_cap,
            reasoning_effort,
        )
        .await;

        let (assistant, allowlist) = match streamed {
            Ok(pair) => pair,
            Err(LoopError::Stream(StreamError::Empty))
                if empty_retries + 1 < EMPTY_STREAM_MAX_ATTEMPTS =>
            {
                empty_retries = empty_retries.saturating_add(1);
                let backoff = EMPTY_STREAM_RETRY_INITIAL_DELAY * u32::from(empty_retries);
                tokio::select! {
                    _ = signal.cancelled() => return Err(LoopError::Aborted),
                    _ = tokio::time::sleep(backoff) => {}
                }
                continue;
            }
            Err(LoopError::Stream(StreamError::ZeroOutputTransport(_)))
                if transport_retries + 1 < ZERO_OUTPUT_TRANSPORT_MAX_ATTEMPTS =>
            {
                transport_retries = transport_retries.saturating_add(1);
                recovery_context = Some(context_with_zero_output_transport_recovery(context));
                reasoning_effort = zero_output_transport_retry_reasoning(config.reasoning);
                let backoff =
                    ZERO_OUTPUT_TRANSPORT_RETRY_INITIAL_DELAY * u32::from(transport_retries);
                tokio::select! {
                    _ = signal.cancelled() => return Err(LoopError::Aborted),
                    _ = tokio::time::sleep(backoff) => {}
                }
                continue;
            }
            Err(other) => return Err(other),
        };

        let truncated = matches!(
            &assistant,
            AgentMessage::Assistant {
                stop_reason: StopReason::MaxTokens,
                ..
            }
        );
        if !truncated {
            return Ok((assistant, allowlist));
        }

        // Escalation needs a recovery policy with attempts left, a concrete
        // current cap, and a next cap offered by the policy.
        let escalation = config
            .max_output_tokens_recovery
            .as_ref()
            .filter(|recovery| escalations < recovery.max_attempts)
            .and_then(|recovery| {
                let prev = output_cap?;
                let next = recovery.next_cap(prev, escalations)?;
                Some((prev, next))
            });
        let Some((prev_cap, new_cap)) = escalation else {
            return Ok((assistant, allowlist));
        };

        escalations = escalations.saturating_add(1);
        emit(
            config,
            AgentEvent::OutputTokensEscalation {
                attempt: escalations,
                prev_cap,
                new_cap,
            },
        )
        .await;
        output_cap = Some(new_cap);
    }
}
/// Performs a single provider request and consumes its event stream.
///
/// Steps, in order:
/// 1. Clone `context.messages` and run every context-transform plugin whose
///    `should_run` returns true, emitting `ContextTransformApplied` with the
///    before/after message lists for each applied transform.
/// 2. Compute the per-turn tool allowlist and the matching tool schemas.
/// 3. Emit `ProviderRequestPrepared`, then open the provider stream.
/// 4. Relay stream events as `MessageStart` / `MessageUpdate` / `MessageEnd`.
///
/// Returns the final assistant message together with the allowlist used for
/// this turn.
///
/// # Errors
///
/// * The error mapped by `loop_error_from_stream_kind` when the stream reports
///   `StreamEvent::Error`.
/// * `LoopError::Stream(StreamError::Empty)` when the stream ends without a
///   `Done` or `Error` event.
async fn stream_assistant_response(
context: &AgentContext,
config: &LoopConfig,
signal: &CancellationToken,
iteration: usize,
max_output_tokens: Option<u32>,
reasoning: ReasoningEffort,
) -> Result<(AgentMessage, Option<std::collections::HashSet<String>>), LoopError> {
// Usage of the most recent assistant message that carries usage data, if any.
let last_provider_usage = last_provider_usage(&context.messages);
let cx = TransformContext {
signal,
model_id: config.model_id.as_deref().unwrap_or(""),
iteration,
last_provider_usage: last_provider_usage.as_ref(),
estimator: &*config.token_estimator,
};
// Transforms operate on a copy; `context.messages` itself is not modified here.
let mut messages = context.messages.clone();
for transform in &config.plugins.context_transform {
if !transform.should_run(&messages, &cx) {
continue;
}
// Snapshot taken so the event can report both sides of the transform.
let before = messages.clone();
messages = transform.transform(messages, &cx).await;
emit(
config,
AgentEvent::ContextTransformApplied {
iteration,
plugin: transform.name(),
before,
after: messages.clone(),
},
)
.await;
}
// The allowlist is computed from the transformed messages.
let allowlist = collect_tool_allowlist_with_events(config, iteration, &messages).await;
let tools = build_tool_schemas(config, allowlist.as_ref());
emit(
config,
AgentEvent::ProviderRequestPrepared {
iteration,
model_id: config.model_id.clone(),
system_prompt: context.system_prompt.clone(),
messages: messages.clone(),
tools: tools.clone(),
temperature: config.temperature,
max_output_tokens,
},
)
.await;
let request = StreamRequest {
system_prompt: context.system_prompt.clone(),
messages,
tools,
temperature: config.temperature,
max_output_tokens,
reasoning,
// Missing provider extras are sent as JSON null.
provider_extras: config
.provider_extras
.clone()
.unwrap_or(serde_json::Value::Null),
force_tool_call: true,
};
let mut stream = config.stream.stream(request, signal.clone()).await;
// Partial message from the `Start` event; reused for every `MessageUpdate`.
let mut last_partial: Option<AgentMessage> = None;
while let Some(event) = stream.next().await {
match event {
StreamEvent::Start { partial } => {
emit(
config,
AgentEvent::MessageStart {
message: partial.clone(),
},
)
.await;
last_partial = Some(partial);
}
StreamEvent::Chunk(chunk) => {
// Chunks that arrive before any `Start` event are not forwarded.
if let Some(ref partial) = last_partial {
emit(
config,
AgentEvent::MessageUpdate {
partial: partial.clone(),
chunk,
},
)
.await;
}
}
StreamEvent::Done { message } => {
emit(
config,
AgentEvent::MessageEnd {
message: message.clone(),
},
)
.await;
return Ok((message, allowlist));
}
StreamEvent::Error {
partial,
kind,
message,
} => {
// Only an aborted stream keeps its own stop reason; everything else is `Error`.
let stop_reason = match kind {
StreamErrorKind::Aborted => StopReason::Aborted,
_ => StopReason::Error,
};
// Build a closing assistant message that keeps whatever content the
// partial carried; it is only used for the `MessageEnd` event.
let error_message = AgentMessage::Assistant {
content: match &partial {
AgentMessage::Assistant { content, .. } => content.clone(),
_ => AssistantContent { blocks: Vec::new() },
},
stop_reason,
error_message: Some(message.clone()),
timestamp: Some(now_ms()),
usage: None,
};
emit(
config,
AgentEvent::MessageEnd {
message: error_message.clone(),
},
)
.await;
return Err(loop_error_from_stream_kind(kind, message));
}
}
}
// The stream closed without `Done` or `Error`: report a synthetic, empty
// error message and surface the condition as `StreamError::Empty`.
let empty = AgentMessage::Assistant {
content: AssistantContent { blocks: Vec::new() },
stop_reason: StopReason::Error,
error_message: Some("stream ended without terminal event".into()),
timestamp: Some(now_ms()),
usage: None,
};
emit(
config,
AgentEvent::MessageEnd {
message: empty.clone(),
},
)
.await;
Err(LoopError::Stream(StreamError::Empty))
}
/// Returns a copy of `context` with one extra system message appended that
/// carries `ZERO_OUTPUT_TRANSPORT_RECOVERY_CONTEXT`. The input is left
/// untouched.
fn context_with_zero_output_transport_recovery(context: &AgentContext) -> AgentContext {
    let mut augmented = context.clone();
    let recovery_note = AgentMessage::System {
        content: ZERO_OUTPUT_TRANSPORT_RECOVERY_CONTEXT.to_string(),
        timestamp: Some(now_ms()),
    };
    augmented.messages.push(recovery_note);
    augmented
}
/// Reasoning effort to use when replaying a request after a zero-output
/// transport failure: `Medium`, `High` and `XHigh` drop to `Minimal`; lower
/// settings are returned unchanged.
fn zero_output_transport_retry_reasoning(reasoning: ReasoningEffort) -> ReasoningEffort {
    // Kept exhaustive so a newly added effort level forces a decision here.
    let should_downgrade = match reasoning {
        ReasoningEffort::None | ReasoningEffort::Minimal | ReasoningEffort::Low => false,
        ReasoningEffort::Medium | ReasoningEffort::High | ReasoningEffort::XHigh => true,
    };
    if should_downgrade {
        ReasoningEffort::Minimal
    } else {
        reasoning
    }
}
/// Maps a provider stream error kind plus its message onto the loop's error
/// type. `Aborted` becomes `LoopError::Aborted`; every other kind becomes the
/// same-named `StreamError` variant wrapped in `LoopError::Stream`. The
/// message is dropped for `Empty` and `Aborted`.
fn loop_error_from_stream_kind(kind: StreamErrorKind, message: String) -> LoopError {
    let stream_error = match kind {
        StreamErrorKind::Aborted => return LoopError::Aborted,
        StreamErrorKind::Empty => StreamError::Empty,
        StreamErrorKind::Transient => StreamError::Transient(message),
        StreamErrorKind::ProviderRateLimited => StreamError::ProviderRateLimited(message),
        StreamErrorKind::ZeroOutputTransport => StreamError::ZeroOutputTransport(message),
        StreamErrorKind::Fatal => StreamError::Fatal(message),
        StreamErrorKind::ContextOverflow => StreamError::ContextOverflow(message),
    };
    LoopError::Stream(stream_error)
}
/// Milliseconds elapsed since the Unix epoch according to the system clock,
/// or `0` when the clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Scans `messages` from newest to oldest and returns a copy of the usage
/// attached to the first assistant message that has one.
fn last_provider_usage(messages: &[AgentMessage]) -> Option<Usage> {
    for message in messages.iter().rev() {
        if let AgentMessage::Assistant {
            usage: Some(found), ..
        } = message
        {
            return Some(found.clone());
        }
    }
    None
}
/// Builds the tool schemas sent to the provider. With an allowlist, only
/// tools whose name is in the set are included; without one, every configured
/// tool is included. Order follows `config.tools`.
fn build_tool_schemas(
    config: &LoopConfig,
    allowlist: Option<&std::collections::HashSet<String>>,
) -> Vec<ToolSchema> {
    let mut schemas = Vec::new();
    for tool in config.tools.iter() {
        let permitted = allowlist.map_or(true, |set| set.contains(tool.name()));
        if !permitted {
            continue;
        }
        schemas.push(ToolSchema {
            name: tool.name().to_string(),
            description: tool.description().to_string(),
            parameters: tool.parameters_schema(),
        });
    }
    schemas
}
async fn collect_tool_allowlist_with_events(
config: &LoopConfig,
iteration: usize,
messages: &[AgentMessage],
) -> Option<std::collections::HashSet<String>> {
if config.plugins.tool_gate.is_empty() {
return None;
}
let conversation_id = config.conversation_id.as_deref();
let available_tool_names: Vec<&str> = config.tools.iter().map(|t| t.name()).collect();
let mut decisions: Vec<GateAllowDecision> = Vec::new();
for gate in &config.plugins.tool_gate {
let ctx = crate::plugin::ToolGateContext {
iteration,
messages,
conversation_id,
available_tool_names: &available_tool_names,
};
let decision = gate.next_turn_tool_allowlist(ctx).await;
emit(
config,
AgentEvent::ToolGateApplied {
iteration,
plugin: gate.name(),
allow: decision.as_ref().map(|set| {
let mut sorted: Vec<String> = set.iter().cloned().collect();
sorted.sort();
sorted
}),
},
)
.await;
if let Some(set) = decision {
let suppresses_advisory =
gate.suppresses_advisory_gates(crate::plugin::ToolGateContext {
iteration,
messages,
conversation_id,
available_tool_names: &available_tool_names,
});
decisions.push(GateAllowDecision {
plugin: gate.name(),
priority: gate.conflict_priority(),
class: gate.tool_gate_class(),
suppresses_advisory,
allow: set,
});
}
}
let suppression_priority = decisions
.iter()
.filter(|decision| decision.suppresses_advisory)
.map(|decision| decision.priority)
.max();
let active_decisions = decisions
.iter()
.filter(|decision| {
!matches!(
suppression_priority,
Some(priority)
if decision.class == crate::plugin::ToolGateClass::Advisory
&& decision.priority < priority
)
})
.collect::<Vec<_>>();
let mut combined: Option<std::collections::HashSet<String>> = None;
for decision in &active_decisions {
combined = Some(match combined {
Some(prev) => prev.intersection(&decision.allow).cloned().collect(),
None => decision.allow.clone(),
});
}
if combined.as_ref().is_some_and(|allow| allow.is_empty()) {
let non_empty_decisions = active_decisions
.iter()
.filter(|decision| !decision.allow.is_empty())
.map(|decision| (decision.plugin, decision.priority, decision.allow.clone()))
.collect::<Vec<_>>();
let resolved = resolve_empty_tool_gate_intersection(&non_empty_decisions);
let (chosen_plugin, allow, reason) = match resolved {
Some((plugin, allow, reason)) => (Some(plugin.to_string()), allow, reason),
None => (
None,
std::collections::HashSet::new(),
"all gating plugins returned empty allowlists".to_string(),
),
};
let sorted_allow = sorted_tool_names(&allow);
emit(
config,
AgentEvent::ToolGateConflictResolved {
iteration,
plugins: active_decisions
.iter()
.map(|decision| decision.plugin.to_string())
.collect(),
chosen_plugin,
allow: sorted_allow,
reason,
},
)
.await;
return if allow.is_empty() { None } else { Some(allow) };
}
combined
}
/// One tool gate's contribution to the per-turn allowlist, recorded only for
/// gates that returned `Some(..)` from `next_turn_tool_allowlist`.
struct GateAllowDecision {
// Name reported by the gate plugin.
plugin: &'static str,
// Value of the gate's `conflict_priority()`; higher wins when resolving conflicts.
priority: i32,
// Value of the gate's `tool_gate_class()`.
class: crate::plugin::ToolGateClass,
// Whether this gate asked for lower-priority advisory gates to be ignored.
suppresses_advisory: bool,
// Tool names this gate allows for the coming turn.
allow: std::collections::HashSet<String>,
}
/// Picks the single decision that should own the allowlist when the gates'
/// sets do not intersect.
///
/// Preference order: highest priority, then the smallest allow set, then the
/// lexicographically smallest plugin name. Returns the winning plugin, a copy
/// of its allow set and a human-readable reason, or `None` for empty input.
fn resolve_empty_tool_gate_intersection(
    decisions: &[(&'static str, i32, std::collections::HashSet<String>)],
) -> Option<(&'static str, std::collections::HashSet<String>, String)> {
    // `Reverse` turns "smaller is better" criteria into keys where the
    // maximum wins.
    let winner = decisions.iter().max_by_key(|(plugin, priority, allow)| {
        (
            *priority,
            std::cmp::Reverse(allow.len()),
            std::cmp::Reverse(*plugin),
        )
    })?;

    let (plugin, priority, allow) = winner;
    let reason = format!(
        "empty intersection repaired by highest-priority owner `{plugin}` (priority {priority})"
    );
    Some((*plugin, allow.clone(), reason))
}
/// Returns the names in `set` as an ascending, lexicographically sorted list.
fn sorted_tool_names(set: &std::collections::HashSet<String>) -> Vec<String> {
    let mut names: Vec<String> = Vec::with_capacity(set.len());
    names.extend(set.iter().cloned());
    names.sort();
    names
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::AgentBuilder;
use crate::plugin::{
FollowUpSource, Plugin, PluginCapabilities, ToolGate, ToolGateClass, ToolGateContext,
};
use crate::stream::{ReasoningEffort, StreamFn};
use crate::types::{AssistantBlock, UserContent};
use futures::stream::{self, BoxStream};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
};
fn empty_assistant_message() -> AgentMessage {
AgentMessage::Assistant {
content: AssistantContent { blocks: Vec::new() },
stop_reason: StopReason::Other,
error_message: None,
timestamp: None,
usage: None,
}
}
fn text_assistant_message(text: impl Into<String>) -> AgentMessage {
AgentMessage::Assistant {
content: AssistantContent::text(text),
stop_reason: StopReason::EndTurn,
error_message: None,
timestamp: None,
usage: None,
}
}
fn tool_call_assistant_message(name: impl Into<String>, id: impl Into<String>) -> AgentMessage {
AgentMessage::Assistant {
content: AssistantContent::with_tool_calls(
None,
vec![crate::tool::ToolCall {
id: id.into(),
name: name.into(),
arguments: serde_json::json!({}),
}],
),
stop_reason: StopReason::ToolUse,
error_message: None,
timestamp: None,
usage: None,
}
}
#[derive(Default)]
struct EmptyThenTextStream {
calls: AtomicUsize,
}
#[derive(Default)]
struct ZeroOutputThenTextStream {
calls: AtomicUsize,
requests: Mutex<Vec<StreamRequest>>,
}
impl ZeroOutputThenTextStream {
fn requests(&self) -> Vec<StreamRequest> {
self.requests.lock().unwrap().clone()
}
}
#[derive(Default)]
struct RepeatedTextStream {
calls: AtomicUsize,
}
#[derive(Default)]
struct EmptyStopsAroundProgressStream {
calls: AtomicUsize,
}
struct CountingFollowUp {
remaining: AtomicUsize,
}
struct TerminalOnlyGate;
struct TerminalWithStatusGate;
struct TestTerminalPolicy;
impl crate::protocol::ProtocolPolicy for TestTerminalPolicy {
fn terminal_tool_names(&self) -> std::collections::HashSet<String> {
[
"message_info",
"message_ask",
"message_result",
"terminator",
]
.iter()
.map(|s| s.to_string())
.collect()
}
}
struct StaticAllowGate {
name: &'static str,
tools: &'static [&'static str],
priority: i32,
class: ToolGateClass,
suppresses_advisory: bool,
}
impl Plugin for TerminalOnlyGate {
fn name(&self) -> &'static str {
"terminal_only_gate"
}
fn capabilities(&self) -> PluginCapabilities {
PluginCapabilities::tool_gate()
}
}
#[async_trait::async_trait]
impl ToolGate for TerminalOnlyGate {
async fn next_turn_tool_allowlist(
&self,
_ctx: ToolGateContext<'_>,
) -> Option<std::collections::HashSet<String>> {
Some(["message_result".to_string()].into_iter().collect())
}
}
impl Plugin for TerminalWithStatusGate {
fn name(&self) -> &'static str {
"terminal_with_status_gate"
}
fn capabilities(&self) -> PluginCapabilities {
PluginCapabilities::tool_gate()
}
}
#[async_trait::async_trait]
impl ToolGate for TerminalWithStatusGate {
async fn next_turn_tool_allowlist(
&self,
_ctx: ToolGateContext<'_>,
) -> Option<std::collections::HashSet<String>> {
Some(
["message_info".to_string(), "message_result".to_string()]
.into_iter()
.collect(),
)
}
}
impl Plugin for StaticAllowGate {
fn name(&self) -> &'static str {
self.name
}
fn capabilities(&self) -> PluginCapabilities {
PluginCapabilities::tool_gate()
}
}
#[async_trait::async_trait]
impl ToolGate for StaticAllowGate {
fn conflict_priority(&self) -> i32 {
self.priority
}
fn tool_gate_class(&self) -> ToolGateClass {
self.class
}
fn suppresses_advisory_gates(&self, _ctx: ToolGateContext<'_>) -> bool {
self.suppresses_advisory
}
async fn next_turn_tool_allowlist(
&self,
_ctx: ToolGateContext<'_>,
) -> Option<std::collections::HashSet<String>> {
Some(self.tools.iter().map(|name| (*name).to_string()).collect())
}
}
impl CountingFollowUp {
fn new(remaining: usize) -> Self {
Self {
remaining: AtomicUsize::new(remaining),
}
}
}
impl Plugin for CountingFollowUp {
fn name(&self) -> &'static str {
"counting_follow_up"
}
fn capabilities(&self) -> PluginCapabilities {
PluginCapabilities::follow_up()
}
}
#[async_trait::async_trait]
impl FollowUpSource for CountingFollowUp {
async fn next_follow_up_messages(&self) -> Vec<AgentMessage> {
let used = self
.remaining
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |remaining| {
remaining.checked_sub(1)
})
.unwrap_or(0);
if used == 0 {
return Vec::new();
}
vec![AgentMessage::System {
content: "retry after no-tool stop".into(),
timestamp: None,
}]
}
}
#[async_trait::async_trait]
impl StreamFn for EmptyThenTextStream {
async fn stream(
&self,
_request: StreamRequest,
_signal: CancellationToken,
) -> BoxStream<'static, StreamEvent> {
let call = self.calls.fetch_add(1, Ordering::SeqCst);
let partial = empty_assistant_message();
if call == 0 {
return Box::pin(stream::iter(vec![
StreamEvent::Start {
partial: partial.clone(),
},
StreamEvent::Error {
partial,
kind: StreamErrorKind::Empty,
message: "empty provider response".to_string(),
},
]));
}
Box::pin(stream::iter(vec![
StreamEvent::Start { partial },
StreamEvent::Done {
message: text_assistant_message("recovered"),
},
]))
}
}
#[async_trait::async_trait]
impl StreamFn for RepeatedTextStream {
async fn stream(
&self,
_request: StreamRequest,
_signal: CancellationToken,
) -> BoxStream<'static, StreamEvent> {
let call = self.calls.fetch_add(1, Ordering::SeqCst);
let partial = empty_assistant_message();
Box::pin(stream::iter(vec![
StreamEvent::Start { partial },
StreamEvent::Done {
message: text_assistant_message(format!("plain stop {call}")),
},
]))
}
}
#[async_trait::async_trait]
impl StreamFn for EmptyStopsAroundProgressStream {
async fn stream(
&self,
_request: StreamRequest,
_signal: CancellationToken,
) -> BoxStream<'static, StreamEvent> {
let call = self.calls.fetch_add(1, Ordering::SeqCst);
let partial = empty_assistant_message();
let message = match call {
0 | 2 | 4 => text_assistant_message(format!("plain stop {call}")),
1 | 3 => tool_call_assistant_message("progress", format!("tc-progress-{call}")),
5 => tool_call_assistant_message("terminator", "tc-terminator"),
other => panic!("unexpected stream call after terminal turn: {other}"),
};
Box::pin(stream::iter(vec![
StreamEvent::Start { partial },
StreamEvent::Done { message },
]))
}
}
#[async_trait::async_trait]
impl StreamFn for ZeroOutputThenTextStream {
async fn stream(
&self,
request: StreamRequest,
_signal: CancellationToken,
) -> BoxStream<'static, StreamEvent> {
self.requests.lock().unwrap().push(request);
let call = self.calls.fetch_add(1, Ordering::SeqCst);
let partial = empty_assistant_message();
if call == 0 {
return Box::pin(stream::iter(vec![
StreamEvent::Start {
partial: partial.clone(),
},
StreamEvent::Error {
partial,
kind: StreamErrorKind::ZeroOutputTransport,
message: "response body decode failed before output".to_string(),
},
]));
}
Box::pin(stream::iter(vec![
StreamEvent::Start { partial },
StreamEvent::Done {
message: text_assistant_message("recovered from transport"),
},
]))
}
}
#[test]
fn wrapped_up_is_complete() {
assert!(LoopOutcome::Done.is_complete());
assert!(LoopOutcome::WrappedUp.is_complete());
assert!(!LoopOutcome::HitMaxIterations.is_complete());
}
#[tokio::test]
async fn empty_stream_response_is_retried_before_returning() {
let stream = Arc::new(EmptyThenTextStream::default());
let config = AgentBuilder::new()
.stream(stream.clone())
.model_id("test-model")
.build()
.expect("config builds");
let context = AgentContext::new("system").with_messages(vec![AgentMessage::User {
content: UserContent::Text("continue".to_string()),
timestamp: None,
}]);
let (assistant, _allowlist) =
stream_with_max_tokens_recovery(&context, &config, &CancellationToken::new(), 0)
.await
.expect("second stream attempt should recover");
let AgentMessage::Assistant { content, .. } = assistant else {
panic!("expected assistant response");
};
assert_eq!(content.plain_text(), "recovered");
assert_eq!(stream.calls.load(Ordering::SeqCst), 2);
}
/// A zero-output transport error on the first attempt must be retried, and the
/// replayed request must use minimal reasoning and carry a recovery-context
/// system message.
#[tokio::test]
async fn zero_output_transport_error_is_retried_before_returning() {
    let provider = Arc::new(ZeroOutputThenTextStream::default());
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("test-model")
        .reasoning(ReasoningEffort::High)
        .build()
        .expect("config builds");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("continue".to_string()),
        timestamp: None,
    };
    let context = AgentContext::new("system").with_messages(vec![user_turn]);
    let cancel = CancellationToken::new();

    let (reply, _allowlist) = stream_with_max_tokens_recovery(&context, &config, &cancel, 0)
        .await
        .expect("second zero-output transport attempt should recover");

    match reply {
        AgentMessage::Assistant { content, .. } => {
            assert_eq!(content.plain_text(), "recovered from transport");
        }
        _ => panic!("expected assistant response"),
    }
    assert_eq!(provider.calls.load(Ordering::SeqCst), 2);

    // Inspect what each attempt actually sent to the provider.
    let requests = provider.requests();
    assert_eq!(requests.len(), 2);
    assert_eq!(requests[0].reasoning, ReasoningEffort::High);
    assert_eq!(
        requests[1].reasoning,
        ReasoningEffort::Minimal,
        "zero-output replay should lower high reasoning so reasoning-heavy private-only spins can produce a tool call"
    );

    // Every phrase must appear in a single system message of the replay.
    // The last phrase is a substring of the one before it; it is kept so the
    // check stays identical to the original assertion.
    let required_phrases = [
        "transport recovery",
        "no visible assistant text",
        "no usable tool call",
        "unusable burst of partial tool calls",
        "exactly one next structured tool call",
        "next structured tool call",
    ];
    let replay_has_recovery_context = requests[1].messages.iter().any(|message| match message {
        AgentMessage::System { content, .. } => required_phrases
            .iter()
            .all(|phrase| content.contains(*phrase)),
        _ => false,
    });
    assert!(
        replay_has_recovery_context,
        "zero-output replay must carry explicit recovery context"
    );
}
/// Stream test double that counts how many times it has been invoked.
///
/// `Default` is derived: `AtomicUsize::default()` is zero, which is exactly
/// what the previous hand-written impl produced.
#[derive(Default)]
struct TerminatorOnlyStream {
    /// Number of `stream` calls observed so far.
    calls: AtomicUsize,
}
/// Provider that answers with a single `terminator` tool call and panics if it
/// is ever asked to stream a second time.
#[async_trait::async_trait]
impl StreamFn for TerminatorOnlyStream {
    /// Returns `Start` followed by `Done` with one `terminator` tool call.
    ///
    /// # Panics
    /// Panics on any call after the first.
    async fn stream(
        &self,
        _request: StreamRequest,
        _signal: CancellationToken,
    ) -> BoxStream<'static, StreamEvent> {
        let prior_calls = self.calls.fetch_add(1, Ordering::SeqCst);
        assert_eq!(
            prior_calls, 0,
            "terminate-on-turn-1 test must NOT re-enter the LLM after a successful terminator"
        );
        let partial = empty_assistant_message();
        let terminator_call = crate::tool::ToolCall {
            id: "tc-terminator-1".into(),
            name: "terminator".into(),
            arguments: serde_json::json!({}),
        };
        let message = AgentMessage::Assistant {
            content: AssistantContent {
                blocks: vec![AssistantBlock::ToolCall(terminator_call)],
            },
            stop_reason: StopReason::ToolUse,
            error_message: None,
            timestamp: None,
            usage: None,
        };
        let events = vec![StreamEvent::Start { partial }, StreamEvent::Done { message }];
        Box::pin(stream::iter(events))
    }
}
/// Test tool named `terminator` whose result sets `terminate: true`.
struct TerminatorTool;

#[async_trait::async_trait]
impl crate::tool::AgentTool for TerminatorTool {
    fn name(&self) -> &str {
        "terminator"
    }

    fn description(&self) -> &str {
        "test terminator"
    }

    fn parameters_schema(&self) -> serde_json::Value {
        serde_json::json!({"type": "object"})
    }

    /// Ignores all inputs and returns a successful "delivered" text result
    /// with the `terminate` flag set.
    async fn execute(
        &self,
        _call_id: &str,
        _args: serde_json::Value,
        _signal: CancellationToken,
        _update: tokio::sync::mpsc::UnboundedSender<crate::tool::ToolResult>,
    ) -> Result<crate::tool::ToolResult, crate::error::ToolError> {
        let delivered = crate::types::ToolResultBlock::Text(crate::types::TextContent {
            text: "delivered".into(),
        });
        let outcome = crate::tool::ToolResult {
            content: vec![delivered],
            is_error: false,
            details: serde_json::Value::Null,
            terminate: true,
            narration: None,
        };
        Ok(outcome)
    }
}
/// Test tool named `progress` that always returns the text "made progress".
struct ProgressTool;

#[async_trait::async_trait]
impl crate::tool::AgentTool for ProgressTool {
    fn name(&self) -> &str {
        "progress"
    }

    fn description(&self) -> &str {
        "test progress tool"
    }

    fn parameters_schema(&self) -> serde_json::Value {
        serde_json::json!({"type": "object"})
    }

    /// Ignores all inputs and returns a plain text result.
    async fn execute(
        &self,
        _call_id: &str,
        _args: serde_json::Value,
        _signal: CancellationToken,
        _update: tokio::sync::mpsc::UnboundedSender<crate::tool::ToolResult>,
    ) -> Result<crate::tool::ToolResult, crate::error::ToolError> {
        let progress = crate::tool::ToolResult::text("made progress");
        Ok(progress)
    }
}
/// Plugin test double that advertises the steering capability.
struct AlwaysSteer {
    /// Shared counter; incremented each time steering messages are requested.
    polls: Arc<AtomicUsize>,
}

impl Plugin for AlwaysSteer {
    /// Default capabilities with only `steering` switched on.
    fn capabilities(&self) -> PluginCapabilities {
        let defaults = PluginCapabilities::default();
        PluginCapabilities {
            steering: true,
            ..defaults
        }
    }

    fn name(&self) -> &'static str {
        "always_steer"
    }
}
#[async_trait::async_trait]
impl crate::plugin::SteeringSource for AlwaysSteer {
    /// Counts the poll, then returns a single "wrap up now" system message.
    async fn next_steering_messages(&self) -> Vec<AgentMessage> {
        self.polls.fetch_add(1, Ordering::SeqCst);
        let steer = AgentMessage::System {
            content: "wrap up now".into(),
            timestamp: None,
        };
        vec![steer]
    }
}
/// Runs one terminator turn with an always-steering plugin attached and
/// asserts the run finishes as `Done` after one provider call, with the
/// steering source polled exactly once.
#[tokio::test]
async fn terminator_vote_skips_post_batch_steering_collection() {
    let provider = Arc::new(TerminatorOnlyStream::default());
    let steering_polls = Arc::new(AtomicUsize::new(0));
    let registry = crate::tool::ToolRegistry::new().with(Arc::new(TerminatorTool));
    let steering = AlwaysSteer {
        polls: Arc::clone(&steering_polls),
    };
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("test-model")
        .tools(registry)
        .steering(steering)
        .build()
        .expect("config builds");
    let context = AgentContext::new("system");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("deliver".to_string()),
        timestamp: None,
    };

    let result = run(vec![user_turn], context, &config, CancellationToken::new())
        .await
        .expect("run completes after one terminator turn");

    assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
    assert_eq!(result.outcome, LoopOutcome::Done);
    assert_eq!(
        steering_polls.load(Ordering::SeqCst),
        1,
        "steering source polled more than once — terminator vote did not gate post-batch re-entry"
    );
}
/// Plugin test double that advertises the follow-up capability.
struct AlwaysFollowUp {
    /// Shared counter; incremented each time follow-up messages are requested.
    polls: Arc<AtomicUsize>,
}

impl Plugin for AlwaysFollowUp {
    /// Uses the `PluginCapabilities::follow_up()` preset unchanged.
    fn capabilities(&self) -> PluginCapabilities {
        PluginCapabilities::follow_up()
    }

    fn name(&self) -> &'static str {
        "always_follow_up"
    }
}
#[async_trait::async_trait]
impl FollowUpSource for AlwaysFollowUp {
    /// Counts the poll, then returns a single "deliver something" system message.
    async fn next_follow_up_messages(&self) -> Vec<AgentMessage> {
        self.polls.fetch_add(1, Ordering::SeqCst);
        let follow_up = AgentMessage::System {
            content: "deliver something".into(),
            timestamp: None,
        };
        vec![follow_up]
    }
}
/// Runs one terminator turn with an always-follow-up plugin attached and
/// asserts the run finishes as `Done` after one provider call, without the
/// follow-up source ever being polled.
#[tokio::test]
async fn terminator_vote_skips_post_batch_follow_up_collection() {
    let provider = Arc::new(TerminatorOnlyStream::default());
    let follow_up_polls = Arc::new(AtomicUsize::new(0));
    let registry = crate::tool::ToolRegistry::new().with(Arc::new(TerminatorTool));
    let follow_up = AlwaysFollowUp {
        polls: Arc::clone(&follow_up_polls),
    };
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("test-model")
        .tools(registry)
        .follow_up(follow_up)
        .build()
        .expect("config builds");
    let context = AgentContext::new("system");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("deliver".to_string()),
        timestamp: None,
    };

    let result = run(vec![user_turn], context, &config, CancellationToken::new())
        .await
        .expect("run completes after one terminator turn");

    assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
    assert_eq!(result.outcome, LoopOutcome::Done);
    assert_eq!(
        follow_up_polls.load(Ordering::SeqCst),
        0,
        "follow-up source polled after a terminator vote — terminator did not gate post-batch re-entry"
    );
}
/// With an empty-outcome retry budget of 1, the run must fail with
/// `LoopError::EmptyOutcomeBudgetExhausted { budget: 1, observed: 2 }` after
/// two provider calls.
#[tokio::test]
async fn exhausted_empty_outcome_budget_returns_typed_loop_error() {
    let provider = Arc::new(RepeatedTextStream::default());
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("test-model")
        .empty_outcome_retry_budget(1)
        .follow_up(CountingFollowUp::new(1))
        .build()
        .expect("config builds");
    let context = AgentContext::new("system");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("continue".to_string()),
        timestamp: None,
    };

    let err = run(vec![user_turn], context, &config, CancellationToken::new())
        .await
        .expect_err("second no-tool stop should exhaust the budget");

    // Only the exact budget/observed pair is accepted.
    match &err {
        LoopError::EmptyOutcomeBudgetExhausted {
            budget: 1,
            observed: 2,
        } => {}
        _ => panic!("unexpected error: {err:?}"),
    }
    assert_eq!(provider.calls.load(Ordering::SeqCst), 2);
}
/// Two `Required` gates with disjoint allowlists: the resolved allowlist must
/// be the higher-priority `delivery_repair_gate`'s, and a
/// `ToolGateConflictResolved` event naming that gate must be emitted.
#[tokio::test]
async fn empty_tool_gate_intersection_prefers_delivery_repair_owner() {
    let (sink, mut rx) = crate::event::ChannelSink::new();
    let repair_gate = StaticAllowGate {
        name: "delivery_repair_gate",
        tools: &["browser_interact"],
        priority: 100,
        class: ToolGateClass::Required,
        suppresses_advisory: false,
    };
    let terminal_guard = StaticAllowGate {
        name: "terminal_message_guard",
        tools: &["message_result"],
        priority: 10,
        class: ToolGateClass::Required,
        suppresses_advisory: false,
    };
    let config = AgentBuilder::new()
        .stream(Arc::new(RepeatedTextStream::default()))
        .event_sink(Arc::new(sink))
        .tool_gate_arc(Arc::new(repair_gate))
        .tool_gate_arc(Arc::new(terminal_guard))
        .build()
        .expect("config builds");

    let allow = collect_tool_allowlist_with_events(&config, 3, &[])
        .await
        .expect("conflict repair should keep a non-empty allowlist");
    assert_eq!(
        allow,
        ["browser_interact".to_string()].into_iter().collect()
    );

    // Drain everything the sink received and check any conflict event found.
    let mut saw_conflict = false;
    while let Ok(event) = rx.try_recv() {
        match event {
            AgentEvent::ToolGateConflictResolved {
                chosen_plugin,
                allow: resolved_allow,
                ..
            } => {
                saw_conflict = true;
                assert_eq!(chosen_plugin.as_deref(), Some("delivery_repair_gate"));
                assert_eq!(resolved_allow, vec!["browser_interact".to_string()]);
            }
            _ => {}
        }
    }
    assert!(saw_conflict, "tool-gate deadlock should be diagnosable");
}
/// A `Required` gate with `suppresses_advisory: true` must keep its full
/// allowlist (`plan`, `file_write`) even when an `Advisory` gate is also
/// registered.
#[tokio::test]
async fn repair_owner_suppresses_advisory_gate_before_plan_only_intersection() {
    let repair_gate = StaticAllowGate {
        name: "delivery_repair_gate",
        tools: &["plan", "file_write"],
        priority: 100,
        class: ToolGateClass::Required,
        suppresses_advisory: true,
    };
    let wrap_up_gate = StaticAllowGate {
        name: "wrap_up_gate",
        tools: &["plan", "message_result", "message_ask"],
        priority: 0,
        class: ToolGateClass::Advisory,
        suppresses_advisory: false,
    };
    let config = AgentBuilder::new()
        .stream(Arc::new(RepeatedTextStream::default()))
        .tool_gate_arc(Arc::new(repair_gate))
        .tool_gate_arc(Arc::new(wrap_up_gate))
        .build()
        .expect("config builds");

    let allow = collect_tool_allowlist_with_events(&config, 3, &[])
        .await
        .expect("repair owner should keep its own allowlist");

    assert_eq!(
        allow,
        ["plan".to_string(), "file_write".to_string()]
            .into_iter()
            .collect()
    );
}
/// With a retry budget of 1 and the `EmptyStopsAroundProgressStream` script,
/// the run must still finish as `Done` after six provider calls.
#[tokio::test]
async fn productive_tool_batch_resets_empty_outcome_budget() {
    let provider = Arc::new(EmptyStopsAroundProgressStream::default());
    let registry = crate::tool::ToolRegistry::new()
        .with(Arc::new(ProgressTool))
        .with(Arc::new(TerminatorTool));
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("test-model")
        .tools(registry)
        .empty_outcome_retry_budget(1)
        .follow_up(CountingFollowUp::new(3))
        .build()
        .expect("config builds");
    let context = AgentContext::new("system");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("continue".to_string()),
        timestamp: None,
    };

    let result = run(vec![user_turn], context, &config, CancellationToken::new())
        .await
        .expect("productive tool batches should reset the empty-outcome budget");

    assert_eq!(result.outcome, LoopOutcome::Done);
    assert_eq!(provider.calls.load(Ordering::SeqCst), 6);
}
/// With `TerminalOnlyGate` and a plain-text fallback tool configured, a plain
/// text stop must become a successful `message_result` tool result after a
/// single provider call.
#[tokio::test]
async fn terminal_only_plain_text_fallback_synthesizes_terminal_result() {
    let provider = Arc::new(RepeatedTextStream::default());
    let registry =
        crate::tool::ToolRegistry::new().with(Arc::new(TerminalNamedTool("message_result")));
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("auto-tool-provider")
        .tools(registry)
        .tool_gate_arc(Arc::new(TerminalOnlyGate))
        .plain_text_terminal_fallback_tool("message_result")
        .empty_outcome_retry_budget(0)
        .build()
        .expect("config builds");
    let context = AgentContext::new("system");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("answer directly".to_string()),
        timestamp: None,
    };

    let result = run(vec![user_turn], context, &config, CancellationToken::new())
        .await
        .expect("plain text should be converted on terminal-only turn");

    assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
    assert_eq!(result.outcome, LoopOutcome::Done);
    // The synthesized result must carry the text of the first plain stop.
    let synthesized = result.messages.iter().any(|message| match message {
        AgentMessage::ToolResult {
            tool_name,
            content,
            is_error: false,
            ..
        } => tool_name == "message_result" && content.plain_text() == "plain stop 0",
        _ => false,
    });
    assert!(synthesized);
}
/// With the eager fallback enabled and no tool gate registered, a plain text
/// stop must still become a successful `message_result` tool result after a
/// single provider call.
#[tokio::test]
async fn eager_plain_text_fallback_fires_without_terminal_only_allowlist() {
    let provider = Arc::new(RepeatedTextStream::default());
    let registry =
        crate::tool::ToolRegistry::new().with(Arc::new(TerminalNamedTool("message_result")));
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("auto-tool-provider-eager")
        .tools(registry)
        .plain_text_terminal_fallback_tool("message_result")
        .plain_text_terminal_fallback_eager(true)
        .empty_outcome_retry_budget(0)
        .build()
        .expect("config builds");
    let context = AgentContext::new("system");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("answer directly".to_string()),
        timestamp: None,
    };

    let result = run(vec![user_turn], context, &config, CancellationToken::new())
        .await
        .expect("eager fallback should convert plain text on first stop");

    assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
    assert_eq!(result.outcome, LoopOutcome::Done);
    let synthesized = result.messages.iter().any(|message| match message {
        AgentMessage::ToolResult {
            tool_name,
            content,
            is_error: false,
            ..
        } => tool_name == "message_result" && content.plain_text() == "plain stop 0",
        _ => false,
    });
    assert!(synthesized);
}
/// In eager-nudge mode the run must make three provider calls, leave two
/// protocol-recovery system messages in the output, and finally synthesize a
/// `message_result` carrying the first plain text ("plain stop 0").
#[tokio::test]
async fn eager_nudge_mode_injects_protocol_recovery_before_synthesizing() {
    let provider = Arc::new(RepeatedTextStream::default());
    let registry =
        crate::tool::ToolRegistry::new().with(Arc::new(TerminalNamedTool("message_result")));
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("auto-tool-provider-eager-nudge")
        .tools(registry)
        .plain_text_terminal_fallback_tool("message_result")
        .plain_text_terminal_fallback_eager(true)
        .plain_text_terminal_fallback_eager_nudge(true)
        .build()
        .expect("config builds");
    let context = AgentContext::new("system");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("answer directly".to_string()),
        timestamp: None,
    };

    let result = run(vec![user_turn], context, &config, CancellationToken::new())
        .await
        .expect("nudge mode should eventually synthesize after retries");

    assert_eq!(provider.calls.load(Ordering::SeqCst), 3);
    assert_eq!(result.outcome, LoopOutcome::Done);

    // Count system messages that are exactly the default recovery prompt.
    let nudge_count = result
        .messages
        .iter()
        .filter(|message| {
            matches!(
                message,
                AgentMessage::System { content, .. }
                    if content == crate::protocol::DEFAULT_PLAIN_TEXT_RECOVERY_PROMPT
            )
        })
        .count();
    assert_eq!(
        nudge_count, 2,
        "expected two protocol-recovery system messages in the run output, got {nudge_count}",
    );

    // First successful `message_result` tool result in the transcript.
    let synthesized_text = result
        .messages
        .iter()
        .find_map(|message| {
            if let AgentMessage::ToolResult {
                tool_name,
                content,
                is_error: false,
                ..
            } = message
            {
                if tool_name == "message_result" {
                    return Some(content.plain_text());
                }
            }
            None
        })
        .expect("a terminal tool result should be synthesized as last resort");
    assert_eq!(
        synthesized_text, "plain stop 0",
        "synthesizer should deliver the first preserved plain text, not later recovery drift",
    );
}
/// Clarifying questions must not be preserved as terminal candidates, while a
/// substantive answer must be.
#[test]
fn plain_text_fallback_candidate_skips_obvious_clarifying_questions() {
    let clarifying_questions = [
        "Continue what, exactly? What's your next move?",
        "Would you like me to proceed?",
    ];
    for question in clarifying_questions {
        assert!(!should_preserve_plain_text_terminal_candidate(question));
    }

    let substantive_answer = "# Machine Learning\n\nMachine learning is the branch of artificial intelligence that studies systems which improve from data.";
    assert!(should_preserve_plain_text_terminal_candidate(
        substantive_answer
    ));
}
/// Without the eager flag and without any tool gate, the fallback must not
/// fire: the run is expected to fail with `EmptyOutcomeBudgetExhausted`.
#[tokio::test]
async fn non_eager_plain_text_fallback_still_requires_narrowed_allowlist() {
    let provider = Arc::new(RepeatedTextStream::default());
    let registry =
        crate::tool::ToolRegistry::new().with(Arc::new(TerminalNamedTool("message_result")));
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("non-eager-provider")
        .tools(registry)
        .plain_text_terminal_fallback_tool("message_result")
        .empty_outcome_retry_budget(0)
        .build()
        .expect("config builds");
    let context = AgentContext::new("system");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("answer directly".to_string()),
        timestamp: None,
    };

    let err = run(vec![user_turn], context, &config, CancellationToken::new())
        .await
        .expect_err("non-eager fallback must not convert without narrowed allowlist");

    match &err {
        LoopError::EmptyOutcomeBudgetExhausted { .. } => {}
        _ => panic!("unexpected error: {err:?}"),
    }
}
/// With `TestTerminalPolicy` and `TerminalWithStatusGate` installed, a plain
/// text stop must become a successful `message_result` tool result after a
/// single provider call.
#[tokio::test]
async fn terminal_plain_text_fallback_allows_status_delivery_gate() {
    let provider = Arc::new(RepeatedTextStream::default());
    let registry =
        crate::tool::ToolRegistry::new().with(Arc::new(TerminalNamedTool("message_result")));
    let config = AgentBuilder::new()
        .stream(provider.clone())
        .model_id("auto-tool-provider")
        .tools(registry)
        .protocol_policy(Arc::new(TestTerminalPolicy))
        .tool_gate_arc(Arc::new(TerminalWithStatusGate))
        .plain_text_terminal_fallback_tool("message_result")
        .empty_outcome_retry_budget(0)
        .build()
        .expect("config builds");
    let context = AgentContext::new("system");
    let user_turn = AgentMessage::User {
        content: UserContent::Text("answer directly".to_string()),
        timestamp: None,
    };

    let result = run(vec![user_turn], context, &config, CancellationToken::new())
        .await
        .expect(
            "plain text should be converted when only status and terminal tools are allowed",
        );

    assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
    assert_eq!(result.outcome, LoopOutcome::Done);
    let synthesized = result.messages.iter().any(|message| match message {
        AgentMessage::ToolResult {
            tool_name,
            content,
            is_error: false,
            ..
        } => tool_name == "message_result" && content.plain_text() == "plain stop 0",
        _ => false,
    });
    assert!(synthesized);
}
/// Test tool whose name is supplied at construction and whose result sets
/// `terminate: true`.
struct TerminalNamedTool(&'static str);

#[async_trait::async_trait]
impl crate::tool::AgentTool for TerminalNamedTool {
    /// Returns the name this instance was constructed with.
    fn name(&self) -> &str {
        self.0
    }

    fn description(&self) -> &str {
        "test terminal tool"
    }

    fn parameters_schema(&self) -> serde_json::Value {
        serde_json::json!({"type": "object"})
    }

    /// Ignores all inputs and returns a successful "not used" text result
    /// with the `terminate` flag set.
    async fn execute(
        &self,
        _call_id: &str,
        _args: serde_json::Value,
        _signal: CancellationToken,
        _update: tokio::sync::mpsc::UnboundedSender<crate::tool::ToolResult>,
    ) -> Result<crate::tool::ToolResult, crate::error::ToolError> {
        let placeholder = crate::types::ToolResultBlock::Text(crate::types::TextContent {
            text: "not used".into(),
        });
        let outcome = crate::tool::ToolResult {
            content: vec![placeholder],
            is_error: false,
            details: serde_json::Value::Null,
            terminate: true,
            narration: None,
        };
        Ok(outcome)
    }
}
}