use chrono::Utc;
use futures::future::BoxFuture;
use futures::StreamExt;
use serde_json::{json, Map, Number, Value};
use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
use std::time::Duration;
use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
use tandem_providers::{ChatAttachment, ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
use tandem_tools::{validate_tool_schemas, ToolRegistry};
use tandem_types::{
ContextMode, EngineEvent, HostOs, HostRuntimeContext, Message, MessagePart, MessagePartInput,
MessageRole, ModelSpec, PathStyle, PrewriteCoverageMode, PrewriteRequirements,
SendMessageRequest, ShellFamily, ToolMode, ToolSchema,
};
use tandem_wire::WireMessagePart;
use tokio_util::sync::CancellationToken;
use tracing::Level;
mod loop_guards;
#[cfg(test)]
use loop_guards::parse_budget_override;
use loop_guards::{
duplicate_signature_limit_for, tool_budget_for, websearch_duplicate_signature_limit,
};
use crate::tool_router::{
classify_intent, default_mode_name, is_short_simple_prompt, max_tools_per_call_expanded,
select_tool_subset, should_escalate_auto_tools, tool_router_enabled, ToolIntent,
ToolRoutingDecision,
};
use crate::{
any_policy_matches, derive_session_title_from_prompt, title_needs_repair,
tool_name_matches_policy, AgentDefinition, AgentRegistry, CancellationRegistry, EventBus,
PermissionAction, PermissionManager, PluginRegistry, Storage,
};
use tokio::sync::RwLock;
/// Accumulator for one provider-streamed tool call: `ToolCallStart` supplies
/// the name and each `ToolCallDelta` appends raw argument text.
#[derive(Default)]
struct StreamedToolCall {
// Tool name reported by the provider; stays empty until announced.
name: String,
// Raw, possibly partial, JSON argument text concatenated across deltas.
args: String,
}
/// Classification of a tool call's raw argument payload, rendered as a
/// stable label via [`RawToolArgsState::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RawToolArgsState {
    Present,
    Empty,
    Unparseable,
}

impl RawToolArgsState {
    /// Returns the stable lowercase label for this state.
    fn as_str(self) -> &'static str {
        use RawToolArgsState::*;
        match self {
            Present => "present",
            Empty => "empty",
            Unparseable => "unparseable",
        }
    }
}
/// Strategy for recovering a write-target path.
/// NOTE(review): no call sites are visible in this chunk — variant semantics
/// are inferred from the names; confirm against the recovery logic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WritePathRecoveryMode {
Heuristic,
OutputTargetOnly,
}
/// Inputs handed to [`SpawnAgentHook::spawn_agent`] for a single tool call.
#[derive(Debug, Clone)]
pub struct SpawnAgentToolContext {
// Session the tool call belongs to.
pub session_id: String,
// Id of the message this tool call is associated with.
pub message_id: String,
// Provider-assigned tool-call id, when one was streamed.
pub tool_call_id: Option<String>,
// Raw JSON arguments of the spawn request.
pub args: Value,
}
/// Result produced by a [`SpawnAgentHook`] invocation.
#[derive(Debug, Clone)]
pub struct SpawnAgentToolResult {
// Textual tool output.
pub output: String,
// Arbitrary JSON metadata accompanying the output.
pub metadata: Value,
}
/// Inputs to [`ToolPolicyHook::evaluate_tool`] describing one pending tool call.
#[derive(Debug, Clone)]
pub struct ToolPolicyContext {
// Session the tool call belongs to.
pub session_id: String,
// Id of the message this tool call is associated with.
pub message_id: String,
// Tool name being evaluated.
pub tool: String,
// Raw JSON arguments of the call.
pub args: Value,
}
/// Verdict returned by a [`ToolPolicyHook`]: whether the call may proceed,
/// with an optional human-readable reason.
#[derive(Debug, Clone)]
pub struct ToolPolicyDecision {
pub allowed: bool,
pub reason: Option<String>,
}
/// Host-provided hook for running a sub-agent on behalf of a tool call.
/// Installed at runtime via [`EngineLoop::set_spawn_agent_hook`].
pub trait SpawnAgentHook: Send + Sync {
/// Spawns an agent for `ctx`; returns a boxed `'static` future resolving
/// to the agent's output and metadata.
fn spawn_agent(
&self,
ctx: SpawnAgentToolContext,
) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>>;
}
/// Host-provided hook consulted before tool execution.
/// Installed at runtime via [`EngineLoop::set_tool_policy_hook`].
pub trait ToolPolicyHook: Send + Sync {
/// Evaluates whether the tool call described by `ctx` is allowed.
fn evaluate_tool(
&self,
ctx: ToolPolicyContext,
) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>>;
}
/// Call-site information passed to [`PromptContextHook::augment_provider_messages`].
#[derive(Debug, Clone)]
pub struct PromptContextHookContext {
pub session_id: String,
pub message_id: String,
// Resolved provider route for this request.
pub provider_id: String,
pub model_id: String,
// Engine-loop iteration number (derived from the remaining iteration
// budget; appears 1-based — confirm against the loop counter).
pub iteration: usize,
}
/// Hook that may rewrite or augment the provider message list before each
/// provider call (the engine invokes it under a timeout and falls back to
/// the unmodified messages on error or timeout).
pub trait PromptContextHook: Send + Sync {
/// Returns the (possibly augmented) replacement for `messages`.
fn augment_provider_messages(
&self,
ctx: PromptContextHookContext,
messages: Vec<ChatMessage>,
) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>>;
}
/// Core prompt-execution engine: resolves the model route, streams provider
/// output, routes and executes tool calls, and publishes progress as
/// [`EngineEvent`]s. Cheap to clone; shared state lives behind `Arc`s.
#[derive(Clone)]
pub struct EngineLoop {
// Persistent session/message store.
storage: std::sync::Arc<Storage>,
// Publisher for EngineEvent notifications.
event_bus: EventBus,
providers: ProviderRegistry,
plugins: PluginRegistry,
agents: AgentRegistry,
permissions: PermissionManager,
tools: ToolRegistry,
// Per-session cancellation tokens.
cancellations: CancellationRegistry,
host_runtime_context: HostRuntimeContext,
// Session id -> workspace-override expiry (Unix epoch milliseconds).
workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
// Session id -> normalized tool-name allowlist.
session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
// Sessions with permission auto-approval enabled (entry present == true).
session_auto_approve_permissions: std::sync::Arc<RwLock<HashMap<String, bool>>>,
// Optional host hooks, installed via the setters below.
spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
}
impl EngineLoop {
#[allow(clippy::too_many_arguments)]
pub fn new(
storage: std::sync::Arc<Storage>,
event_bus: EventBus,
providers: ProviderRegistry,
plugins: PluginRegistry,
agents: AgentRegistry,
permissions: PermissionManager,
tools: ToolRegistry,
cancellations: CancellationRegistry,
host_runtime_context: HostRuntimeContext,
) -> Self {
Self {
storage,
event_bus,
providers,
plugins,
agents,
permissions,
tools,
cancellations,
host_runtime_context,
workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
session_auto_approve_permissions: std::sync::Arc::new(RwLock::new(HashMap::new())),
spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
}
}
/// Installs (or replaces) the hook used to spawn sub-agents from tool calls.
pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
    let mut slot = self.spawn_agent_hook.write().await;
    *slot = Some(hook);
}
/// Installs (or replaces) the hook consulted before tool execution.
pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
    let mut slot = self.tool_policy_hook.write().await;
    *slot = Some(hook);
}
/// Installs (or replaces) the hook that augments provider messages.
pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
    let mut slot = self.prompt_context_hook.write().await;
    *slot = Some(hook);
}
/// Replaces the per-session tool allowlist with a normalized copy of
/// `allowed_tools`; entries that are blank after normalization are dropped.
pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
    let mut cleaned: Vec<String> = Vec::with_capacity(allowed_tools.len());
    for tool in allowed_tools {
        let name = normalize_tool_name(&tool);
        if !name.trim().is_empty() {
            cleaned.push(name);
        }
    }
    self.session_allowed_tools
        .write()
        .await
        .insert(session_id.to_string(), cleaned);
}
/// Removes any tool allowlist stored for `session_id`.
pub async fn clear_session_allowed_tools(&self, session_id: &str) {
    let mut allowlists = self.session_allowed_tools.write().await;
    allowlists.remove(session_id);
}
/// Toggles permission auto-approval for `session_id`. Enabling stores a
/// `true` entry; disabling removes the entry entirely (absence == disabled).
pub async fn set_session_auto_approve_permissions(&self, session_id: &str, enabled: bool) {
    let mut flags = self.session_auto_approve_permissions.write().await;
    if enabled {
        flags.insert(session_id.to_string(), true);
    } else {
        flags.remove(session_id);
    }
}
/// Removes any auto-approval flag stored for `session_id`.
pub async fn clear_session_auto_approve_permissions(&self, session_id: &str) {
    let mut flags = self.session_auto_approve_permissions.write().await;
    flags.remove(session_id);
}
pub async fn grant_workspace_override_for_session(
&self,
session_id: &str,
ttl_seconds: u64,
) -> u64 {
let expires_at = chrono::Utc::now()
.timestamp_millis()
.max(0)
.saturating_add((ttl_seconds as i64).saturating_mul(1000))
as u64;
self.workspace_overrides
.write()
.await
.insert(session_id.to_string(), expires_at);
expires_at
}
/// Convenience wrapper: runs a prompt for `session_id` without attaching an
/// observability correlation id. See [`Self::run_prompt_async_with_context`].
pub async fn run_prompt_async(
&self,
session_id: String,
req: SendMessageRequest,
) -> anyhow::Result<()> {
self.run_prompt_async_with_context(session_id, req, None)
.await
}
pub async fn run_prompt_async_with_context(
&self,
session_id: String,
req: SendMessageRequest,
correlation_id: Option<String>,
) -> anyhow::Result<()> {
let session_model = self
.storage
.get_session(&session_id)
.await
.and_then(|s| s.model);
let (provider_id, model_id_value) =
resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
anyhow::anyhow!(
"MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
)
})?;
let correlation_ref = correlation_id.as_deref();
let model_id = Some(model_id_value.as_str());
let cancel = self.cancellations.create(&session_id).await;
emit_event(
Level::INFO,
ProcessKind::Engine,
ObservabilityEvent {
event: "provider.call.start",
component: "engine.loop",
correlation_id: correlation_ref,
session_id: Some(&session_id),
run_id: None,
message_id: None,
provider_id: Some(provider_id.as_str()),
model_id,
status: Some("start"),
error_code: None,
detail: Some("run_prompt_async dispatch"),
},
);
self.event_bus.publish(EngineEvent::new(
"session.status",
json!({"sessionID": session_id, "status":"running"}),
));
let request_parts = req.parts.clone();
let requested_tool_mode = req.tool_mode.clone().unwrap_or(ToolMode::Auto);
let requested_context_mode = req.context_mode.clone().unwrap_or(ContextMode::Auto);
let requested_write_required = req.write_required.unwrap_or(false);
let requested_prewrite_requirements = req.prewrite_requirements.clone().unwrap_or_default();
let request_tool_allowlist = req
.tool_allowlist
.clone()
.unwrap_or_default()
.into_iter()
.map(|tool| normalize_tool_name(&tool))
.filter(|tool| !tool.trim().is_empty())
.collect::<HashSet<_>>();
let text = req
.parts
.iter()
.map(|p| match p {
MessagePartInput::Text { text } => text.clone(),
MessagePartInput::File {
mime,
filename,
url,
} => format!(
"[file mime={} name={} url={}]",
mime,
filename.clone().unwrap_or_else(|| "unknown".to_string()),
url
),
})
.collect::<Vec<_>>()
.join("\n");
let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
self.auto_rename_session_from_user_text(&session_id, &text)
.await;
let active_agent = self.agents.get(req.agent.as_deref()).await;
let mut user_message_id = self
.find_recent_matching_user_message_id(&session_id, &text)
.await;
if user_message_id.is_none() {
let user_message = Message::new(
MessageRole::User,
vec![MessagePart::Text { text: text.clone() }],
);
let created_message_id = user_message.id.clone();
self.storage
.append_message(&session_id, user_message)
.await?;
let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({
"part": user_part,
"delta": text,
"agent": active_agent.name
}),
));
user_message_id = Some(created_message_id);
}
let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
if cancel.is_cancelled() {
self.event_bus.publish(EngineEvent::new(
"session.status",
json!({"sessionID": session_id, "status":"cancelled"}),
));
self.cancellations.remove(&session_id).await;
return Ok(());
}
let mut question_tool_used = false;
let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
if normalize_tool_name(&tool) == "question" {
question_tool_used = true;
}
if !agent_can_use_tool(&active_agent, &tool) {
format!(
"Tool `{tool}` is not enabled for agent `{}`.",
active_agent.name
)
} else {
self.execute_tool_with_permission(
&session_id,
&user_message_id,
tool.clone(),
args,
None,
active_agent.skills.as_deref(),
&text,
requested_write_required,
None,
cancel.clone(),
)
.await?
.unwrap_or_default()
}
} else {
let mut completion = String::new();
let mut max_iterations = max_tool_iterations();
let mut followup_context: Option<String> = None;
let mut last_tool_outputs: Vec<String> = Vec::new();
let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
let mut mutable_signature_counts: HashMap<String, usize> = HashMap::new();
let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
let mut websearch_query_blocked = false;
let websearch_duplicate_signature_limit = websearch_duplicate_signature_limit();
let mut pack_builder_executed = false;
let mut auto_workspace_probe_attempted = false;
let mut productive_tool_calls_total = 0usize;
let mut productive_write_tool_calls_total = 0usize;
let mut productive_workspace_inspection_total = 0usize;
let mut productive_web_research_total = 0usize;
let mut productive_concrete_read_total = 0usize;
let mut successful_web_research_total = 0usize;
let mut required_tool_retry_count = 0usize;
let mut required_write_retry_count = 0usize;
let mut unmet_prewrite_repair_retry_count = 0usize;
let mut empty_completion_retry_count = 0usize;
let mut prewrite_gate_waived = false;
let mut invalid_tool_args_retry_count = 0usize;
let strict_write_retry_max_attempts = strict_write_retry_max_attempts();
let mut required_tool_unsatisfied_emitted = false;
let mut latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
let email_delivery_requested = requires_email_delivery_prompt(&text);
let web_research_requested = requires_web_research_prompt(&text);
let mut email_action_executed = false;
let mut latest_email_action_note: Option<String> = None;
let intent = classify_intent(&text);
let router_enabled = tool_router_enabled();
let retrieval_enabled = semantic_tool_retrieval_enabled();
let retrieval_k = semantic_tool_retrieval_k();
let mcp_server_names = if mcp_catalog_in_system_prompt_enabled() {
self.tools.mcp_server_names().await
} else {
Vec::new()
};
let mut auto_tools_escalated = matches!(requested_tool_mode, ToolMode::Required);
let context_is_auto_compact = matches!(requested_context_mode, ContextMode::Auto)
&& runtime_attachments.is_empty()
&& is_short_simple_prompt(&text)
&& matches!(intent, ToolIntent::Chitchat | ToolIntent::Knowledge);
while max_iterations > 0 && !cancel.is_cancelled() {
let iteration = 26usize.saturating_sub(max_iterations);
max_iterations -= 1;
let context_profile = if matches!(requested_context_mode, ContextMode::Full) {
ChatHistoryProfile::Full
} else if matches!(requested_context_mode, ContextMode::Compact)
|| context_is_auto_compact
{
ChatHistoryProfile::Compact
} else {
ChatHistoryProfile::Standard
};
let mut messages =
load_chat_history(self.storage.clone(), &session_id, context_profile).await;
if iteration == 1 && !runtime_attachments.is_empty() {
attach_to_last_user_message(&mut messages, &runtime_attachments);
}
let history_char_count = messages.iter().map(|m| m.content.len()).sum::<usize>();
self.event_bus.publish(EngineEvent::new(
"context.profile.selected",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"contextMode": format_context_mode(&requested_context_mode, context_is_auto_compact),
"historyMessageCount": messages.len(),
"historyCharCount": history_char_count,
"memoryInjected": false
}),
));
let mut system_parts = vec![tandem_runtime_system_prompt(
&self.host_runtime_context,
&mcp_server_names,
)];
if let Some(system) = active_agent.system_prompt.as_ref() {
system_parts.push(system.clone());
}
messages.insert(
0,
ChatMessage {
role: "system".to_string(),
content: system_parts.join("\n\n"),
attachments: Vec::new(),
},
);
if let Some(extra) = followup_context.take() {
messages.push(ChatMessage {
role: "user".to_string(),
content: extra,
attachments: Vec::new(),
});
}
if let Some(hook) = self.prompt_context_hook.read().await.clone() {
let ctx = PromptContextHookContext {
session_id: session_id.clone(),
message_id: user_message_id.clone(),
provider_id: provider_id.clone(),
model_id: model_id_value.clone(),
iteration,
};
let hook_timeout =
Duration::from_millis(prompt_context_hook_timeout_ms() as u64);
match tokio::time::timeout(
hook_timeout,
hook.augment_provider_messages(ctx, messages.clone()),
)
.await
{
Ok(Ok(augmented)) => {
messages = augmented;
}
Ok(Err(err)) => {
self.event_bus.publish(EngineEvent::new(
"memory.context.error",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"error": truncate_text(&err.to_string(), 500),
}),
));
}
Err(_) => {
self.event_bus.publish(EngineEvent::new(
"memory.context.error",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"error": format!(
"prompt context hook timeout after {} ms",
hook_timeout.as_millis()
),
}),
));
}
}
}
let all_tools = self.tools.list().await;
let mut retrieval_fallback_reason: Option<&'static str> = None;
let mut candidate_tools = if retrieval_enabled {
self.tools.retrieve(&text, retrieval_k).await
} else {
all_tools.clone()
};
if retrieval_enabled {
if candidate_tools.is_empty() && !all_tools.is_empty() {
candidate_tools = all_tools.clone();
retrieval_fallback_reason = Some("retrieval_empty_result");
} else if web_research_requested
&& has_web_research_tools(&all_tools)
&& !has_web_research_tools(&candidate_tools)
{
candidate_tools = all_tools.clone();
retrieval_fallback_reason = Some("missing_web_tools_for_research_prompt");
} else if email_delivery_requested
&& has_email_action_tools(&all_tools)
&& !has_email_action_tools(&candidate_tools)
{
candidate_tools = all_tools.clone();
retrieval_fallback_reason = Some("missing_email_tools_for_delivery_prompt");
}
}
let mut tool_schemas = if !router_enabled {
candidate_tools
} else {
match requested_tool_mode {
ToolMode::None => Vec::new(),
ToolMode::Required => select_tool_subset(
candidate_tools,
intent,
&request_tool_allowlist,
iteration > 1,
),
ToolMode::Auto => {
if !auto_tools_escalated {
Vec::new()
} else {
select_tool_subset(
candidate_tools,
intent,
&request_tool_allowlist,
iteration > 1,
)
}
}
}
};
let mut policy_patterns =
request_tool_allowlist.iter().cloned().collect::<Vec<_>>();
if let Some(agent_tools) = active_agent.tools.as_ref() {
policy_patterns
.extend(agent_tools.iter().map(|tool| normalize_tool_name(tool)));
}
let session_allowed_tools = self
.session_allowed_tools
.read()
.await
.get(&session_id)
.cloned()
.unwrap_or_default();
policy_patterns.extend(session_allowed_tools.iter().cloned());
if !policy_patterns.is_empty() {
let mut included = tool_schemas
.iter()
.map(|schema| normalize_tool_name(&schema.name))
.collect::<HashSet<_>>();
for schema in &all_tools {
let normalized = normalize_tool_name(&schema.name);
if policy_patterns
.iter()
.any(|pattern| tool_name_matches_policy(pattern, &normalized))
&& included.insert(normalized)
{
tool_schemas.push(schema.clone());
}
}
}
if !request_tool_allowlist.is_empty() {
tool_schemas.retain(|schema| {
let tool = normalize_tool_name(&schema.name);
request_tool_allowlist
.iter()
.any(|pattern| tool_name_matches_policy(pattern, &tool))
});
}
let prewrite_satisfied = prewrite_requirements_satisfied(
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
);
let prewrite_gate_write = should_gate_write_until_prewrite_satisfied(
requested_prewrite_requirements.repair_on_unmet_requirements,
productive_write_tool_calls_total,
prewrite_satisfied,
) && !prewrite_gate_waived;
let force_write_only_retry = requested_write_required
&& required_write_retry_count > 0
&& (productive_write_tool_calls_total == 0 || prewrite_satisfied)
&& !prewrite_gate_write
&& (!requested_prewrite_requirements.repair_on_unmet_requirements
|| prewrite_gate_waived);
let allow_repair_tools = requested_write_required
&& unmet_prewrite_repair_retry_count > 0
&& !prewrite_satisfied
&& !prewrite_gate_waived;
if prewrite_gate_write {
tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
}
if requested_prewrite_requirements.repair_on_unmet_requirements
&& productive_write_tool_calls_total >= 3
{
tool_schemas.retain(|schema| !is_workspace_write_tool(&schema.name));
}
if allow_repair_tools {
let unmet_prewrite_codes = collect_unmet_prewrite_requirement_codes(
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
);
let repair_tools = tool_schemas
.iter()
.filter(|schema| {
tool_matches_unmet_prewrite_repair_requirement(
&schema.name,
&unmet_prewrite_codes,
)
})
.cloned()
.collect::<Vec<_>>();
if !repair_tools.is_empty() {
tool_schemas = repair_tools;
}
}
if force_write_only_retry && !allow_repair_tools {
tool_schemas.retain(|schema| is_workspace_write_tool(&schema.name));
}
if active_agent.tools.is_some() {
tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
}
tool_schemas.retain(|schema| {
let normalized = normalize_tool_name(&schema.name);
if let Some(server) = mcp_server_from_tool_name(&normalized) {
!blocked_mcp_servers.contains(server)
} else {
true
}
});
if let Some(allowed_tools) = self
.session_allowed_tools
.read()
.await
.get(&session_id)
.cloned()
{
if !allowed_tools.is_empty() {
tool_schemas.retain(|schema| {
let normalized = normalize_tool_name(&schema.name);
any_policy_matches(&allowed_tools, &normalized)
});
}
}
if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
let detail = validation_err.to_string();
emit_event(
Level::ERROR,
ProcessKind::Engine,
ObservabilityEvent {
event: "provider.call.error",
component: "engine.loop",
correlation_id: correlation_ref,
session_id: Some(&session_id),
run_id: None,
message_id: Some(&user_message_id),
provider_id: Some(provider_id.as_str()),
model_id,
status: Some("failed"),
error_code: Some("TOOL_SCHEMA_INVALID"),
detail: Some(&detail),
},
);
anyhow::bail!("{detail}");
}
let routing_decision = ToolRoutingDecision {
pass: if auto_tools_escalated { 2 } else { 1 },
mode: match requested_tool_mode {
ToolMode::Auto => default_mode_name(),
ToolMode::None => "none",
ToolMode::Required => "required",
},
intent,
selected_count: tool_schemas.len(),
total_available_count: all_tools.len(),
mcp_included: tool_schemas
.iter()
.any(|schema| normalize_tool_name(&schema.name).starts_with("mcp.")),
};
self.event_bus.publish(EngineEvent::new(
"tool.routing.decision",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"pass": routing_decision.pass,
"mode": routing_decision.mode,
"intent": format!("{:?}", routing_decision.intent).to_ascii_lowercase(),
"selectedToolCount": routing_decision.selected_count,
"totalAvailableTools": routing_decision.total_available_count,
"mcpIncluded": routing_decision.mcp_included,
"retrievalEnabled": retrieval_enabled,
"retrievalK": retrieval_k,
"fallbackToFullTools": retrieval_fallback_reason.is_some(),
"fallbackReason": retrieval_fallback_reason
}),
));
let allowed_tool_names = tool_schemas
.iter()
.map(|schema| normalize_tool_name(&schema.name))
.collect::<HashSet<_>>();
let offered_tool_preview = tool_schemas
.iter()
.take(8)
.map(|schema| normalize_tool_name(&schema.name))
.collect::<Vec<_>>()
.join(", ");
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.start",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"selectedToolCount": allowed_tool_names.len(),
}),
));
let provider_connect_timeout =
Duration::from_millis(provider_stream_connect_timeout_ms() as u64);
let stream_result = tokio::time::timeout(
provider_connect_timeout,
self.providers.stream_for_provider(
Some(provider_id.as_str()),
Some(model_id_value.as_str()),
messages,
requested_tool_mode.clone(),
Some(tool_schemas),
cancel.clone(),
),
)
.await
.map_err(|_| {
anyhow::anyhow!(
"provider stream connect timeout after {} ms",
provider_connect_timeout.as_millis()
)
})
.and_then(|result| result);
let stream = match stream_result {
Ok(stream) => stream,
Err(err) => {
let error_text = err.to_string();
let error_code = provider_error_code(&error_text);
let detail = truncate_text(&error_text, 500);
emit_event(
Level::ERROR,
ProcessKind::Engine,
ObservabilityEvent {
event: "provider.call.error",
component: "engine.loop",
correlation_id: correlation_ref,
session_id: Some(&session_id),
run_id: None,
message_id: Some(&user_message_id),
provider_id: Some(provider_id.as_str()),
model_id,
status: Some("failed"),
error_code: Some(error_code),
detail: Some(&detail),
},
);
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.error",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"error": detail,
}),
));
return Err(err);
}
};
tokio::pin!(stream);
completion.clear();
let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
let mut provider_usage: Option<TokenUsage> = None;
let mut accepted_tool_calls_in_cycle = 0usize;
let provider_idle_timeout =
Duration::from_millis(provider_stream_idle_timeout_ms() as u64);
loop {
let next_chunk_result =
tokio::time::timeout(provider_idle_timeout, stream.next())
.await
.map_err(|_| {
anyhow::anyhow!(
"provider stream idle timeout after {} ms",
provider_idle_timeout.as_millis()
)
});
let next_chunk = match next_chunk_result {
Ok(next_chunk) => next_chunk,
Err(err) => {
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.error",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"error": truncate_text(&err.to_string(), 500),
}),
));
return Err(err);
}
};
let Some(chunk) = next_chunk else {
break;
};
let chunk = match chunk {
Ok(chunk) => chunk,
Err(err) => {
let error_text = err.to_string();
let error_code = provider_error_code(&error_text);
let detail = truncate_text(&error_text, 500);
emit_event(
Level::ERROR,
ProcessKind::Engine,
ObservabilityEvent {
event: "provider.call.error",
component: "engine.loop",
correlation_id: correlation_ref,
session_id: Some(&session_id),
run_id: None,
message_id: Some(&user_message_id),
provider_id: Some(provider_id.as_str()),
model_id,
status: Some("failed"),
error_code: Some(error_code),
detail: Some(&detail),
},
);
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.error",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"error": detail,
}),
));
return Err(anyhow::anyhow!(
"provider stream chunk error: {error_text}"
));
}
};
match chunk {
StreamChunk::TextDelta(delta) => {
let delta = strip_model_control_markers(&delta);
if delta.trim().is_empty() {
continue;
}
if completion.is_empty() {
emit_event(
Level::INFO,
ProcessKind::Engine,
ObservabilityEvent {
event: "provider.call.first_byte",
component: "engine.loop",
correlation_id: correlation_ref,
session_id: Some(&session_id),
run_id: None,
message_id: Some(&user_message_id),
provider_id: Some(provider_id.as_str()),
model_id,
status: Some("streaming"),
error_code: None,
detail: Some("first text delta"),
},
);
}
completion.push_str(&delta);
let delta = truncate_text(&delta, 4_000);
let delta_part =
WireMessagePart::text(&session_id, &user_message_id, delta.clone());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": delta_part, "delta": delta}),
));
}
StreamChunk::ReasoningDelta(_reasoning) => {}
StreamChunk::Done {
finish_reason: _,
usage,
} => {
if usage.is_some() {
provider_usage = usage;
}
break;
}
StreamChunk::ToolCallStart { id, name } => {
let entry = streamed_tool_calls.entry(id).or_default();
if entry.name.is_empty() {
entry.name = name;
}
}
StreamChunk::ToolCallDelta { id, args_delta } => {
let entry = streamed_tool_calls.entry(id.clone()).or_default();
entry.args.push_str(&args_delta);
let tool_name = if entry.name.trim().is_empty() {
"tool".to_string()
} else {
normalize_tool_name(&entry.name)
};
let parsed_preview = if entry.name.trim().is_empty() {
Value::String(truncate_text(&entry.args, 1_000))
} else {
parse_streamed_tool_args(&tool_name, &entry.args)
};
let mut tool_part = WireMessagePart::tool_invocation(
&session_id,
&user_message_id,
tool_name.clone(),
parsed_preview.clone(),
);
tool_part.id = Some(id.clone());
if tool_name == "write" {
tracing::info!(
session_id = %session_id,
message_id = %user_message_id,
tool_call_id = %id,
args_delta_len = args_delta.len(),
accumulated_args_len = entry.args.len(),
parsed_preview_empty = parsed_preview.is_null()
|| parsed_preview.as_object().is_some_and(|value| value.is_empty())
|| parsed_preview
.as_str()
.map(|value| value.trim().is_empty())
.unwrap_or(false),
"streamed write tool args delta received"
);
}
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({
"part": tool_part,
"toolCallDelta": {
"id": id,
"tool": tool_name,
"argsDelta": truncate_text(&args_delta, 1_000),
"rawArgsPreview": truncate_text(&entry.args, 2_000),
"parsedArgsPreview": parsed_preview
}
}),
));
}
StreamChunk::ToolCallEnd { id: _ } => {}
}
if cancel.is_cancelled() {
break;
}
}
let streamed_tool_call_count = streamed_tool_calls.len();
let streamed_tool_call_parse_failed = streamed_tool_calls
.values()
.any(|call| !call.args.trim().is_empty() && call.name.trim().is_empty());
let mut tool_calls = streamed_tool_calls
.into_iter()
.filter_map(|(call_id, call)| {
if call.name.trim().is_empty() {
return None;
}
let tool_name = normalize_tool_name(&call.name);
let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
Some(ParsedToolCall {
tool: tool_name,
args: parsed_args,
call_id: Some(call_id),
})
})
.collect::<Vec<_>>();
if tool_calls.is_empty() {
tool_calls = parse_tool_invocations_from_response(&completion)
.into_iter()
.map(|(tool, args)| ParsedToolCall {
tool,
args,
call_id: None,
})
.collect::<Vec<_>>();
}
let provider_tool_parse_failed = tool_calls.is_empty()
&& (streamed_tool_call_parse_failed
|| (streamed_tool_call_count > 0
&& looks_like_unparsed_tool_payload(&completion))
|| looks_like_unparsed_tool_payload(&completion));
if provider_tool_parse_failed {
latest_required_tool_failure_kind =
RequiredToolFailureKind::ToolCallParseFailed;
} else if tool_calls.is_empty() {
latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
}
if router_enabled
&& matches!(requested_tool_mode, ToolMode::Auto)
&& !auto_tools_escalated
&& iteration == 1
&& should_escalate_auto_tools(intent, &text, &completion)
{
auto_tools_escalated = true;
followup_context = Some(
"Tool access is now enabled for this request. Use only necessary tools and then answer concisely."
.to_string(),
);
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "auto_escalate",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
}),
));
continue;
}
if tool_calls.is_empty()
&& !auto_workspace_probe_attempted
&& should_force_workspace_probe(&text, &completion)
&& allowed_tool_names.contains("glob")
{
auto_workspace_probe_attempted = true;
tool_calls = vec![ParsedToolCall {
tool: "glob".to_string(),
args: json!({ "pattern": "*" }),
call_id: None,
}];
}
if !tool_calls.is_empty() {
let saw_tool_call_candidate = true;
let mut outputs = Vec::new();
let mut executed_productive_tool = false;
let mut write_tool_attempted_in_cycle = false;
let mut auth_required_hit_in_cycle = false;
let mut guard_budget_hit_in_cycle = false;
let mut duplicate_signature_hit_in_cycle = false;
let mut rejected_tool_call_in_cycle = false;
for ParsedToolCall {
tool,
args,
call_id,
} in tool_calls
{
if !agent_can_use_tool(&active_agent, &tool) {
rejected_tool_call_in_cycle = true;
continue;
}
let tool_key = normalize_tool_name(&tool);
if is_workspace_write_tool(&tool_key) {
write_tool_attempted_in_cycle = true;
}
if !allowed_tool_names.contains(&tool_key) {
rejected_tool_call_in_cycle = true;
let note = if offered_tool_preview.is_empty() {
format!(
"Tool `{}` call skipped: it is not available in this turn.",
tool_key
)
} else {
format!(
"Tool `{}` call skipped: it is not available in this turn. Available tools: {}.",
tool_key, offered_tool_preview
)
};
self.event_bus.publish(EngineEvent::new(
"tool.call.rejected_unoffered",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"tool": tool_key,
"offeredToolCount": allowed_tool_names.len()
}),
));
if tool_name_looks_like_email_action(&tool_key) {
latest_email_action_note = Some(note.clone());
}
outputs.push(note);
continue;
}
if let Some(server) = mcp_server_from_tool_name(&tool_key) {
if blocked_mcp_servers.contains(server) {
rejected_tool_call_in_cycle = true;
outputs.push(format!(
"Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
tool_key, server
));
continue;
}
}
if tool_key == "question" {
question_tool_used = true;
}
if tool_key == "pack_builder" && pack_builder_executed {
rejected_tool_call_in_cycle = true;
outputs.push(
"Tool `pack_builder` call skipped: already executed in this run. Provide a final response or ask any required follow-up question."
.to_string(),
);
continue;
}
if websearch_query_blocked && tool_key == "websearch" {
rejected_tool_call_in_cycle = true;
outputs.push(
"Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
.to_string(),
);
continue;
}
let mut effective_args = args.clone();
if tool_key == "todo_write" {
effective_args = normalize_todo_write_args(effective_args, &completion);
if is_empty_todo_write_args(&effective_args) {
rejected_tool_call_in_cycle = true;
outputs.push(
"Tool `todo_write` call skipped: empty todo payload."
.to_string(),
);
continue;
}
}
let signature = if tool_key == "batch" {
batch_tool_signature(&args)
.unwrap_or_else(|| tool_signature(&tool_key, &args))
} else {
tool_signature(&tool_key, &args)
};
if is_shell_tool_name(&tool_key)
&& shell_mismatch_signatures.contains(&signature)
{
rejected_tool_call_in_cycle = true;
outputs.push(
"Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
.to_string(),
);
continue;
}
let mut signature_count = 1usize;
if is_read_only_tool(&tool_key)
|| (tool_key == "batch" && is_read_only_batch_call(&args))
{
let count = readonly_signature_counts
.entry(signature.clone())
.and_modify(|v| *v = v.saturating_add(1))
.or_insert(1);
signature_count = *count;
if tool_key == "websearch" {
if let Some(limit) = websearch_duplicate_signature_limit {
if *count > limit {
rejected_tool_call_in_cycle = true;
self.event_bus.publish(EngineEvent::new(
"tool.loop_guard.triggered",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"tool": tool_key,
"reason": "duplicate_signature_retry_exhausted",
"duplicateLimit": limit,
"queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
"loop_guard_triggered": true
}),
));
outputs.push(
"Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
.to_string(),
);
continue;
}
}
}
if tool_key != "websearch" && *count > 1 {
rejected_tool_call_in_cycle = true;
if let Some(cached) = readonly_tool_cache.get(&signature) {
outputs.push(cached.clone());
} else {
outputs.push(format!(
"Tool `{}` call skipped: duplicate call signature detected.",
tool_key
));
}
continue;
}
}
let is_read_only_signature = is_read_only_tool(&tool_key)
|| (tool_key == "batch" && is_read_only_batch_call(&args));
if !is_read_only_signature {
let duplicate_limit = duplicate_signature_limit_for(&tool_key);
let seen = mutable_signature_counts
.entry(signature.clone())
.and_modify(|v| *v = v.saturating_add(1))
.or_insert(1);
if *seen > duplicate_limit {
rejected_tool_call_in_cycle = true;
self.event_bus.publish(EngineEvent::new(
"tool.loop_guard.triggered",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"tool": tool_key,
"reason": "duplicate_signature_retry_exhausted",
"signatureHash": stable_hash(&signature),
"duplicateLimit": duplicate_limit,
"loop_guard_triggered": true
}),
));
outputs.push(format!(
"Tool `{}` call skipped: duplicate call signature retry limit reached ({}).",
tool_key, duplicate_limit
));
duplicate_signature_hit_in_cycle = true;
continue;
}
}
let budget = tool_budget_for(&tool_key);
let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
if *entry >= budget {
rejected_tool_call_in_cycle = true;
outputs.push(format!(
"Tool `{}` call skipped: per-run guard budget exceeded ({}).",
tool_key, budget
));
guard_budget_hit_in_cycle = true;
continue;
}
let mut finalized_part = WireMessagePart::tool_invocation(
&session_id,
&user_message_id,
tool.clone(),
effective_args.clone(),
);
if let Some(call_id) = call_id.clone() {
finalized_part.id = Some(call_id);
}
finalized_part.state = Some("pending".to_string());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": finalized_part}),
));
*entry += 1;
accepted_tool_calls_in_cycle =
accepted_tool_calls_in_cycle.saturating_add(1);
if let Some(output) = self
.execute_tool_with_permission(
&session_id,
&user_message_id,
tool,
effective_args,
call_id,
active_agent.skills.as_deref(),
&text,
requested_write_required,
Some(&completion),
cancel.clone(),
)
.await?
{
let productive = is_productive_tool_output(&tool_key, &output);
if output.contains("WEBSEARCH_QUERY_MISSING") {
websearch_query_blocked = true;
}
if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
{
shell_mismatch_signatures.insert(signature.clone());
}
if is_read_only_tool(&tool_key)
&& tool_key != "websearch"
&& signature_count == 1
{
readonly_tool_cache.insert(signature, output.clone());
}
if productive {
productive_tool_calls_total =
productive_tool_calls_total.saturating_add(1);
if is_workspace_write_tool(&tool_key) {
productive_write_tool_calls_total =
productive_write_tool_calls_total.saturating_add(1);
}
if is_workspace_inspection_tool(&tool_key) {
productive_workspace_inspection_total =
productive_workspace_inspection_total.saturating_add(1);
}
if tool_key == "read" {
productive_concrete_read_total =
productive_concrete_read_total.saturating_add(1);
}
if is_web_research_tool(&tool_key) {
productive_web_research_total =
productive_web_research_total.saturating_add(1);
if is_successful_web_research_output(&tool_key, &output) {
successful_web_research_total =
successful_web_research_total.saturating_add(1);
}
}
executed_productive_tool = true;
if tool_key == "pack_builder" {
pack_builder_executed = true;
}
}
if tool_name_looks_like_email_action(&tool_key) {
if productive {
email_action_executed = true;
} else {
latest_email_action_note =
Some(truncate_text(&output, 280).replace('\n', " "));
}
}
if is_auth_required_tool_output(&output) {
if let Some(server) = mcp_server_from_tool_name(&tool_key) {
blocked_mcp_servers.insert(server.to_string());
}
auth_required_hit_in_cycle = true;
}
outputs.push(output);
if auth_required_hit_in_cycle {
break;
}
if guard_budget_hit_in_cycle {
break;
}
}
}
if !outputs.is_empty() {
last_tool_outputs = outputs.clone();
if matches!(requested_tool_mode, ToolMode::Required)
&& productive_tool_calls_total == 0
{
latest_required_tool_failure_kind = classify_required_tool_failure(
&outputs,
saw_tool_call_candidate,
accepted_tool_calls_in_cycle,
provider_tool_parse_failed,
rejected_tool_call_in_cycle,
);
if requested_write_required
&& write_tool_attempted_in_cycle
&& productive_write_tool_calls_total == 0
&& is_write_invalid_args_failure_kind(
latest_required_tool_failure_kind,
)
{
if required_write_retry_count + 1 < strict_write_retry_max_attempts
{
required_write_retry_count += 1;
required_tool_retry_count += 1;
followup_context = Some(build_write_required_retry_context(
&offered_tool_preview,
latest_required_tool_failure_kind,
&text,
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "required_write_invalid_retry",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"requiredToolFailureReason": latest_required_tool_failure_kind.code(),
}),
));
continue;
}
}
if !requested_write_required && required_tool_retry_count == 0 {
required_tool_retry_count += 1;
followup_context = Some(build_required_tool_retry_context(
&offered_tool_preview,
latest_required_tool_failure_kind,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "required_tool_retry",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"requiredToolFailureReason": latest_required_tool_failure_kind.code(),
}),
));
continue;
}
completion = required_tool_mode_unsatisfied_completion(
latest_required_tool_failure_kind,
);
if !required_tool_unsatisfied_emitted {
required_tool_unsatisfied_emitted = true;
self.event_bus.publish(EngineEvent::new(
"tool.mode.required.unsatisfied",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"selectedToolCount": allowed_tool_names.len(),
"offeredToolsPreview": offered_tool_preview,
"reason": latest_required_tool_failure_kind.code(),
}),
));
}
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "required_tool_unsatisfied",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"requiredToolFailureReason": latest_required_tool_failure_kind.code(),
}),
));
break;
}
let prewrite_satisfied = prewrite_requirements_satisfied(
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
);
let unmet_prewrite_codes = collect_unmet_prewrite_requirement_codes(
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
);
if requested_write_required
&& productive_tool_calls_total > 0
&& productive_write_tool_calls_total == 0
{
if should_start_prewrite_repair_before_first_write(
requested_prewrite_requirements.repair_on_unmet_requirements,
productive_write_tool_calls_total,
prewrite_satisfied,
) {
if unmet_prewrite_repair_retry_count
< prewrite_repair_retry_max_attempts()
{
unmet_prewrite_repair_retry_count += 1;
let repair_attempt = unmet_prewrite_repair_retry_count;
let repair_attempts_remaining =
prewrite_repair_retry_max_attempts()
.saturating_sub(repair_attempt);
followup_context = Some(build_prewrite_repair_retry_context(
&offered_tool_preview,
latest_required_tool_failure_kind,
&text,
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "prewrite_repair_retry",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"requiredToolFailureReason": latest_required_tool_failure_kind.code(),
"repair": prewrite_repair_event_payload(
repair_attempt,
repair_attempts_remaining,
&unmet_prewrite_codes,
false,
),
}),
));
continue;
}
if !prewrite_gate_waived {
prewrite_gate_waived = true;
let repair_attempt = unmet_prewrite_repair_retry_count;
let repair_attempts_remaining =
prewrite_repair_retry_max_attempts()
.saturating_sub(repair_attempt);
followup_context = Some(build_prewrite_waived_write_context(
&text,
&unmet_prewrite_codes,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "prewrite_gate_waived",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"prewriteGateWaived": true,
"repair": prewrite_repair_event_payload(
repair_attempt,
repair_attempts_remaining,
&unmet_prewrite_codes,
true,
),
}),
));
continue;
}
}
latest_required_tool_failure_kind =
RequiredToolFailureKind::WriteRequiredNotSatisfied;
if required_write_retry_count + 1 < strict_write_retry_max_attempts {
required_write_retry_count += 1;
followup_context = Some(build_write_required_retry_context(
&offered_tool_preview,
latest_required_tool_failure_kind,
&text,
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "required_write_retry",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"requiredToolFailureReason": latest_required_tool_failure_kind.code(),
}),
));
continue;
}
completion = required_tool_mode_unsatisfied_completion(
latest_required_tool_failure_kind,
);
if !required_tool_unsatisfied_emitted {
required_tool_unsatisfied_emitted = true;
self.event_bus.publish(EngineEvent::new(
"tool.mode.required.unsatisfied",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"selectedToolCount": allowed_tool_names.len(),
"offeredToolsPreview": offered_tool_preview,
"reason": latest_required_tool_failure_kind.code(),
}),
));
}
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "required_write_unsatisfied",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"requiredToolFailureReason": latest_required_tool_failure_kind.code(),
}),
));
break;
}
if invalid_tool_args_retry_count < invalid_tool_args_retry_max_attempts() {
if let Some(retry_context) =
build_invalid_tool_args_retry_context_from_outputs(
&outputs,
invalid_tool_args_retry_count,
)
{
invalid_tool_args_retry_count += 1;
followup_context = Some(format!(
"Previous tool call arguments were invalid. {}",
retry_context
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "invalid_tool_args_retry",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
}),
));
continue;
}
}
let guard_budget_hit =
outputs.iter().any(|o| is_guard_budget_tool_output(o));
if executed_productive_tool {
let prewrite_satisfied = prewrite_requirements_satisfied(
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
);
let unmet_prewrite_codes = collect_unmet_prewrite_requirement_codes(
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
);
if requested_write_required
&& productive_write_tool_calls_total > 0
&& requested_prewrite_requirements.repair_on_unmet_requirements
&& unmet_prewrite_repair_retry_count
< prewrite_repair_retry_max_attempts()
&& !prewrite_satisfied
{
unmet_prewrite_repair_retry_count += 1;
let repair_attempt = unmet_prewrite_repair_retry_count;
let repair_attempts_remaining =
prewrite_repair_retry_max_attempts()
.saturating_sub(repair_attempt);
followup_context = Some(build_prewrite_repair_retry_context(
&offered_tool_preview,
latest_required_tool_failure_kind,
&text,
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "prewrite_repair_retry",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"requiredToolFailureReason": latest_required_tool_failure_kind.code(),
"repair": prewrite_repair_event_payload(
repair_attempt,
repair_attempts_remaining,
&unmet_prewrite_codes,
false,
),
}),
));
continue;
}
followup_context = Some(format!(
"{}\nContinue with a concise final response and avoid repeating identical tool calls.",
summarize_tool_outputs(&outputs)
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "tool_followup",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
}),
));
continue;
}
if guard_budget_hit {
completion = summarize_guard_budget_outputs(&outputs)
.unwrap_or_else(|| {
"This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
});
} else if duplicate_signature_hit_in_cycle {
completion = summarize_duplicate_signature_outputs(&outputs)
.unwrap_or_else(|| {
"This run paused because the same tool call kept repeating. Rephrase the request or provide a different command target and retry.".to_string()
});
} else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
completion = summary;
} else {
completion.clear();
}
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "tool_summary",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
}),
));
break;
} else if matches!(requested_tool_mode, ToolMode::Required) {
latest_required_tool_failure_kind = classify_required_tool_failure(
&outputs,
saw_tool_call_candidate,
accepted_tool_calls_in_cycle,
provider_tool_parse_failed,
rejected_tool_call_in_cycle,
);
}
}
if let Some(usage) = provider_usage {
self.event_bus.publish(EngineEvent::new(
"provider.usage",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"promptTokens": usage.prompt_tokens,
"completionTokens": usage.completion_tokens,
"totalTokens": usage.total_tokens,
}),
));
}
if matches!(requested_tool_mode, ToolMode::Required)
&& productive_tool_calls_total == 0
{
if requested_write_required
&& required_write_retry_count > 0
&& productive_write_tool_calls_total == 0
&& !is_write_invalid_args_failure_kind(latest_required_tool_failure_kind)
{
latest_required_tool_failure_kind =
RequiredToolFailureKind::WriteRequiredNotSatisfied;
}
if requested_write_required
&& required_write_retry_count + 1 < strict_write_retry_max_attempts
{
required_write_retry_count += 1;
followup_context = Some(build_write_required_retry_context(
&offered_tool_preview,
latest_required_tool_failure_kind,
&text,
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
));
continue;
}
if !requested_write_required && required_tool_retry_count == 0 {
required_tool_retry_count += 1;
followup_context = Some(build_required_tool_retry_context(
&offered_tool_preview,
latest_required_tool_failure_kind,
));
continue;
}
completion = required_tool_mode_unsatisfied_completion(
latest_required_tool_failure_kind,
);
if !required_tool_unsatisfied_emitted {
required_tool_unsatisfied_emitted = true;
self.event_bus.publish(EngineEvent::new(
"tool.mode.required.unsatisfied",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"selectedToolCount": allowed_tool_names.len(),
"offeredToolsPreview": offered_tool_preview,
"reason": latest_required_tool_failure_kind.code(),
}),
));
}
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "required_tool_unsatisfied",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"requiredToolFailureReason": latest_required_tool_failure_kind.code(),
}),
));
} else {
if completion.trim().is_empty()
&& !last_tool_outputs.is_empty()
&& requested_write_required
&& empty_completion_retry_count == 0
{
empty_completion_retry_count += 1;
followup_context = Some(build_empty_completion_retry_context(
&offered_tool_preview,
&text,
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "empty_completion_retry",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
}),
));
continue;
}
if should_start_prewrite_repair_before_first_write(
requested_prewrite_requirements.repair_on_unmet_requirements,
productive_write_tool_calls_total,
prewrite_satisfied,
) && !prewrite_gate_waived
{
let unmet_prewrite_codes = collect_unmet_prewrite_requirement_codes(
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
);
if unmet_prewrite_repair_retry_count < prewrite_repair_retry_max_attempts()
{
unmet_prewrite_repair_retry_count += 1;
let repair_attempt = unmet_prewrite_repair_retry_count;
let repair_attempts_remaining =
prewrite_repair_retry_max_attempts().saturating_sub(repair_attempt);
followup_context = Some(build_prewrite_repair_retry_context(
&offered_tool_preview,
latest_required_tool_failure_kind,
&text,
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "prewrite_repair_retry",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"requiredToolFailureReason": latest_required_tool_failure_kind.code(),
"repair": prewrite_repair_event_payload(
repair_attempt,
repair_attempts_remaining,
&unmet_prewrite_codes,
false,
),
}),
));
continue;
}
prewrite_gate_waived = true;
let repair_attempt = unmet_prewrite_repair_retry_count;
let repair_attempts_remaining =
prewrite_repair_retry_max_attempts().saturating_sub(repair_attempt);
followup_context = Some(build_prewrite_waived_write_context(
&text,
&unmet_prewrite_codes,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "prewrite_gate_waived",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
"prewriteGateWaived": true,
"repair": prewrite_repair_event_payload(
repair_attempt,
repair_attempts_remaining,
&unmet_prewrite_codes,
true,
),
}),
));
continue;
}
if prewrite_gate_waived
&& requested_write_required
&& productive_write_tool_calls_total == 0
&& required_write_retry_count + 1 < strict_write_retry_max_attempts
{
required_write_retry_count += 1;
followup_context = Some(build_write_required_retry_context(
&offered_tool_preview,
latest_required_tool_failure_kind,
&text,
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
));
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "waived_write_retry",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
}),
));
continue;
}
self.event_bus.publish(EngineEvent::new(
"provider.call.iteration.finish",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"iteration": iteration,
"finishReason": "provider_completion",
"acceptedToolCalls": accepted_tool_calls_in_cycle,
"rejectedToolCalls": 0,
}),
));
}
break;
}
if matches!(requested_tool_mode, ToolMode::Required) && productive_tool_calls_total == 0
{
completion =
required_tool_mode_unsatisfied_completion(latest_required_tool_failure_kind);
if !required_tool_unsatisfied_emitted {
self.event_bus.publish(EngineEvent::new(
"tool.mode.required.unsatisfied",
json!({
"sessionID": session_id,
"messageID": user_message_id,
"selectedToolCount": tool_call_counts.len(),
"reason": latest_required_tool_failure_kind.code(),
}),
));
}
}
if completion.trim().is_empty()
&& !last_tool_outputs.is_empty()
&& requested_write_required
&& productive_write_tool_calls_total > 0
{
let final_prewrite_satisfied = prewrite_requirements_satisfied(
&requested_prewrite_requirements,
productive_workspace_inspection_total > 0,
productive_concrete_read_total > 0,
productive_web_research_total > 0,
successful_web_research_total > 0,
);
completion = synthesize_artifact_write_completion_from_tool_state(
&text,
final_prewrite_satisfied,
prewrite_gate_waived,
);
}
if completion.trim().is_empty()
&& !last_tool_outputs.is_empty()
&& should_generate_post_tool_final_narrative(
requested_tool_mode,
productive_tool_calls_total,
)
{
if let Some(narrative) = self
.generate_final_narrative_without_tools(
&session_id,
&active_agent,
Some(provider_id.as_str()),
Some(model_id_value.as_str()),
cancel.clone(),
&last_tool_outputs,
)
.await
{
completion = narrative;
}
}
if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
let preview = last_tool_outputs
.iter()
.take(3)
.map(|o| truncate_text(o, 240))
.collect::<Vec<_>>()
.join("\n");
completion = format!(
"I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
preview
);
}
if completion.trim().is_empty() {
completion =
"I couldn't produce a final response for that run. Please retry your request."
.to_string();
}
if email_delivery_requested
&& !email_action_executed
&& completion_claims_email_sent(&completion)
{
let mut fallback = "I could not verify that an email was sent in this run. I did not complete the delivery action."
.to_string();
if let Some(note) = latest_email_action_note.as_ref() {
fallback.push_str("\n\nLast email tool status: ");
fallback.push_str(note);
}
fallback.push_str(
"\n\nPlease retry with an explicit available email tool (for example a draft, reply, or send MCP tool in your current connector set).",
);
completion = fallback;
}
completion = strip_model_control_markers(&completion);
truncate_text(&completion, 16_000)
};
emit_event(
Level::INFO,
ProcessKind::Engine,
ObservabilityEvent {
event: "provider.call.finish",
component: "engine.loop",
correlation_id: correlation_ref,
session_id: Some(&session_id),
run_id: None,
message_id: Some(&user_message_id),
provider_id: Some(provider_id.as_str()),
model_id,
status: Some("ok"),
error_code: None,
detail: Some("provider stream complete"),
},
);
if active_agent.name.eq_ignore_ascii_case("plan") {
emit_plan_todo_fallback(
self.storage.clone(),
&self.event_bus,
&session_id,
&user_message_id,
&completion,
)
.await;
let todos_after_fallback = self.storage.get_todos(&session_id).await;
if todos_after_fallback.is_empty() && !question_tool_used {
emit_plan_question_fallback(
self.storage.clone(),
&self.event_bus,
&session_id,
&user_message_id,
&completion,
)
.await;
}
}
if cancel.is_cancelled() {
self.event_bus.publish(EngineEvent::new(
"session.status",
json!({"sessionID": session_id, "status":"cancelled"}),
));
self.cancellations.remove(&session_id).await;
return Ok(());
}
let assistant = Message::new(
MessageRole::Assistant,
vec![MessagePart::Text {
text: completion.clone(),
}],
);
let assistant_message_id = assistant.id.clone();
self.storage.append_message(&session_id, assistant).await?;
let final_part = WireMessagePart::text(
&session_id,
&assistant_message_id,
truncate_text(&completion, 16_000),
);
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": final_part}),
));
self.event_bus.publish(EngineEvent::new(
"session.updated",
json!({"sessionID": session_id, "status":"idle"}),
));
self.event_bus.publish(EngineEvent::new(
"session.status",
json!({"sessionID": session_id, "status":"idle"}),
));
self.cancellations.remove(&session_id).await;
Ok(())
}
/// Run a single prompt against the default provider and return its raw
/// completion text. This bypasses the session/event machinery entirely —
/// no storage writes, no event-bus publishes, no tool execution.
pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
    let registry = &self.providers;
    registry.default_complete(prompt.as_str()).await
}
/// Run a single prompt against a specific provider, or the registry's
/// default when `provider_id` is `None`. Like [`run_oneshot`], this is a
/// direct pass-through to the provider layer with no session state,
/// events, or tools involved; the trailing `None` is the (absent)
/// per-call model override.
pub async fn run_oneshot_for_provider(
    &self,
    prompt: String,
    provider_id: Option<&str>,
) -> anyhow::Result<String> {
    let registry = &self.providers;
    registry
        .complete_for_provider(provider_id, prompt.as_str(), None)
        .await
}
#[allow(clippy::too_many_arguments)]
async fn execute_tool_with_permission(
&self,
session_id: &str,
message_id: &str,
tool: String,
args: Value,
initial_tool_call_id: Option<String>,
equipped_skills: Option<&[String]>,
latest_user_text: &str,
write_required: bool,
latest_assistant_context: Option<&str>,
cancel: CancellationToken,
) -> anyhow::Result<Option<String>> {
let tool = normalize_tool_name(&tool);
let raw_args = args.clone();
let normalized = normalize_tool_args_with_mode(
&tool,
args,
latest_user_text,
latest_assistant_context.unwrap_or_default(),
if write_required {
WritePathRecoveryMode::OutputTargetOnly
} else {
WritePathRecoveryMode::Heuristic
},
);
let raw_args_preview = truncate_text(&raw_args.to_string(), 2_000);
let normalized_args_preview = truncate_text(&normalized.args.to_string(), 2_000);
self.event_bus.publish(EngineEvent::new(
"tool.args.normalized",
json!({
"sessionID": session_id,
"messageID": message_id,
"tool": tool,
"argsSource": normalized.args_source,
"argsIntegrity": normalized.args_integrity,
"rawArgsState": normalized.raw_args_state.as_str(),
"rawArgsPreview": raw_args_preview,
"normalizedArgsPreview": normalized_args_preview,
"query": normalized.query,
"queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
"requestID": Value::Null
}),
));
if normalized.args_integrity == "recovered" {
self.event_bus.publish(EngineEvent::new(
"tool.args.recovered",
json!({
"sessionID": session_id,
"messageID": message_id,
"tool": tool,
"argsSource": normalized.args_source,
"rawArgsPreview": raw_args_preview,
"normalizedArgsPreview": normalized_args_preview,
"query": normalized.query,
"queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
"requestID": Value::Null
}),
));
}
if normalized.missing_terminal {
let missing_reason = normalized
.missing_terminal_reason
.clone()
.unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
let latest_user_preview = truncate_text(latest_user_text, 500);
let latest_assistant_preview =
truncate_text(latest_assistant_context.unwrap_or_default(), 500);
self.event_bus.publish(EngineEvent::new(
"tool.args.missing_terminal",
json!({
"sessionID": session_id,
"messageID": message_id,
"tool": tool,
"argsSource": normalized.args_source,
"argsIntegrity": normalized.args_integrity,
"rawArgsState": normalized.raw_args_state.as_str(),
"requestID": Value::Null,
"error": missing_reason,
"rawArgsPreview": raw_args_preview,
"normalizedArgsPreview": normalized_args_preview,
"latestUserPreview": latest_user_preview,
"latestAssistantPreview": latest_assistant_preview,
}),
));
if tool == "write" {
tracing::warn!(
session_id = %session_id,
message_id = %message_id,
tool = %tool,
reason = %missing_reason,
args_source = %normalized.args_source,
args_integrity = %normalized.args_integrity,
raw_args_state = %normalized.raw_args_state.as_str(),
raw_args = %raw_args_preview,
normalized_args = %normalized_args_preview,
latest_user = %latest_user_preview,
latest_assistant = %latest_assistant_preview,
"write tool arguments missing terminal field"
);
}
let best_effort_args = persisted_failed_tool_args(&raw_args, &normalized.args);
let mut failed_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(best_effort_args),
json!(null),
);
failed_part.state = Some("failed".to_string());
let surfaced_reason =
provider_specific_write_reason(&tool, &missing_reason, normalized.raw_args_state)
.unwrap_or_else(|| missing_reason.clone());
failed_part.error = Some(surfaced_reason.clone());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": failed_part}),
));
return Ok(Some(surfaced_reason));
}
let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
Ok(args) => args,
Err(message) => return Ok(Some(message)),
};
if let Some(allowed_tools) = self
.session_allowed_tools
.read()
.await
.get(session_id)
.cloned()
{
if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &tool) {
return Ok(Some(format!("Tool `{tool}` is not allowed for this run.")));
}
}
if let Some(hook) = self.tool_policy_hook.read().await.clone() {
let decision = hook
.evaluate_tool(ToolPolicyContext {
session_id: session_id.to_string(),
message_id: message_id.to_string(),
tool: tool.clone(),
args: args.clone(),
})
.await?;
if !decision.allowed {
let reason = decision
.reason
.unwrap_or_else(|| "Tool denied by runtime policy".to_string());
let mut blocked_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args.clone()),
json!(null),
);
blocked_part.state = Some("failed".to_string());
blocked_part.error = Some(reason.clone());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": blocked_part}),
));
return Ok(Some(reason));
}
}
let mut tool_call_id: Option<String> = initial_tool_call_id;
if let Some(violation) = self
.workspace_sandbox_violation(session_id, &tool, &args)
.await
{
let mut blocked_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args.clone()),
json!(null),
);
blocked_part.state = Some("failed".to_string());
blocked_part.error = Some(violation.clone());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": blocked_part}),
));
return Ok(Some(violation));
}
let rule = self
.plugins
.permission_override(&tool)
.await
.unwrap_or(self.permissions.evaluate(&tool, &tool).await);
if matches!(rule, PermissionAction::Deny) {
return Ok(Some(format!(
"Permission denied for tool `{tool}` by policy."
)));
}
let mut effective_args = args.clone();
if matches!(rule, PermissionAction::Ask) {
let auto_approve_permissions = self
.session_auto_approve_permissions
.read()
.await
.get(session_id)
.copied()
.unwrap_or(false);
if auto_approve_permissions {
self.event_bus.publish(EngineEvent::new(
"permission.auto_approved",
json!({
"sessionID": session_id,
"messageID": message_id,
"tool": tool,
}),
));
effective_args = args;
} else {
let pending = self
.permissions
.ask_for_session_with_context(
Some(session_id),
&tool,
args.clone(),
Some(crate::PermissionArgsContext {
args_source: normalized.args_source.clone(),
args_integrity: normalized.args_integrity.clone(),
query: normalized.query.clone(),
}),
)
.await;
let mut pending_part = WireMessagePart::tool_invocation(
session_id,
message_id,
tool.clone(),
args.clone(),
);
pending_part.id = Some(pending.id.clone());
tool_call_id = Some(pending.id.clone());
pending_part.state = Some("pending".to_string());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": pending_part}),
));
let reply = self
.permissions
.wait_for_reply_with_timeout(
&pending.id,
cancel.clone(),
Some(Duration::from_millis(permission_wait_timeout_ms() as u64)),
)
.await;
let (reply, timed_out) = reply;
if cancel.is_cancelled() {
return Ok(None);
}
if timed_out {
let timeout_ms = permission_wait_timeout_ms();
self.event_bus.publish(EngineEvent::new(
"permission.wait.timeout",
json!({
"sessionID": session_id,
"messageID": message_id,
"tool": tool,
"requestID": pending.id,
"timeoutMs": timeout_ms,
}),
));
let mut timeout_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args.clone()),
json!(null),
);
timeout_part.id = Some(pending.id);
timeout_part.state = Some("failed".to_string());
timeout_part.error = Some(format!(
"Permission request timed out after {} ms",
timeout_ms
));
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": timeout_part}),
));
return Ok(Some(format!(
"Permission request for tool `{tool}` timed out after {timeout_ms} ms."
)));
}
let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
if !approved {
let mut denied_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args.clone()),
json!(null),
);
denied_part.id = Some(pending.id);
denied_part.state = Some("denied".to_string());
denied_part.error = Some("Permission denied by user".to_string());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": denied_part}),
));
return Ok(Some(format!(
"Permission denied for tool `{tool}` by user."
)));
}
effective_args = args;
}
}
let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
let session = self.storage.get_session(session_id).await;
if let (Some(obj), Some(session)) = (args.as_object_mut(), session.as_ref()) {
obj.insert(
"__session_id".to_string(),
Value::String(session_id.to_string()),
);
if let Some(project_id) = session.project_id.clone() {
obj.insert(
"__project_id".to_string(),
Value::String(project_id.clone()),
);
if project_id.starts_with("channel-public::") {
obj.insert(
"__memory_max_visible_scope".to_string(),
Value::String("project".to_string()),
);
}
}
}
let tool_context = self.resolve_tool_execution_context(session_id).await;
if let Some((workspace_root, effective_cwd, project_id)) = tool_context.as_ref() {
args = rewrite_workspace_alias_tool_args(&tool, args, workspace_root);
if let Some(obj) = args.as_object_mut() {
obj.insert(
"__workspace_root".to_string(),
Value::String(workspace_root.clone()),
);
obj.insert(
"__effective_cwd".to_string(),
Value::String(effective_cwd.clone()),
);
obj.insert(
"__session_id".to_string(),
Value::String(session_id.to_string()),
);
if let Some(project_id) = project_id.clone() {
obj.insert("__project_id".to_string(), Value::String(project_id));
}
}
tracing::info!(
"tool execution context session_id={} tool={} workspace_root={} effective_cwd={} project_id={}",
session_id,
tool,
workspace_root,
effective_cwd,
project_id.clone().unwrap_or_default()
);
}
let mut invoke_part =
WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
if let Some(call_id) = tool_call_id.clone() {
invoke_part.id = Some(call_id);
}
let invoke_part_id = invoke_part.id.clone();
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": invoke_part}),
));
let args_for_side_events = args.clone();
if tool == "spawn_agent" {
let hook = self.spawn_agent_hook.read().await.clone();
if let Some(hook) = hook {
let spawned = hook
.spawn_agent(SpawnAgentToolContext {
session_id: session_id.to_string(),
message_id: message_id.to_string(),
tool_call_id: invoke_part_id.clone(),
args: args_for_side_events.clone(),
})
.await?;
let output = self.plugins.transform_tool_output(spawned.output).await;
let output = truncate_text(&output, 16_000);
emit_tool_side_events(
self.storage.clone(),
&self.event_bus,
ToolSideEventContext {
session_id,
message_id,
tool: &tool,
args: &args_for_side_events,
metadata: &spawned.metadata,
workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
},
)
.await;
let mut result_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args_for_side_events.clone()),
json!(output.clone()),
);
result_part.id = invoke_part_id;
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": result_part}),
));
return Ok(Some(truncate_text(
&format!("Tool `{tool}` result:\n{output}"),
16_000,
)));
}
let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
let mut failed_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args_for_side_events.clone()),
json!(null),
);
failed_part.id = invoke_part_id.clone();
failed_part.state = Some("failed".to_string());
failed_part.error = Some(output.to_string());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": failed_part}),
));
return Ok(Some(output.to_string()));
}
let result = match self
.execute_tool_with_timeout(&tool, args, cancel.clone())
.await
{
Ok(result) => result,
Err(err) => {
let err_text = err.to_string();
if err_text.contains("TOOL_EXEC_TIMEOUT_MS_EXCEEDED(") {
let timeout_ms = tool_exec_timeout_ms();
let timeout_output = format!(
"Tool `{tool}` timed out after {timeout_ms} ms. It was stopped to keep this run responsive."
);
let mut failed_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args_for_side_events.clone()),
json!(null),
);
failed_part.id = invoke_part_id.clone();
failed_part.state = Some("failed".to_string());
failed_part.error = Some(timeout_output.clone());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": failed_part}),
));
return Ok(Some(timeout_output));
}
if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
self.event_bus.publish(EngineEvent::new(
"mcp.auth.required",
json!({
"sessionID": session_id,
"messageID": message_id,
"tool": tool.clone(),
"server": auth.server,
"authorizationUrl": auth.authorization_url,
"message": auth.message,
"challengeId": auth.challenge_id
}),
));
let auth_output = format!(
"Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
tool, auth.message, auth.authorization_url
);
let mut result_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args_for_side_events.clone()),
json!(auth_output.clone()),
);
result_part.id = invoke_part_id.clone();
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": result_part}),
));
return Ok(Some(truncate_text(
&format!("Tool `{tool}` result:\n{auth_output}"),
16_000,
)));
}
let mut failed_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args_for_side_events.clone()),
json!(null),
);
failed_part.id = invoke_part_id.clone();
failed_part.state = Some("failed".to_string());
failed_part.error = Some(err_text.clone());
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": failed_part}),
));
return Err(err);
}
};
if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
let event_name = if auth.pending && auth.blocked {
"mcp.auth.pending"
} else {
"mcp.auth.required"
};
self.event_bus.publish(EngineEvent::new(
event_name,
json!({
"sessionID": session_id,
"messageID": message_id,
"tool": tool.clone(),
"server": auth.server,
"authorizationUrl": auth.authorization_url,
"message": auth.message,
"challengeId": auth.challenge_id,
"pending": auth.pending,
"blocked": auth.blocked,
"retryAfterMs": auth.retry_after_ms
}),
));
}
emit_tool_side_events(
self.storage.clone(),
&self.event_bus,
ToolSideEventContext {
session_id,
message_id,
tool: &tool,
args: &args_for_side_events,
metadata: &result.metadata,
workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
},
)
.await;
let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
if auth.pending && auth.blocked {
let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
format!(
"Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
tool, auth.message, auth.authorization_url, retry_after_secs
)
} else {
format!(
"Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
tool, auth.message, auth.authorization_url
)
}
} else {
self.plugins.transform_tool_output(result.output).await
};
let output = truncate_text(&output, 16_000);
let mut result_part = WireMessagePart::tool_result(
session_id,
message_id,
tool.clone(),
Some(args_for_side_events.clone()),
json!(output.clone()),
);
result_part.id = invoke_part_id;
self.event_bus.publish(EngineEvent::new(
"message.part.updated",
json!({"part": result_part}),
));
Ok(Some(truncate_text(
&format!("Tool `{tool}` result:\n{output}"),
16_000,
)))
}
/// Executes a tool with a hard wall-clock budget so a hung tool cannot stall
/// the run; the budget comes from `tool_exec_timeout_ms()`.
async fn execute_tool_with_timeout(
    &self,
    tool: &str,
    args: Value,
    cancel: CancellationToken,
) -> anyhow::Result<tandem_types::ToolResult> {
    let budget_ms = tool_exec_timeout_ms() as u64;
    let execution = self.tools.execute_with_cancel(tool, args, cancel);
    match tokio::time::timeout(Duration::from_millis(budget_ms), execution).await {
        Ok(outcome) => outcome,
        // This sentinel text is matched upstream to surface a friendly timeout message.
        Err(_) => anyhow::bail!("TOOL_EXEC_TIMEOUT_MS_EXCEEDED({budget_ms})"),
    }
}
/// Returns the id of the session's most recent message when it is a user turn,
/// is at most 10 seconds old, and its concatenated text parts equal `text`.
async fn find_recent_matching_user_message_id(
    &self,
    session_id: &str,
    text: &str,
) -> Option<String> {
    let session = self.storage.get_session(session_id).await?;
    let candidate = session.messages.last()?;
    if !matches!(candidate.role, MessageRole::User) {
        return None;
    }
    // Only a fresh message (<= 10s old) counts as a match.
    let age_ms = (Utc::now() - candidate.created_at).num_milliseconds().max(0) as u64;
    if age_ms > 10_000 {
        return None;
    }
    // Join all text parts with newlines, mirroring how the message was built.
    let mut pieces = Vec::new();
    for part in &candidate.parts {
        if let MessagePart::Text { text } = part {
            pieces.push(text.clone());
        }
    }
    (pieces.join("\n") == text).then(|| candidate.id.clone())
}
/// Best-effort auto-title: when the session title still needs repair, derive a
/// title from the first non-empty user text part (or `fallback_text`) and save.
async fn auto_rename_session_from_user_text(&self, session_id: &str, fallback_text: &str) {
    let Some(mut session) = self.storage.get_session(session_id).await else {
        return;
    };
    if !title_needs_repair(&session.title) {
        return;
    }
    // Scan user messages for the first text part with visible content.
    let mut source = None;
    'search: for message in &session.messages {
        if !matches!(message.role, MessageRole::User) {
            continue;
        }
        for part in &message.parts {
            if let MessagePart::Text { text } = part {
                if !text.trim().is_empty() {
                    source = Some(text.clone());
                    break 'search;
                }
            }
        }
    }
    let source = source.unwrap_or_else(|| fallback_text.to_string());
    let Some(title) = derive_session_title_from_prompt(&source, 60) else {
        return;
    };
    session.title = title;
    session.time.updated = Utc::now();
    // Save failures are deliberately ignored; renaming is non-critical.
    let _ = self.storage.save_session(session).await;
}
/// Checks a tool call against the workspace sandbox. Returns a human-readable
/// block reason, or `None` when the call is allowed (or an override is active).
async fn workspace_sandbox_violation(
    &self,
    session_id: &str,
    tool: &str,
    args: &Value,
) -> Option<String> {
    if self.workspace_override_active(session_id).await {
        return None;
    }
    let session = self.storage.get_session(session_id).await?;
    let workspace = session
        .workspace_root
        .or_else(|| crate::normalize_workspace_path(&session.directory))?;
    let workspace_path = PathBuf::from(&workspace);
    let candidate_paths = extract_tool_candidate_paths(tool, args);
    // Without path arguments, the only remaining risk is a shell command that
    // targets sensitive locations directly.
    if candidate_paths.is_empty() {
        if is_shell_tool_name(tool) {
            if let Some(command) = extract_shell_command(args) {
                if shell_command_targets_sensitive_path(&command) {
                    return Some(format!(
                        "Sandbox blocked `{tool}` command targeting sensitive paths."
                    ));
                }
            }
        }
        return None;
    }
    // Relative candidates resolve against the workspace root.
    let resolve = |path: &str| {
        let raw = Path::new(path);
        if raw.is_absolute() {
            raw.to_path_buf()
        } else {
            workspace_path.join(raw)
        }
    };
    // Sensitive-path policy is checked first, then containment in the workspace.
    if let Some(sensitive) = candidate_paths
        .iter()
        .find(|path| is_sensitive_path_candidate(&resolve(path)))
    {
        return Some(format!(
            "Sandbox blocked `{tool}` path `{sensitive}` (sensitive path policy)."
        ));
    }
    let outside = candidate_paths
        .iter()
        .find(|path| !crate::is_within_workspace_root(&resolve(path), &workspace_path))?;
    Some(format!(
        "Sandbox blocked `{tool}` path `{outside}` (workspace root: `{workspace}`)"
    ))
}
/// Resolves `(workspace_root, effective_cwd, project_id)` for tool execution
/// from the session; `None` when no workspace root can be determined.
async fn resolve_tool_execution_context(
    &self,
    session_id: &str,
) -> Option<(String, String, Option<String>)> {
    let session = self.storage.get_session(session_id).await?;
    let workspace_root = session
        .workspace_root
        .or_else(|| crate::normalize_workspace_path(&session.directory))?;
    // A blank or "." directory means "run at the workspace root".
    let dir = session.directory.trim();
    let effective_cwd = if dir.is_empty() || dir == "." {
        workspace_root.clone()
    } else {
        crate::normalize_workspace_path(&session.directory)
            .unwrap_or_else(|| workspace_root.clone())
    };
    let project_id = session
        .project_id
        .clone()
        .or_else(|| crate::workspace_project_id(&workspace_root));
    Some((workspace_root, effective_cwd, project_id))
}
/// True when this session has an unexpired workspace override. Expired
/// entries are pruned from the map on every call.
async fn workspace_override_active(&self, session_id: &str) -> bool {
    let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
    let mut overrides = self.workspace_overrides.write().await;
    overrides.retain(|_, expires_at| *expires_at > now);
    matches!(overrides.get(session_id), Some(expires_at) if *expires_at > now)
}
/// Runs one additional, tool-free model pass that turns the collected tool
/// outputs into a final narrative completion for the session.
///
/// Returns `None` when the run is cancelled, the provider stream cannot be
/// opened, any stream chunk errors, or the model yields only whitespace.
async fn generate_final_narrative_without_tools(
    &self,
    session_id: &str,
    active_agent: &AgentDefinition,
    provider_hint: Option<&str>,
    model_id: Option<&str>,
    cancel: CancellationToken,
    tool_outputs: &[String],
) -> Option<String> {
    if cancel.is_cancelled() {
        return None;
    }
    // Rebuild the chat history, then prepend a system message combining the
    // runtime prompt with the agent's own system prompt (when present).
    let mut messages = load_chat_history(
        self.storage.clone(),
        session_id,
        ChatHistoryProfile::Standard,
    )
    .await;
    let mut system_parts = vec![tandem_runtime_system_prompt(
        &self.host_runtime_context,
        &[],
    )];
    if let Some(system) = active_agent.system_prompt.as_ref() {
        system_parts.push(system.clone());
    }
    messages.insert(
        0,
        ChatMessage {
            role: "system".to_string(),
            content: system_parts.join("\n\n"),
            attachments: Vec::new(),
        },
    );
    // Close with a user turn that asks for the final answer derived from the
    // tool observations; tools are disabled via ToolMode::None below.
    messages.push(ChatMessage {
        role: "user".to_string(),
        content: build_post_tool_final_narrative_prompt(tool_outputs),
        attachments: Vec::new(),
    });
    let stream = self
        .providers
        .stream_for_provider(
            provider_hint,
            model_id,
            messages,
            ToolMode::None,
            None,
            cancel.clone(),
        )
        .await
        .ok()?;
    tokio::pin!(stream);
    let mut completion = String::new();
    while let Some(chunk) = stream.next().await {
        // Cancellation is re-checked between chunks so a long stream stops promptly.
        if cancel.is_cancelled() {
            return None;
        }
        match chunk {
            Ok(StreamChunk::TextDelta(delta)) => {
                // Strip provider end-of-turn markers; skip whitespace-only deltas.
                let delta = strip_model_control_markers(&delta);
                if !delta.trim().is_empty() {
                    completion.push_str(&delta);
                }
            }
            Ok(StreamChunk::Done { .. }) => break,
            // Other successful chunk kinds are ignored for the narrative.
            Ok(_) => {}
            Err(_) => return None,
        }
    }
    // Cap the narrative; an all-whitespace result counts as no narrative.
    let completion = truncate_text(&strip_model_control_markers(&completion), 16_000);
    if completion.trim().is_empty() {
        None
    } else {
        Some(completion)
    }
}
}
/// Picks the `(provider_id, model_id)` route: the request-level spec wins,
/// falling back to the session's spec. A spec only qualifies when both halves
/// are non-blank after trimming.
fn resolve_model_route(
    request_model: Option<&ModelSpec>,
    session_model: Option<&ModelSpec>,
) -> Option<(String, String)> {
    fn normalize(spec: &ModelSpec) -> Option<(String, String)> {
        match (spec.provider_id.trim(), spec.model_id.trim()) {
            ("", _) | (_, "") => None,
            (provider, model) => Some((provider.to_string(), model.to_string())),
        }
    }
    request_model
        .and_then(normalize)
        .or_else(|| session_model.and_then(normalize))
}
/// Removes provider end-of-turn sentinels that occasionally leak into
/// streamed text.
fn strip_model_control_markers(input: &str) -> String {
    const MARKERS: [&str; 4] = ["<|eom|>", "<|eot_id|>", "<|im_end|>", "<|end|>"];
    MARKERS.iter().fold(input.to_string(), |acc, marker| {
        // Only reallocate when the marker is actually present.
        if acc.contains(marker) {
            acc.replace(marker, "")
        } else {
            acc
        }
    })
}
/// Truncates `input` to at most `max_len` bytes, appending a marker when
/// anything was cut.
///
/// Bug fix: the previous `input[..max_len]` slice panics when `max_len` falls
/// inside a multi-byte UTF-8 character (model output is routinely non-ASCII).
/// We now back the cut point up to the nearest char boundary first.
fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Walk back to a valid char boundary so the slice below cannot panic.
    let mut cut = max_len;
    while cut > 0 && !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
/// Builds the closing user prompt that asks the model for the final answer
/// from the summarized tool observations, explicitly forbidding further tool
/// calls and insisting on the original output contract.
fn build_post_tool_final_narrative_prompt(tool_outputs: &[String]) -> String {
    format!(
        "Tool observations:\n{}\n\nUsing the tool observations and the existing conversation instructions, provide the required final answer now. Preserve any requested output contract, required JSON structure, required handoff fields, and required final status object from the original task. Do not call tools. Do not stop at a tool summary if the task requires a structured final response.",
        summarize_tool_outputs(tool_outputs)
    )
}
/// Maps raw provider error text onto a coarse, stable error code.
/// Category order matters: earlier checks take precedence.
fn provider_error_code(error_text: &str) -> &'static str {
    let lower = error_text.to_lowercase();
    let matches_any = |needles: &[&str]| needles.iter().any(|needle| lower.contains(needle));
    if matches_any(&[
        "invalid_function_parameters",
        "array schema missing items",
        "tool schema",
    ]) {
        "TOOL_SCHEMA_INVALID"
    } else if matches_any(&["rate limit", "too many requests", "429"]) {
        "RATE_LIMIT_EXCEEDED"
    } else if matches_any(&["context length", "max tokens", "token limit"]) {
        "CONTEXT_LENGTH_EXCEEDED"
    } else if matches_any(&["unauthorized", "authentication", "401", "403"]) {
        "AUTHENTICATION_ERROR"
    } else if matches_any(&["timeout", "timed out"]) {
        "TIMEOUT"
    } else if matches_any(&["server error", "500", "502", "503", "504"]) {
        "PROVIDER_SERVER_ERROR"
    } else {
        "PROVIDER_REQUEST_FAILED"
    }
}
/// Canonicalizes a tool name: lowercase, dashes to underscores, one
/// well-known namespace prefix peeled, then common aliases collapsed.
fn normalize_tool_name(name: &str) -> String {
    let mut normalized = name.trim().to_ascii_lowercase().replace('-', "_");
    const PREFIXES: [&str; 8] = [
        "default_api:",
        "default_api.",
        "functions.",
        "function.",
        "tools.",
        "tool.",
        "builtin:",
        "builtin.",
    ];
    // First prefix whose remainder is non-empty wins; empty remainders are skipped.
    if let Some(stripped) = PREFIXES.iter().find_map(|prefix| {
        normalized
            .strip_prefix(prefix)
            .map(str::trim)
            .filter(|rest| !rest.is_empty())
    }) {
        normalized = stripped.to_string();
    }
    match normalized.as_str() {
        "todowrite" | "update_todo_list" | "update_todos" => "todo_write".to_string(),
        "run_command" | "shell" | "powershell" | "cmd" => "bash".to_string(),
        other => other.to_string(),
    }
}
/// Extracts `<server>` from an MCP tool name of the form
/// `mcp.<server>.<tool>`; `None` for anything else.
fn mcp_server_from_tool_name(tool_name: &str) -> Option<&str> {
    let rest = tool_name.strip_prefix("mcp.")?;
    rest.split('.').next().filter(|server| !server.is_empty())
}
/// Heuristic: prompts using news/research phrasing likely need live web data.
fn requires_web_research_prompt(input: &str) -> bool {
    const NEEDLES: [&str; 7] = [
        "research",
        "top news",
        "today's news",
        "todays news",
        "with links",
        "latest headlines",
        "current events",
    ];
    let lower = input.to_ascii_lowercase();
    NEEDLES.iter().any(|needle| lower.contains(needle))
}
/// Heuristic: true when the prompt reads like a request to deliver an email.
fn requires_email_delivery_prompt(input: &str) -> bool {
    let lower = input.to_ascii_lowercase();
    if lower.contains("email to") {
        return true;
    }
    // "send ... email" or "send ... to ...@..." both count as delivery intent.
    lower.contains("send")
        && (lower.contains("email") || (lower.contains('@') && lower.contains("to")))
}
/// True when any schema normalizes to one of the live-web research tools.
fn has_web_research_tools(schemas: &[ToolSchema]) -> bool {
    schemas
        .iter()
        .map(|schema| normalize_tool_name(&schema.name))
        .any(|name| matches!(name.as_str(), "websearch" | "webfetch" | "webfetch_html"))
}
/// True when any available tool looks like it can perform an email action.
fn has_email_action_tools(schemas: &[ToolSchema]) -> bool {
    schemas.iter().any(|schema| {
        let name = normalize_tool_name(&schema.name);
        tool_name_looks_like_email_action(&name)
    })
}
/// True when a (normalized) tool name suggests an email-sending capability.
fn tool_name_looks_like_email_action(name: &str) -> bool {
    let normalized = normalize_tool_name(name);
    // Plain tools match on a mail/email marker; MCP-namespaced tools also
    // match on "gmail".
    let base = normalized.contains("mail") || normalized.contains("email");
    if normalized.starts_with("mcp.") {
        base || normalized.contains("gmail")
    } else {
        base
    }
}
/// True when a completion claims an email was actually delivered: it must
/// carry both an email-specific marker and delivery wording.
fn completion_claims_email_sent(text: &str) -> bool {
    let lower = text.to_ascii_lowercase();
    let email_marker = ["email status", "emailed", "email sent", "sent to"]
        .iter()
        .any(|needle| lower.contains(needle));
    let delivery_marker = ["sent", "delivered", "has been sent"]
        .iter()
        .any(|needle| lower.contains(needle));
    email_marker && delivery_marker
}
fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
let Some(obj) = args.as_object() else {
return Vec::new();
};
let keys: &[&str] = match tool {
"read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
"glob" => &["pattern"],
"lsp" => &["filePath", "path"],
"bash" => &["cwd"],
"apply_patch" => &[],
_ => &["path", "cwd"],
};
keys.iter()
.filter_map(|key| obj.get(*key))
.filter_map(|value| value.as_str())
.filter(|s| !s.trim().is_empty())
.map(ToString::to_string)
.collect()
}
/// True when the agent may use `tool_name`; an absent allowlist permits
/// everything, otherwise policies are matched on normalized names.
fn agent_can_use_tool(agent: &AgentDefinition, tool_name: &str) -> bool {
    let Some(list) = agent.tools.as_ref() else {
        return true;
    };
    let target = normalize_tool_name(tool_name);
    let normalized: Vec<_> = list.iter().map(|t| normalize_tool_name(t)).collect();
    any_policy_matches(&normalized, &target)
}
/// Scopes the `skill` tool to the agent's equipped skills.
///
/// Non-`skill` tools and unscoped agents pass through unchanged. A wildcard
/// entry (`*` / `all`) disables scoping. Otherwise an explicitly requested
/// skill outside the set is rejected, and the effective allowlist is attached
/// to the args as `allowed_skills` for downstream enforcement.
fn enforce_skill_scope(
    tool_name: &str,
    args: Value,
    equipped_skills: Option<&[String]>,
) -> Result<Value, String> {
    if normalize_tool_name(tool_name) != "skill" {
        return Ok(args);
    }
    let Some(configured) = equipped_skills else {
        return Ok(args);
    };
    let mut allowed: Vec<String> = configured
        .iter()
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect();
    let wildcard = allowed
        .iter()
        .any(|s| s == "*" || s.eq_ignore_ascii_case("all"));
    if wildcard {
        return Ok(args);
    }
    allowed.sort();
    allowed.dedup();
    if allowed.is_empty() {
        return Err("No skills are equipped for this agent.".to_string());
    }
    let requested = args
        .get("name")
        .and_then(Value::as_str)
        .map(str::trim)
        .unwrap_or_default();
    if !requested.is_empty() && !allowed.iter().any(|s| s.as_str() == requested) {
        return Err(format!(
            "Skill '{}' is not equipped for this agent. Equipped skills: {}",
            requested,
            allowed.join(", ")
        ));
    }
    // Rebuild args as an object (non-objects become empty) and attach the list.
    let mut out = match args.as_object() {
        Some(obj) => Value::Object(obj.clone()),
        None => json!({}),
    };
    if let Some(obj) = out.as_object_mut() {
        obj.insert("allowed_skills".to_string(), json!(allowed));
    }
    Ok(out)
}
/// True for tools that only inspect state and never mutate the workspace.
fn is_read_only_tool(tool_name: &str) -> bool {
    const READ_ONLY: [&str; 11] = [
        "glob",
        "read",
        "grep",
        "search",
        "codesearch",
        "list",
        "ls",
        "lsp",
        "websearch",
        "webfetch",
        "webfetch_html",
    ];
    READ_ONLY.contains(&normalize_tool_name(tool_name).as_str())
}
/// True for tools that create or modify files in the workspace.
fn is_workspace_write_tool(tool_name: &str) -> bool {
    ["write", "edit", "apply_patch"].contains(&normalize_tool_name(tool_name).as_str())
}
/// Gate a write only before the first productive write lands, while repair
/// mode is on and the prewrite requirements remain unmet.
fn should_gate_write_until_prewrite_satisfied(
    repair_on_unmet_requirements: bool,
    productive_write_tool_calls_total: usize,
    prewrite_satisfied: bool,
) -> bool {
    if !repair_on_unmet_requirements || prewrite_satisfied {
        return false;
    }
    productive_write_tool_calls_total == 0
}
/// Mirrors the write gate: prewrite repair kicks in only before any
/// productive write has landed and while requirements are still unmet.
fn should_start_prewrite_repair_before_first_write(
    repair_on_unmet_requirements: bool,
    productive_write_tool_calls_total: usize,
    prewrite_satisfied: bool,
) -> bool {
    matches!(
        (
            repair_on_unmet_requirements,
            productive_write_tool_calls_total,
            prewrite_satisfied,
        ),
        (true, 0, false)
    )
}
/// True for generic wrapper names some providers emit around the real tool name.
fn is_batch_wrapper_tool_name(name: &str) -> bool {
    [
        "default_api", "default", "api", "function", "functions", "tool", "tools",
    ]
    .contains(&normalize_tool_name(name).as_str())
}
/// Returns the trimmed string at `key` only when it has visible content.
fn non_empty_string_at<'a>(obj: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
    let value = obj.get(key)?.as_str()?.trim();
    if value.is_empty() {
        None
    } else {
        Some(value)
    }
}
/// Like a non-empty string lookup, but reads `obj[parent][key]` one object
/// level deeper.
fn nested_non_empty_string_at<'a>(
    obj: &'a Map<String, Value>,
    parent: &str,
    key: &str,
) -> Option<&'a str> {
    let nested = obj.get(parent)?.as_object()?;
    let value = nested.get(key)?.as_str()?.trim();
    (!value.is_empty()).then_some(value)
}
/// Flattens a batch payload (`args["tool_calls"]`) into
/// `(normalized_tool_name, call_args)` pairs.
///
/// Both `tool` and `name` are probed across several nesting shapes; the
/// probe order below is the precedence order. When a wrapper name (e.g.
/// `default_api`) occupies the `tool` slot, the `name` field wins instead.
/// Calls with no resolvable name are dropped.
fn extract_batch_calls(args: &Value) -> Vec<(String, Value)> {
    let calls = args
        .get("tool_calls")
        .and_then(|v| v.as_array())
        .cloned()
        .unwrap_or_default();
    calls
        .into_iter()
        .filter_map(|call| {
            let obj = call.as_object()?;
            // Probe the `tool` field, shallow first, then nested variants.
            let tool_raw = non_empty_string_at(obj, "tool")
                .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
                .or_else(|| nested_non_empty_string_at(obj, "function", "tool"))
                .or_else(|| nested_non_empty_string_at(obj, "function_call", "tool"))
                .or_else(|| nested_non_empty_string_at(obj, "call", "tool"));
            // Same probing strategy for the `name` field.
            let name_raw = non_empty_string_at(obj, "name")
                .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
                .or_else(|| nested_non_empty_string_at(obj, "function_call", "name"))
                .or_else(|| nested_non_empty_string_at(obj, "call", "name"))
                .or_else(|| nested_non_empty_string_at(obj, "tool", "name"));
            let effective = match (tool_raw, name_raw) {
                // A wrapper name in `tool` defers to the real `name`.
                (Some(t), Some(n)) if is_batch_wrapper_tool_name(t) => n,
                (Some(t), _) => t,
                (None, Some(n)) => n,
                (None, None) => return None,
            };
            let normalized = normalize_tool_name(effective);
            // Missing per-call args default to an empty object.
            let call_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
            Some((normalized, call_args))
        })
        .collect()
}
/// A batch counts as read-only only when it is non-empty and every call is.
fn is_read_only_batch_call(args: &Value) -> bool {
    let calls = extract_batch_calls(args);
    if calls.is_empty() {
        return false;
    }
    calls.iter().all(|(tool, _)| is_read_only_tool(tool))
}
/// Builds a stable signature for a whole batch, used for duplicate-call
/// detection; `None` when the batch has no calls.
fn batch_tool_signature(args: &Value) -> Option<String> {
    let calls = extract_batch_calls(args);
    if calls.is_empty() {
        return None;
    }
    let joined = calls
        .iter()
        .map(|(tool, call_args)| tool_signature(tool, call_args))
        .collect::<Vec<_>>()
        .join("|");
    Some(format!("batch:{joined}"))
}
/// True when a tool output represents actual forward progress: not an inert
/// batch, not an auth challenge, and carrying a productive result body.
fn is_productive_tool_output(tool_name: &str, output: &str) -> bool {
    if normalize_tool_name(tool_name) == "batch" && is_non_productive_batch_output(output) {
        return false;
    }
    if is_auth_required_tool_output(output) {
        return false;
    }
    // Output without a parseable result body is non-productive by definition.
    match extract_tool_result_body(output) {
        Some(body) => !is_non_productive_tool_result_body(body),
        None => false,
    }
}
/// True when a web-research tool actually returned usable results, rejecting
/// timeouts and empty-result phrasing that otherwise look like success.
fn is_successful_web_research_output(tool_name: &str, output: &str) -> bool {
    if !is_web_research_tool(tool_name) {
        return false;
    }
    let Some(result_body) = extract_tool_result_body(output) else {
        return false;
    };
    if is_non_productive_tool_result_body(result_body) {
        return false;
    }
    const FAILURE_NEEDLES: [&str; 5] = [
        "search timed out",
        "timed out",
        "no results received",
        "no search results",
        "no relevant results",
    ];
    let lower = result_body.to_ascii_lowercase();
    !FAILURE_NEEDLES.iter().any(|needle| lower.contains(needle))
}
/// Extracts the body from outputs shaped like "Tool `<name>` result:\n<body>".
fn extract_tool_result_body(output: &str) -> Option<&str> {
    let rest = output.trim().strip_prefix("Tool `")?;
    let (_, body) = rest.split_once("` result:")?;
    Some(body.trim())
}
/// True when a result body represents no real progress: empty, an unknown
/// tool, a guard-skip/budget notice, malformed args, or a terminal error code.
fn is_non_productive_tool_result_body(output: &str) -> bool {
    let trimmed = output.trim();
    if trimmed.is_empty() {
        return true;
    }
    let lower = trimmed.to_ascii_lowercase();
    if lower.starts_with("unknown tool:") {
        return true;
    }
    ["call skipped", "guard budget exceeded", "invalid_function_parameters"]
        .iter()
        .any(|needle| lower.contains(needle))
        || is_terminal_tool_error_reason(trimmed)
}
/// True when the first line of `output` is a machine-readable terminal error
/// code: a known code, or anything ending in `_MISSING` / `_ERROR`.
fn is_terminal_tool_error_reason(output: &str) -> bool {
    let first_line = output.lines().next().unwrap_or_default().trim();
    if first_line.is_empty() {
        return false;
    }
    let code = first_line.to_ascii_uppercase();
    const KNOWN_CODES: [&str; 14] = [
        "TOOL_ARGUMENTS_MISSING",
        "WEBSEARCH_QUERY_MISSING",
        "BASH_COMMAND_MISSING",
        "FILE_PATH_MISSING",
        "WRITE_CONTENT_MISSING",
        "WRITE_ARGS_EMPTY_FROM_PROVIDER",
        "WRITE_ARGS_UNPARSEABLE_FROM_PROVIDER",
        "WEBFETCH_URL_MISSING",
        "PACK_BUILDER_PLAN_ID_MISSING",
        "PACK_BUILDER_GOAL_MISSING",
        "PROVIDER_REQUEST_FAILED",
        "AUTHENTICATION_ERROR",
        "CONTEXT_LENGTH_EXCEEDED",
        "RATE_LIMIT_EXCEEDED",
    ];
    KNOWN_CODES.contains(&code.as_str())
        || code.ends_with("_MISSING")
        || code.ends_with("_ERROR")
}
fn is_non_productive_batch_output(output: &str) -> bool {
let Ok(value) = serde_json::from_str::<Value>(output.trim()) else {
return false;
};
let Some(items) = value.as_array() else {
return false;
};
if items.is_empty() {
return true;
}
items.iter().all(|item| {
let text = item
.get("output")
.and_then(|v| v.as_str())
.map(str::trim)
.unwrap_or_default()
.to_ascii_lowercase();
text.is_empty()
|| text.starts_with("unknown tool:")
|| text.contains("call skipped")
|| text.contains("guard budget exceeded")
})
}
/// True when a tool output is an authorization challenge: it carries both an
/// auth phrase and a link hint.
fn is_auth_required_tool_output(output: &str) -> bool {
    let lower = output.to_ascii_lowercase();
    let auth_phrase = [
        "authorization required",
        "requires authorization",
        "authorization pending",
    ]
    .iter()
    .any(|needle| lower.contains(needle));
    let link_hint = lower.contains("authorize here") || lower.contains("http");
    auth_phrase && link_hint
}
/// Parsed "authorization required" details extracted from MCP tool result
/// metadata or, as a fallback, from raw error text.
#[derive(Debug, Clone)]
struct McpAuthRequiredMetadata {
    // Stable identifier for this challenge; "unknown" when the source had none.
    challenge_id: String,
    // URL the user must visit to authorize; always non-empty when constructed.
    authorization_url: String,
    // Human-readable explanation; falls back to a default message.
    message: String,
    // MCP server name, when it could be determined from metadata or tool name.
    server: Option<String>,
    // Provider-reported "authorization still in flight" flag.
    pending: bool,
    // Provider-reported "call is blocked until authorization completes" flag.
    blocked: bool,
    // Suggested retry delay in milliseconds, when the server provided one.
    retry_after_ms: Option<u64>,
}
fn extract_mcp_auth_required_metadata(metadata: &Value) -> Option<McpAuthRequiredMetadata> {
let auth = metadata.get("mcpAuth")?;
if !auth
.get("required")
.and_then(|v| v.as_bool())
.unwrap_or(false)
{
return None;
}
let authorization_url = auth
.get("authorizationUrl")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())?
.to_string();
let message = auth
.get("message")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or("This tool requires authorization before it can run.")
.to_string();
let challenge_id = auth
.get("challengeId")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or("unknown")
.to_string();
let server = metadata
.get("server")
.and_then(|v| v.as_str())
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string);
let pending = auth
.get("pending")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let blocked = auth
.get("blocked")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let retry_after_ms = auth.get("retryAfterMs").and_then(|v| v.as_u64());
Some(McpAuthRequiredMetadata {
challenge_id,
authorization_url,
message,
server,
pending,
blocked,
retry_after_ms,
})
}
/// Heuristic recovery path: some MCP servers surface auth failures only as
/// error text, so detect auth wording plus an embedded URL and synthesize
/// the same metadata the structured path would produce.
fn extract_mcp_auth_required_from_error_text(
    tool_name: &str,
    error_text: &str,
) -> Option<McpAuthRequiredMetadata> {
    let lower = error_text.to_ascii_lowercase();
    let auth_hint = [
        "authorization",
        "oauth",
        "invalid oauth token",
        "requires authorization",
    ]
    .iter()
    .any(|needle| lower.contains(needle));
    if !auth_hint {
        return None;
    }
    let authorization_url = find_first_url(error_text)?;
    // Deterministic id so repeated failures map to the same challenge.
    let challenge_id = stable_hash(&format!("{tool_name}:{authorization_url}"));
    let server = tool_name
        .strip_prefix("mcp.")
        .and_then(|rest| rest.split('.').next())
        .filter(|s| !s.is_empty())
        .map(ToString::to_string);
    Some(McpAuthRequiredMetadata {
        challenge_id,
        authorization_url,
        message: "This integration requires authorization before this action can run.".to_string(),
        server,
        pending: false,
        blocked: false,
        retry_after_ms: None,
    })
}
/// Collapses a run's tool outputs into a single auth-pending summary, but only
/// when every output is an auth-required message; otherwise returns `None`.
fn summarize_auth_pending_outputs(outputs: &[String]) -> Option<String> {
    if outputs.is_empty() {
        return None;
    }
    if !outputs.iter().all(|output| is_auth_required_tool_output(output)) {
        return None;
    }
    // Deduplicate the trimmed, sorted auth messages.
    let mut auth_lines: Vec<String> = outputs
        .iter()
        .map(|output| output.trim())
        .filter(|trimmed| !trimmed.is_empty())
        .map(str::to_string)
        .collect();
    auth_lines.sort();
    auth_lines.dedup();
    if auth_lines.is_empty() {
        return None;
    }
    Some(format!(
        "Authorization is required before I can continue with this action.\n\n{}",
        auth_lines.join("\n\n")
    ))
}
/// Collapses a run's tool outputs into a single guard-budget summary, but only
/// when every output is a guard-budget message; otherwise returns `None`.
fn summarize_guard_budget_outputs(outputs: &[String]) -> Option<String> {
    if outputs.is_empty() {
        return None;
    }
    if !outputs.iter().all(|output| is_guard_budget_tool_output(output)) {
        return None;
    }
    // Deduplicate the trimmed, sorted budget messages.
    let mut lines: Vec<String> = outputs
        .iter()
        .map(|output| output.trim())
        .filter(|trimmed| !trimmed.is_empty())
        .map(str::to_string)
        .collect();
    lines.sort();
    lines.dedup();
    if lines.is_empty() {
        return None;
    }
    Some(format!(
        "This run hit the per-run tool guard budget, so I paused tool execution to avoid runaway retries.\n\n{}\n\nSend a new message to start a fresh run.",
        lines.join("\n")
    ))
}
/// Collapses a run's tool outputs into a duplicate-signature summary, but only
/// when every output is a duplicate-limit message; otherwise returns `None`.
fn summarize_duplicate_signature_outputs(outputs: &[String]) -> Option<String> {
    if outputs.is_empty() {
        return None;
    }
    if !outputs
        .iter()
        .all(|output| is_duplicate_signature_limit_output(output))
    {
        return None;
    }
    // Deduplicate the trimmed, sorted limit messages.
    let mut lines: Vec<String> = outputs
        .iter()
        .map(|output| output.trim())
        .filter(|trimmed| !trimmed.is_empty())
        .map(str::to_string)
        .collect();
    lines.sort();
    lines.dedup();
    if lines.is_empty() {
        return None;
    }
    Some(format!(
        "This run paused because the same tool call kept repeating.\n\n{}\n\nRephrase the request or start a new message with a clearer command target.",
        lines.join("\n")
    ))
}
/// Sentinel prefix placed on completions when `tool_mode=required` ended
/// without a productive tool call (see `required_tool_mode_unsatisfied_completion`).
const REQUIRED_TOOL_MODE_UNSATISFIED_REASON: &str = "TOOL_MODE_REQUIRED_NOT_SATISFIED";
/// Why a `tool_mode=required` turn failed to satisfy the requirement.
/// Produced by `classify_required_tool_failure` and rendered via `code()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequiredToolFailureKind {
    /// No tool-call candidate was seen in the stream at all.
    NoToolCallEmitted,
    /// A tool-call candidate was seen but could not be parsed.
    ToolCallParseFailed,
    /// The call parsed but its arguments were invalid.
    ToolCallInvalidArgs,
    /// The provider sent an empty argument payload for a `write` call.
    WriteArgsEmptyFromProvider,
    /// The provider sent an unparseable argument payload for a `write` call.
    WriteArgsUnparseableFromProvider,
    /// The call(s) were rejected by policy/permissions (or none were accepted).
    ToolCallRejectedByPolicy,
    /// Calls executed, but none counted as productive.
    ToolCallExecutedNonProductive,
    /// A required workspace write never happened.
    WriteRequiredNotSatisfied,
    /// Prewrite research requirements remained unmet after repair attempts.
    PrewriteRequirementsExhausted,
}
impl RequiredToolFailureKind {
    /// Stable machine-readable code embedded in completions and retry prompts.
    fn code(self) -> &'static str {
        match self {
            Self::NoToolCallEmitted => "NO_TOOL_CALL_EMITTED",
            Self::ToolCallParseFailed => "TOOL_CALL_PARSE_FAILED",
            Self::ToolCallInvalidArgs => "TOOL_CALL_INVALID_ARGS",
            Self::WriteArgsEmptyFromProvider => "WRITE_ARGS_EMPTY_FROM_PROVIDER",
            Self::WriteArgsUnparseableFromProvider => "WRITE_ARGS_UNPARSEABLE_FROM_PROVIDER",
            Self::ToolCallRejectedByPolicy => "TOOL_CALL_REJECTED_BY_POLICY",
            Self::ToolCallExecutedNonProductive => "TOOL_CALL_EXECUTED_NON_PRODUCTIVE",
            Self::WriteRequiredNotSatisfied => "WRITE_REQUIRED_NOT_SATISFIED",
            Self::PrewriteRequirementsExhausted => "PREWRITE_REQUIREMENTS_EXHAUSTED",
        }
    }
}
/// Renders the canonical completion text for an unsatisfied required-tool run.
fn required_tool_mode_unsatisfied_completion(reason: RequiredToolFailureKind) -> String {
    let code = reason.code();
    format!(
        "{REQUIRED_TOOL_MODE_UNSATISFIED_REASON}: {code}: tool_mode=required but the model ended without executing a productive tool call."
    )
}
/// Renders the blocked completion (human prefix + machine-readable JSON tail)
/// emitted when prewrite requirements were still unmet after all repair passes.
#[allow(dead_code)]
fn prewrite_requirements_exhausted_completion(
    unmet_codes: &[&'static str],
    repair_attempt: usize,
    repair_attempts_remaining: usize,
) -> String {
    let unmet = match unmet_codes {
        [] => "none".to_string(),
        codes => codes.join(", "),
    };
    format!(
        "TOOL_MODE_REQUIRED_NOT_SATISFIED: PREWRITE_REQUIREMENTS_EXHAUSTED: unmet prewrite requirements: {unmet}\n\n{{\"status\":\"blocked\",\"reason\":\"prewrite requirements exhausted before final artifact validation\",\"failureCode\":\"PREWRITE_REQUIREMENTS_EXHAUSTED\",\"repairAttempt\":{},\"repairAttemptsRemaining\":{},\"repairExhausted\":true,\"unmetRequirements\":{:?}}}",
        repair_attempt,
        repair_attempts_remaining,
        unmet_codes,
    )
}
/// Builds the JSON payload attached to prewrite-repair telemetry events.
fn prewrite_repair_event_payload(
    repair_attempt: usize,
    repair_attempts_remaining: usize,
    unmet_codes: &[&'static str],
    repair_exhausted: bool,
) -> Value {
    // A repair is "active" only once at least one attempt has run and the
    // budget is not yet exhausted.
    let repair_active = repair_attempt > 0 && !repair_exhausted;
    json!({
        "repairAttempt": repair_attempt,
        "repairAttemptsRemaining": repair_attempts_remaining,
        "unmetRequirements": unmet_codes,
        "repairActive": repair_active,
        "repairExhausted": repair_exhausted,
    })
}
/// Builds the retry prompt injected after a required-tool failure, tailoring
/// the instruction to the previous failure kind.
fn build_required_tool_retry_context(
    offered_tool_preview: &str,
    previous_reason: RequiredToolFailureKind,
) -> String {
    let offered = offered_tool_preview.trim();
    // Pick the most specific instruction for what went wrong last time.
    let execution_instruction =
        if previous_reason == RequiredToolFailureKind::WriteRequiredNotSatisfied {
            "Inspection is complete; now create or modify workspace files with write, edit, or apply_patch.".to_string()
        } else if is_write_invalid_args_failure_kind(previous_reason) {
            "Previous tool call arguments were invalid. If you use write, include both `path` and the full `content`. If inspection is already complete, use write, edit, or apply_patch now.".to_string()
        } else if offered.is_empty() {
            "Use one of the tools offered in this turn before you produce final text.".to_string()
        } else {
            format!("Use one of these offered tools before you produce final text: {offered}.")
        };
    format!(
        "Tool access is mandatory for this request. Previous attempt failed with {}. Execute at least one valid offered tool call before any final text. {}",
        previous_reason.code(),
        execution_instruction
    )
}
/// True for failure kinds that boil down to invalid `write` arguments.
fn is_write_invalid_args_failure_kind(reason: RequiredToolFailureKind) -> bool {
    match reason {
        RequiredToolFailureKind::ToolCallInvalidArgs
        | RequiredToolFailureKind::WriteArgsEmptyFromProvider
        | RequiredToolFailureKind::WriteArgsUnparseableFromProvider => true,
        _ => false,
    }
}
/// Builds the retry prompt for a run that must end with a workspace write,
/// listing any unmet prewrite requirements and the inferred output target.
fn build_write_required_retry_context(
    offered_tool_preview: &str,
    previous_reason: RequiredToolFailureKind,
    latest_user_text: &str,
    prewrite_requirements: &PrewriteRequirements,
    workspace_inspection_satisfied: bool,
    concrete_read_satisfied: bool,
    web_research_satisfied: bool,
    successful_web_research_satisfied: bool,
) -> String {
    // Assemble the prompt as space-joined sentences.
    let mut segments = vec![build_required_tool_retry_context(
        offered_tool_preview,
        previous_reason,
    )];
    let unmet = describe_unmet_prewrite_requirements(
        prewrite_requirements,
        workspace_inspection_satisfied,
        concrete_read_satisfied,
        web_research_satisfied,
        successful_web_research_satisfied,
    );
    if !unmet.is_empty() {
        segments.push(format!(
            "Before the final write, you still need to {}.",
            unmet.join(" and ")
        ));
    }
    if let Some(path) = infer_required_output_target_path_from_text(latest_user_text) {
        segments.push(format!(
            "The required output target for this task is `{path}`. Write or update that file now."
        ));
        segments.push(
            "Your next response must be a `write` tool call for that file, not a prose-only reply."
                .to_string(),
        );
        segments.push(
            "You have already gathered research in this session. Now write the output file using the information from your previous tool calls. You may re-read a specific file if needed for accuracy."
                .to_string(),
        );
    }
    segments.join(" ")
}
/// Builds the retry prompt injected during a prewrite repair pass: the base
/// required-tool instruction plus explicit repair notes for each unmet
/// prewrite requirement and, when inferable, the required output target path.
fn build_prewrite_repair_retry_context(
    offered_tool_preview: &str,
    previous_reason: RequiredToolFailureKind,
    latest_user_text: &str,
    prewrite_requirements: &PrewriteRequirements,
    workspace_inspection_satisfied: bool,
    concrete_read_satisfied: bool,
    web_research_satisfied: bool,
    successful_web_research_satisfied: bool,
) -> String {
    let mut prompt = build_required_tool_retry_context(offered_tool_preview, previous_reason);
    let unmet = describe_unmet_prewrite_requirements(
        prewrite_requirements,
        workspace_inspection_satisfied,
        concrete_read_satisfied,
        web_research_satisfied,
        successful_web_research_satisfied,
    );
    if !unmet.is_empty() {
        prompt.push(' ');
        prompt.push_str(&format!(
            "Before the final write, you still need to {}.",
            unmet.join(" and ")
        ));
    }
    // Collect targeted repair instructions for each unmet requirement class.
    let mut repair_notes = Vec::new();
    if prewrite_requirements.concrete_read_required && !concrete_read_satisfied {
        repair_notes.push(
            "This task requires concrete `read` calls on relevant workspace files before you can write the output. Call `read` now on the files you discovered.",
        );
    }
    if prewrite_requirements.successful_web_research_required && !successful_web_research_satisfied
    {
        repair_notes.push(
            "Timed out or empty websearch attempts do not satisfy external-research requirements; call `websearch` with a concrete query now.",
        );
    }
    if !matches!(
        prewrite_requirements.coverage_mode,
        PrewriteCoverageMode::None
    ) {
        repair_notes.push(
            "Every path listed under `Files reviewed` must have been actually read in this run, and any relevant discovered file you did not read must appear under `Files not reviewed` with a reason.",
        );
    }
    if !repair_notes.is_empty() {
        prompt.push(' ');
        prompt.push_str("Do not skip this step. ");
        prompt.push_str(&repair_notes.join(" "));
    }
    if let Some(path) = infer_required_output_target_path_from_text(latest_user_text) {
        prompt.push(' ');
        prompt.push_str(&format!(
            "Use `read` and `websearch` now to gather evidence, then write the artifact to `{path}`."
        ));
        prompt.push(' ');
        // Fixed: this sentence has no interpolation, so push the literal
        // directly instead of going through a pointless `format!` allocation.
        prompt.push_str(
            "Do not declare the output blocked while `read` and `websearch` remain available. Call them now.",
        );
    }
    prompt
}
/// Builds the prompt used once prewrite requirements are waived: the model
/// must still produce the output file with whatever evidence it gathered.
fn build_prewrite_waived_write_context(
    latest_user_text: &str,
    unmet_codes: &[&'static str],
) -> String {
    let mut prompt = String::from(
        "Research prerequisites could not be fully satisfied after multiple repair attempts. You must still write the output file using whatever information you have gathered so far. Do not write a blocked or placeholder file. Write the best possible output with the evidence available.",
    );
    if !unmet_codes.is_empty() {
        let waived = unmet_codes.join(", ");
        prompt.push_str(&format!(" (Unmet prerequisites waived: {waived}.)"));
    }
    if let Some(path) = infer_required_output_target_path_from_text(latest_user_text) {
        prompt.push_str(&format!(
            " The required output file is `{path}`. Call the `write` tool now to create it."
        ));
    }
    prompt
}
/// Builds the retry prompt for a run that used tools but streamed no final
/// text, reminding the model of unmet requirements and the output target.
fn build_empty_completion_retry_context(
    offered_tool_preview: &str,
    latest_user_text: &str,
    prewrite_requirements: &PrewriteRequirements,
    workspace_inspection_satisfied: bool,
    concrete_read_satisfied: bool,
    web_research_satisfied: bool,
    successful_web_research_satisfied: bool,
) -> String {
    // Assemble the prompt as space-joined sentences.
    let mut segments = vec![String::from(
        "You already used tools in this session, but returned no final output. Do not stop now.",
    )];
    let unmet = describe_unmet_prewrite_requirements(
        prewrite_requirements,
        workspace_inspection_satisfied,
        concrete_read_satisfied,
        web_research_satisfied,
        successful_web_research_satisfied,
    );
    if !unmet.is_empty() {
        segments.push(format!(
            "You still need to {} before the final write.",
            unmet.join(" and ")
        ));
        segments.push(build_required_tool_retry_context(
            offered_tool_preview,
            RequiredToolFailureKind::WriteRequiredNotSatisfied,
        ));
    }
    if let Some(path) = infer_required_output_target_path_from_text(latest_user_text) {
        segments.push(format!("The required output target is `{path}`."));
        let follow_up = if unmet.is_empty() {
            "Your next response must be a `write` tool call for that file, not a prose-only reply."
        } else {
            "After completing the missing requirement, immediately write that file instead of ending with prose."
        };
        segments.push(follow_up.to_string());
    }
    segments.join(" ")
}
/// Synthesizes a final completion message after tool execution produced the
/// artifact, noting whether validation runs under waived evidence rules.
fn synthesize_artifact_write_completion_from_tool_state(
    latest_user_text: &str,
    prewrite_satisfied: bool,
    prewrite_gate_waived: bool,
) -> String {
    let target = match infer_required_output_target_path_from_text(latest_user_text) {
        Some(path) => path,
        None => "the declared output artifact".to_string(),
    };
    let validation_note = if prewrite_gate_waived && !prewrite_satisfied {
        "\n\nRuntime validation will decide whether the artifact can be accepted because some evidence requirements were waived in-run."
    } else {
        "\n\nRuntime validation will verify the artifact and finalize node status."
    };
    format!(
        "Completed the requested tool actions and wrote `{target}`.{validation_note}\n\n{{\"status\":\"completed\"}}"
    )
}
/// A post-tool narrative is always generated unless `tool_mode=required`
/// finished without a single productive tool call.
fn should_generate_post_tool_final_narrative(
    requested_tool_mode: ToolMode,
    productive_tool_calls_total: usize,
) -> bool {
    match requested_tool_mode {
        ToolMode::Required => productive_tool_calls_total > 0,
        _ => true,
    }
}
/// True when every enabled prewrite requirement has been satisfied.
fn prewrite_requirements_satisfied(
    requirements: &PrewriteRequirements,
    workspace_inspection_satisfied: bool,
    concrete_read_satisfied: bool,
    web_research_satisfied: bool,
    successful_web_research_satisfied: bool,
) -> bool {
    // Each pair is (requirement enabled, requirement satisfied).
    let checks = [
        (
            requirements.workspace_inspection_required,
            workspace_inspection_satisfied,
        ),
        (requirements.web_research_required, web_research_satisfied),
        (requirements.concrete_read_required, concrete_read_satisfied),
        (
            requirements.successful_web_research_required,
            successful_web_research_satisfied,
        ),
    ];
    checks
        .iter()
        .all(|&(required, satisfied)| !required || satisfied)
}
/// Returns human-readable descriptions of every enabled-but-unmet prewrite
/// requirement, in a fixed order suitable for joining into a sentence.
fn describe_unmet_prewrite_requirements(
    requirements: &PrewriteRequirements,
    workspace_inspection_satisfied: bool,
    concrete_read_satisfied: bool,
    web_research_satisfied: bool,
    successful_web_research_satisfied: bool,
) -> Vec<&'static str> {
    let checks: [(bool, &'static str); 4] = [
        (
            requirements.workspace_inspection_required && !workspace_inspection_satisfied,
            "inspect the workspace with `glob`/`read` before writing",
        ),
        (
            requirements.concrete_read_required && !concrete_read_satisfied,
            "use `read` on the concrete files you cite before finalizing",
        ),
        (
            requirements.web_research_required && !web_research_satisfied,
            "use `websearch` before finalizing the file",
        ),
        (
            requirements.successful_web_research_required && !successful_web_research_satisfied,
            "obtain at least one successful web research result instead of only timed-out or empty searches",
        ),
    ];
    checks
        .iter()
        .filter(|(unmet, _)| *unmet)
        .map(|&(_, text)| text)
        .collect()
}
/// True for tools that inspect the workspace (read-only discovery tools).
fn is_workspace_inspection_tool(tool_name: &str) -> bool {
    const INSPECTION_TOOLS: [&str; 7] =
        ["glob", "read", "grep", "search", "codesearch", "ls", "list"];
    INSPECTION_TOOLS.contains(&normalize_tool_name(tool_name).as_str())
}
/// True for tools that perform external web research.
fn is_web_research_tool(tool_name: &str) -> bool {
    const WEB_RESEARCH_TOOLS: [&str; 3] = ["websearch", "webfetch", "webfetch_html"];
    WEB_RESEARCH_TOOLS.contains(&normalize_tool_name(tool_name).as_str())
}
/// True when calling `tool_name` would make progress on at least one of the
/// given unmet prewrite requirement codes. Write tools never qualify.
fn tool_matches_unmet_prewrite_repair_requirement(tool_name: &str, unmet_codes: &[&str]) -> bool {
    if is_workspace_write_tool(tool_name) {
        return false;
    }
    let normalized = normalize_tool_name(tool_name);
    let has = |code: &str| unmet_codes.contains(&code);
    let needs_workspace_inspection = has("workspace_inspection_required");
    let needs_concrete_read = has("concrete_read_required") || has("coverage_mode");
    let needs_web_research =
        has("web_research_required") || has("successful_web_research_required");
    if needs_concrete_read && matches!(normalized.as_str(), "read" | "glob") {
        return true;
    }
    if needs_workspace_inspection && is_workspace_inspection_tool(&normalized) {
        return true;
    }
    needs_web_research && is_web_research_tool(&normalized)
}
/// Maximum retries granted after invalid tool-call arguments.
/// Fixed at 2; unlike the timeout knobs below, not environment-configurable.
fn invalid_tool_args_retry_max_attempts() -> usize {
    2
}
/// Maximum number of prewrite repair passes before research prerequisites
/// are waived. Fixed at 5; not environment-configurable.
pub fn prewrite_repair_retry_max_attempts() -> usize {
    5
}
/// Returns machine-readable codes for every enabled-but-unmet prewrite
/// requirement, including the derived `coverage_mode` code.
fn collect_unmet_prewrite_requirement_codes(
    requirements: &PrewriteRequirements,
    workspace_inspection_satisfied: bool,
    concrete_read_satisfied: bool,
    web_research_satisfied: bool,
    successful_web_research_satisfied: bool,
) -> Vec<&'static str> {
    let checks: [(bool, &'static str); 4] = [
        (
            requirements.workspace_inspection_required && !workspace_inspection_satisfied,
            "workspace_inspection_required",
        ),
        (
            requirements.concrete_read_required && !concrete_read_satisfied,
            "concrete_read_required",
        ),
        (
            requirements.web_research_required && !web_research_satisfied,
            "web_research_required",
        ),
        (
            requirements.successful_web_research_required && !successful_web_research_satisfied,
            "successful_web_research_required",
        ),
    ];
    let mut unmet: Vec<&'static str> = checks
        .iter()
        .filter(|(is_unmet, _)| *is_unmet)
        .map(|&(_, code)| code)
        .collect();
    // Coverage enforcement is violated when either inspection or concrete
    // reads are missing while any coverage mode is active.
    let coverage_enforced = !matches!(requirements.coverage_mode, PrewriteCoverageMode::None);
    if coverage_enforced && !(workspace_inspection_satisfied && concrete_read_satisfied) {
        unmet.push("coverage_mode");
    }
    unmet
}
/// Maps known invalid-argument marker codes found in tool outputs to a
/// targeted correction prompt for the next attempt. First match wins; the
/// bash message escalates its emphasis after a prior failed attempt.
fn build_invalid_tool_args_retry_context_from_outputs(
    outputs: &[String],
    previous_attempts: usize,
) -> Option<String> {
    let has_marker = |marker: &str| outputs.iter().any(|output| output.contains(marker));
    if has_marker("BASH_COMMAND_MISSING") {
        let emphasis = if previous_attempts > 0 {
            "You already tried `bash` without a valid command. Do not repeat an empty bash call."
        } else {
            "If you use `bash`, include a full non-empty command string."
        };
        return Some(format!(
            "Previous bash tool call was invalid because it did not include the required `command` field. {emphasis} Good examples: `pwd`, `ls -la`, `find docs -maxdepth 2 -type f`, or `rg -n \"workflow\" docs src`. Prefer `ls`, `glob`, `search`, and `read` for repository inspection when they are sufficient."
        ));
    }
    if has_marker("WEBSEARCH_QUERY_MISSING") {
        return Some(
            "Previous websearch tool call was invalid because it did not include a query. If you use `websearch`, include a specific non-empty search query.".to_string(),
        );
    }
    if has_marker("WEBFETCH_URL_MISSING") {
        return Some(
            "Previous webfetch tool call was invalid because it did not include a URL. If you use `webfetch`, include a full absolute `url`.".to_string(),
        );
    }
    if has_marker("FILE_PATH_MISSING") {
        return Some(
            "Previous file tool call was invalid because it did not include a `path`. If you use `read`, `write`, or `edit`, include the exact workspace-relative file path.".to_string(),
        );
    }
    if has_marker("WRITE_CONTENT_MISSING") {
        return Some(
            "Previous write tool call was invalid because it did not include `content`. If you use `write`, include both `path` and the full `content`.".to_string(),
        );
    }
    None
}
/// Heuristically detects a raw provider tool-call payload that leaked into
/// plain output text (case-insensitive marker match on a trimmed body).
fn looks_like_unparsed_tool_payload(output: &str) -> bool {
    const PAYLOAD_MARKERS: [&str; 6] = [
        "\"tool_calls\"",
        "\"function_call\"",
        "\"function\":{",
        "\"type\":\"tool_call\"",
        "\"type\":\"function_call\"",
        "\"type\":\"tool_use\"",
    ];
    let trimmed = output.trim();
    if trimmed.is_empty() {
        return false;
    }
    let lower = trimmed.to_ascii_lowercase();
    PAYLOAD_MARKERS.iter().any(|marker| lower.contains(marker))
}
/// True when a tool output reads like a policy/permission rejection.
fn is_policy_rejection_output(output: &str) -> bool {
    const REJECTION_MARKERS: [&str; 4] = [
        "call skipped",
        "authorization required",
        "not allowed",
        "permission denied",
    ];
    let lower = output.trim().to_ascii_lowercase();
    REJECTION_MARKERS.iter().any(|marker| lower.contains(marker))
}
/// Classifies why a `tool_mode=required` run failed, checked in decreasing
/// order of specificity: parse failure, no candidate, policy rejection,
/// provider write-args problems, invalid args, then generic non-productive.
fn classify_required_tool_failure(
    outputs: &[String],
    saw_tool_call_candidate: bool,
    accepted_tool_calls: usize,
    parse_failed: bool,
    rejected_by_policy: bool,
) -> RequiredToolFailureKind {
    if parse_failed {
        return RequiredToolFailureKind::ToolCallParseFailed;
    }
    if !saw_tool_call_candidate {
        return RequiredToolFailureKind::NoToolCallEmitted;
    }
    if rejected_by_policy || accepted_tool_calls == 0 {
        return RequiredToolFailureKind::ToolCallRejectedByPolicy;
    }
    let has_marker = |marker: &str| outputs.iter().any(|output| output.contains(marker));
    if has_marker("WRITE_ARGS_EMPTY_FROM_PROVIDER") {
        return RequiredToolFailureKind::WriteArgsEmptyFromProvider;
    }
    if has_marker("WRITE_ARGS_UNPARSEABLE_FROM_PROVIDER") {
        return RequiredToolFailureKind::WriteArgsUnparseableFromProvider;
    }
    if outputs
        .iter()
        .any(|output| is_terminal_tool_error_reason(output))
    {
        return RequiredToolFailureKind::ToolCallInvalidArgs;
    }
    if outputs
        .iter()
        .any(|output| is_policy_rejection_output(output))
    {
        return RequiredToolFailureKind::ToolCallRejectedByPolicy;
    }
    RequiredToolFailureKind::ToolCallExecutedNonProductive
}
/// Returns the first http(s) URL token in `text`, with trailing punctuation
/// stripped, or `None` when no token carries content beyond the scheme.
///
/// Fixed: the old guard compared every token's length against
/// `"https://".len()`, which wrongly rejected minimal `http://` URLs such as
/// `http://x` (8 chars). Now each scheme is stripped and the remainder just
/// has to be non-empty.
fn find_first_url(text: &str) -> Option<String> {
    text.split_whitespace().find_map(|token| {
        // Trailing punctuation commonly clings to URLs embedded in prose.
        let cleaned = token.trim_end_matches(&[')', ']', '}', '"', '\'', ',', '.'][..]);
        let rest = cleaned
            .strip_prefix("https://")
            .or_else(|| cleaned.strip_prefix("http://"))?;
        if rest.is_empty() {
            None
        } else {
            Some(cleaned.to_string())
        }
    })
}
/// Maximum tool iterations per run; overridable via
/// `TANDEM_MAX_TOOL_ITERATIONS` (positive integers only), defaulting to 25.
fn max_tool_iterations() -> usize {
    const DEFAULT_ITERATIONS: usize = 25;
    match std::env::var("TANDEM_MAX_TOOL_ITERATIONS") {
        Ok(raw) => raw
            .trim()
            .parse::<usize>()
            .ok()
            .filter(|value| *value > 0)
            .unwrap_or(DEFAULT_ITERATIONS),
        Err(_) => DEFAULT_ITERATIONS,
    }
}
/// Maximum strict-write retries; overridable via
/// `TANDEM_STRICT_WRITE_RETRY_MAX_ATTEMPTS` (positive only), default 3.
fn strict_write_retry_max_attempts() -> usize {
    const DEFAULT_ATTEMPTS: usize = 3;
    match std::env::var("TANDEM_STRICT_WRITE_RETRY_MAX_ATTEMPTS") {
        Ok(raw) => raw
            .trim()
            .parse::<usize>()
            .ok()
            .filter(|value| *value > 0)
            .unwrap_or(DEFAULT_ATTEMPTS),
        Err(_) => DEFAULT_ATTEMPTS,
    }
}
/// Provider stream connect timeout in ms; overridable via
/// `TANDEM_PROVIDER_STREAM_CONNECT_TIMEOUT_MS` (positive only), default 90s.
fn provider_stream_connect_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 90_000;
    match std::env::var("TANDEM_PROVIDER_STREAM_CONNECT_TIMEOUT_MS") {
        Ok(raw) => raw
            .trim()
            .parse::<usize>()
            .ok()
            .filter(|value| *value > 0)
            .unwrap_or(DEFAULT_TIMEOUT_MS),
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
/// Provider stream idle timeout in ms; overridable via
/// `TANDEM_PROVIDER_STREAM_IDLE_TIMEOUT_MS` (positive only), default 90s.
fn provider_stream_idle_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 90_000;
    match std::env::var("TANDEM_PROVIDER_STREAM_IDLE_TIMEOUT_MS") {
        Ok(raw) => raw
            .trim()
            .parse::<usize>()
            .ok()
            .filter(|value| *value > 0)
            .unwrap_or(DEFAULT_TIMEOUT_MS),
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
/// Prompt-context hook timeout in ms; overridable via
/// `TANDEM_PROMPT_CONTEXT_HOOK_TIMEOUT_MS` (positive only), default 5s.
fn prompt_context_hook_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 5_000;
    match std::env::var("TANDEM_PROMPT_CONTEXT_HOOK_TIMEOUT_MS") {
        Ok(raw) => raw
            .trim()
            .parse::<usize>()
            .ok()
            .filter(|value| *value > 0)
            .unwrap_or(DEFAULT_TIMEOUT_MS),
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
/// Permission prompt wait timeout in ms; overridable via
/// `TANDEM_PERMISSION_WAIT_TIMEOUT_MS` (positive only), default 15s.
fn permission_wait_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 15_000;
    match std::env::var("TANDEM_PERMISSION_WAIT_TIMEOUT_MS") {
        Ok(raw) => raw
            .trim()
            .parse::<usize>()
            .ok()
            .filter(|value| *value > 0)
            .unwrap_or(DEFAULT_TIMEOUT_MS),
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
/// Per-tool execution timeout in ms; overridable via
/// `TANDEM_TOOL_EXEC_TIMEOUT_MS` (positive only), default 45s.
fn tool_exec_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 45_000;
    match std::env::var("TANDEM_TOOL_EXEC_TIMEOUT_MS") {
        Ok(raw) => raw
            .trim()
            .parse::<usize>()
            .ok()
            .filter(|value| *value > 0)
            .unwrap_or(DEFAULT_TIMEOUT_MS),
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
/// True when a tool output reports the per-run guard budget was exceeded.
fn is_guard_budget_tool_output(output: &str) -> bool {
    const MARKER: &str = "per-run guard budget exceeded";
    output.to_ascii_lowercase().contains(MARKER)
}
/// True when a tool output reports the duplicate-signature retry limit.
fn is_duplicate_signature_limit_output(output: &str) -> bool {
    const MARKER: &str = "duplicate call signature retry limit reached";
    output.to_ascii_lowercase().contains(MARKER)
}
/// Heuristically flags paths that likely contain credentials or key material
/// (SSH/GPG dirs, cloud credentials, private keys, dotenv files).
fn is_sensitive_path_candidate(path: &Path) -> bool {
    let lowered = path.to_string_lossy().to_ascii_lowercase();
    // Credential directories: match the directory itself or anything inside it.
    for dir in ["/.ssh", "/.gnupg"] {
        if lowered.ends_with(dir) || lowered.contains(&format!("{dir}/")) {
            return true;
        }
    }
    // Well-known credential files.
    if lowered.contains("/.aws/credentials") {
        return true;
    }
    if ["/.npmrc", "/.netrc", "/.pypirc"]
        .iter()
        .any(|suffix| lowered.ends_with(suffix))
    {
        return true;
    }
    // Key material anywhere in the path (substring heuristic; may over-match
    // names like `foo.keyboard`, which is accepted as a conservative default).
    const KEY_FRAGMENTS: [&str; 7] = [
        "id_rsa",
        "id_ed25519",
        "id_ecdsa",
        ".pem",
        ".p12",
        ".pfx",
        ".key",
    ];
    if KEY_FRAGMENTS
        .iter()
        .any(|fragment| lowered.contains(fragment))
    {
        return true;
    }
    // Dotenv files matched by exact file name.
    path.file_name()
        .and_then(|name| name.to_str())
        .map(|name| {
            let name = name.to_ascii_lowercase();
            name == ".env" || name.starts_with(".env.")
        })
        .unwrap_or(false)
}
/// Heuristically flags shell commands that mention credential or key-material
/// paths (case-insensitive substring match).
fn shell_command_targets_sensitive_path(command: &str) -> bool {
    const SENSITIVE_FRAGMENTS: [&str; 10] = [
        ".env",
        ".ssh",
        ".gnupg",
        ".aws/credentials",
        "id_rsa",
        "id_ed25519",
        ".pem",
        ".p12",
        ".pfx",
        ".key",
    ];
    let lower = command.to_ascii_lowercase();
    SENSITIVE_FRAGMENTS
        .iter()
        .any(|fragment| lower.contains(fragment))
}
/// Outcome of normalizing a streamed tool call's raw provider arguments
/// (see `normalize_tool_args_with_mode`).
#[derive(Debug, Clone)]
struct NormalizedToolArgs {
    /// Final argument payload after recovery/sanitization.
    args: Value,
    /// Provenance of the final args, e.g. "provider_json", "provider_string",
    /// "inferred_from_user", "recovered_from_context", or "missing".
    args_source: String,
    /// Integrity marker: "ok", "recovered", or "empty".
    args_integrity: String,
    /// Classification of the provider's raw payload before normalization.
    raw_args_state: RawToolArgsState,
    /// Extracted search query, populated only for the `websearch` tool.
    query: Option<String>,
    /// True when a required argument could not be recovered and the call
    /// must be failed instead of executed.
    missing_terminal: bool,
    /// Machine-readable reason code (e.g. "BASH_COMMAND_MISSING") when
    /// `missing_terminal` is set.
    missing_terminal_reason: Option<String>,
}
/// A tool call parsed out of a provider stream: the tool name, its argument
/// payload, and the provider's call id when one was supplied.
#[derive(Debug, Clone)]
struct ParsedToolCall {
    tool: String,
    args: Value,
    call_id: Option<String>,
}
/// Test-only wrapper around [`normalize_tool_args_with_mode`] that applies
/// the default heuristic write-path recovery mode.
#[cfg(test)]
fn normalize_tool_args(
    tool_name: &str,
    raw_args: Value,
    latest_user_text: &str,
    latest_assistant_context: &str,
) -> NormalizedToolArgs {
    normalize_tool_args_with_mode(
        tool_name,
        raw_args,
        latest_user_text,
        latest_assistant_context,
        WritePathRecoveryMode::Heuristic,
    )
}
/// Normalizes a streamed tool call's raw arguments into an executable payload,
/// attempting per-tool recovery from the latest user text and assistant
/// context when the provider omitted required fields.
///
/// Tracks provenance (`args_source`) and integrity (`args_integrity`) of the
/// final payload, and sets `missing_terminal` with a reason code when a
/// required argument could not be recovered at all.
/// `write_path_recovery_mode` controls whether a missing write/edit `path`
/// may be guessed heuristically from the user text.
fn normalize_tool_args_with_mode(
    tool_name: &str,
    raw_args: Value,
    latest_user_text: &str,
    latest_assistant_context: &str,
    write_path_recovery_mode: WritePathRecoveryMode,
) -> NormalizedToolArgs {
    let normalized_tool = normalize_tool_name(tool_name);
    // Kept so the email branch below can detect whether sanitization changed anything.
    let original_args = raw_args.clone();
    let mut args = raw_args;
    let mut args_source = if args.is_string() {
        "provider_string".to_string()
    } else {
        "provider_json".to_string()
    };
    let mut args_integrity = "ok".to_string();
    // Classified before any recovery so callers can see what the provider actually sent.
    let raw_args_state = classify_raw_tool_args_state(&args);
    let mut query = None;
    let mut missing_terminal = false;
    let mut missing_terminal_reason = None;
    if normalized_tool == "websearch" {
        // Recovery order: explicit args, then user text, then assistant context.
        if let Some(found) = extract_websearch_query(&args) {
            query = Some(found);
            args = set_websearch_query_and_source(args, query.clone(), "tool_args");
        } else if let Some(inferred) = infer_websearch_query_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            query = Some(inferred);
            args = set_websearch_query_and_source(args, query.clone(), "inferred_from_user");
        } else if let Some(recovered) = infer_websearch_query_from_text(latest_assistant_context) {
            args_source = "recovered_from_context".to_string();
            args_integrity = "recovered".to_string();
            query = Some(recovered);
            args = set_websearch_query_and_source(args, query.clone(), "recovered_from_context");
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("WEBSEARCH_QUERY_MISSING".to_string());
        }
    } else if is_shell_tool_name(&normalized_tool) {
        // NOTE: unlike websearch, shell recovery prefers assistant context over user text.
        if let Some(command) = extract_shell_command(&args) {
            args = set_shell_command(args, command);
        } else if let Some(inferred) = infer_shell_command_from_text(latest_assistant_context) {
            args_source = "inferred_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_shell_command(args, inferred);
        } else if let Some(inferred) = infer_shell_command_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_shell_command(args, inferred);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("BASH_COMMAND_MISSING".to_string());
        }
    } else if matches!(normalized_tool.as_str(), "read" | "write" | "edit") {
        if let Some(path) = extract_file_path_arg(&args) {
            args = set_file_path_arg(args, path);
        } else if normalized_tool == "write" || normalized_tool == "edit" {
            // Writers first try the explicitly declared output target from either text...
            if let Some(inferred) = infer_required_output_target_path_from_text(latest_user_text)
                .or_else(|| infer_required_output_target_path_from_text(latest_assistant_context))
            {
                args_source = "recovered_from_context".to_string();
                args_integrity = "recovered".to_string();
                args = set_file_path_arg(args, inferred);
            } else if write_path_recovery_mode == WritePathRecoveryMode::Heuristic {
                // ...then, only in heuristic mode, a guessed path from the user text.
                if let Some(inferred) = infer_write_file_path_from_text(latest_user_text) {
                    args_source = "inferred_from_user".to_string();
                    args_integrity = "recovered".to_string();
                    args = set_file_path_arg(args, inferred);
                } else {
                    args_source = "missing".to_string();
                    args_integrity = "empty".to_string();
                    missing_terminal = true;
                    missing_terminal_reason = Some("FILE_PATH_MISSING".to_string());
                }
            } else {
                // OutputTargetOnly mode: never guess a write path heuristically.
                args_source = "missing".to_string();
                args_integrity = "empty".to_string();
                missing_terminal = true;
                missing_terminal_reason = Some("FILE_PATH_MISSING".to_string());
            }
        } else if let Some(inferred) = infer_file_path_from_text(latest_user_text) {
            // `read` falls back to any path mentioned in the user text.
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_file_path_arg(args, inferred);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("FILE_PATH_MISSING".to_string());
        }
        // A write also needs content; only checked once the path resolved.
        if !missing_terminal && normalized_tool == "write" {
            if let Some(content) = extract_write_content_arg(&args) {
                args = set_write_content_arg(args, content);
            } else if let Some(recovered) =
                infer_write_content_from_assistant_context(latest_assistant_context)
            {
                args_source = "recovered_from_context".to_string();
                args_integrity = "recovered".to_string();
                args = set_write_content_arg(args, recovered);
            } else {
                args_source = "missing".to_string();
                args_integrity = "empty".to_string();
                missing_terminal = true;
                missing_terminal_reason = Some("WRITE_CONTENT_MISSING".to_string());
            }
        }
    } else if matches!(normalized_tool.as_str(), "webfetch" | "webfetch_html") {
        // URL recovery prefers assistant context over user text.
        if let Some(url) = extract_webfetch_url_arg(&args) {
            args = set_webfetch_url_arg(args, url);
        } else if let Some(inferred) = infer_url_from_text(latest_assistant_context) {
            args_source = "inferred_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_webfetch_url_arg(args, inferred);
        } else if let Some(inferred) = infer_url_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_webfetch_url_arg(args, inferred);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("WEBFETCH_URL_MISSING".to_string());
        }
    } else if normalized_tool == "pack_builder" {
        let mode = extract_pack_builder_mode_arg(&args);
        let plan_id = extract_pack_builder_plan_id_arg(&args);
        if mode.as_deref() == Some("apply") && plan_id.is_none() {
            // Apply without a plan id: try to recover the id from recent text.
            if let Some(inferred_plan) =
                infer_pack_builder_apply_plan_id(latest_user_text, latest_assistant_context)
            {
                args_source = "recovered_from_context".to_string();
                args_integrity = "recovered".to_string();
                args = set_pack_builder_apply_args(args, inferred_plan);
            } else {
                args_source = "missing".to_string();
                args_integrity = "empty".to_string();
                missing_terminal = true;
                missing_terminal_reason = Some("PACK_BUILDER_PLAN_ID_MISSING".to_string());
            }
        } else if mode.as_deref() == Some("apply") {
            args = ensure_pack_builder_default_mode(args);
        } else if let Some(inferred_plan) =
            infer_pack_builder_apply_plan_id(latest_user_text, latest_assistant_context)
        {
            // A plan id found in context upgrades the call to an apply.
            args_source = "recovered_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_pack_builder_apply_args(args, inferred_plan);
        } else if let Some(goal) = extract_pack_builder_goal_arg(&args) {
            args = set_pack_builder_goal_arg(args, goal);
        } else if let Some(inferred) = infer_pack_builder_goal_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_pack_builder_goal_arg(args, inferred);
        } else if let Some(recovered) = infer_pack_builder_goal_from_text(latest_assistant_context)
        {
            args_source = "recovered_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_pack_builder_goal_arg(args, recovered);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("PACK_BUILDER_GOAL_MISSING".to_string());
        }
        // Always backfill a default mode for pack_builder calls.
        args = ensure_pack_builder_default_mode(args);
    } else if is_email_delivery_tool_name(&normalized_tool) {
        // Strip unusable attachment references; mark recovered only if anything changed.
        let sanitized = sanitize_email_attachment_args(args);
        if sanitized != original_args {
            args_source = "sanitized_attachment".to_string();
            args_integrity = "recovered".to_string();
        }
        args = sanitized;
    }
    NormalizedToolArgs {
        args,
        args_source,
        args_integrity,
        raw_args_state,
        query,
        missing_terminal,
        missing_terminal_reason,
    }
}
/// Classifies a raw argument payload as present, empty, or unparseable.
/// String payloads are first decoded as JSON (recursively) and otherwise
/// checked for function-style `key=value` arguments.
fn classify_raw_tool_args_state(raw_args: &Value) -> RawToolArgsState {
    fn from_is_empty(is_empty: bool) -> RawToolArgsState {
        if is_empty {
            RawToolArgsState::Empty
        } else {
            RawToolArgsState::Present
        }
    }
    match raw_args {
        Value::Null => RawToolArgsState::Empty,
        Value::Bool(_) | Value::Number(_) => RawToolArgsState::Present,
        Value::Object(map) => from_is_empty(map.is_empty()),
        Value::Array(items) => from_is_empty(items.is_empty()),
        Value::String(raw) => {
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                RawToolArgsState::Empty
            } else if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
                // Provider double-encoded the args; classify the decoded value.
                classify_raw_tool_args_state(&parsed)
            } else if parse_function_style_args(trimmed).is_empty() {
                RawToolArgsState::Unparseable
            } else {
                RawToolArgsState::Present
            }
        }
    }
}
/// True when an argument payload carries no usable data (null, empty
/// object/array, or blank string). Booleans and numbers always count as data.
fn args_missing_or_empty(args: &Value) -> bool {
    match args {
        Value::Null => true,
        Value::String(text) => text.trim().is_empty(),
        Value::Array(items) => items.is_empty(),
        Value::Object(map) => map.is_empty(),
        Value::Bool(_) | Value::Number(_) => false,
    }
}
/// Chooses which args to persist for a failed tool call: the normalized args
/// when they recovered something the raw payload lacked, otherwise the raw args.
fn persisted_failed_tool_args(raw_args: &Value, normalized_args: &Value) -> Value {
    let raw_is_empty = args_missing_or_empty(raw_args);
    let normalized_has_data = !args_missing_or_empty(normalized_args);
    if raw_is_empty && normalized_has_data {
        normalized_args.clone()
    } else {
        raw_args.clone()
    }
}
/// Upgrades a generic missing-args reason to a provider-specific code for
/// `write` calls whose raw payload was empty or unparseable at the source.
fn provider_specific_write_reason(
    tool: &str,
    missing_reason: &str,
    raw_args_state: RawToolArgsState,
) -> Option<String> {
    let write_args_missing = tool == "write"
        && matches!(
            missing_reason,
            "FILE_PATH_MISSING" | "WRITE_CONTENT_MISSING"
        );
    if !write_args_missing {
        return None;
    }
    match raw_args_state {
        RawToolArgsState::Empty => Some("WRITE_ARGS_EMPTY_FROM_PROVIDER".to_string()),
        RawToolArgsState::Unparseable => Some("WRITE_ARGS_UNPARSEABLE_FROM_PROVIDER".to_string()),
        RawToolArgsState::Present => None,
    }
}
/// True for the built-in shell tool aliases (case-insensitive, whitespace-tolerant).
fn is_shell_tool_name(tool_name: &str) -> bool {
    let normalized = tool_name.trim().to_ascii_lowercase();
    ["bash", "shell", "powershell", "cmd"].contains(&normalized.as_str())
}
/// True for Gmail send/draft delivery tools, matched either by the known
/// normalized composio names or by the raw name's suffix.
fn is_email_delivery_tool_name(tool_name: &str) -> bool {
    const EXACT: [&str; 4] = [
        "mcp.composio_1.gmail_send_email",
        "mcp.composio_1.gmail_send_draft",
        "mcp.composio.gmail_send_email",
        "mcp.composio.gmail_send_draft",
    ];
    if EXACT.contains(&normalize_tool_name(tool_name).as_str()) {
        return true;
    }
    [".gmail_send_email", ".gmail_send_draft"]
        .iter()
        .any(|suffix| tool_name.ends_with(suffix))
}
/// Drops attachment entries that have no usable `s3key` so email delivery
/// tools don't receive dangling references.
///
/// - A singular `attachment` object with a blank/missing `s3key` is removed;
///   an explicit `attachment: null` is removed as well. Other attachment
///   shapes (e.g. a plain string) are left untouched.
/// - Entries of an `attachments` array without a non-empty `s3key` are
///   filtered out; the key is dropped entirely if nothing survives.
/// Non-object args pass through unchanged.
fn sanitize_email_attachment_args(args: Value) -> Value {
    let mut obj = match args {
        Value::Object(map) => map,
        other => return other,
    };
    if let Some(Value::Object(attachment)) = obj.get("attachment") {
        let s3key = attachment
            .get("s3key")
            .and_then(Value::as_str)
            .map(str::trim)
            .unwrap_or("");
        if s3key.is_empty() {
            obj.remove("attachment");
        }
    } else if obj.get("attachment").is_some_and(Value::is_null) {
        // `is_some_and` already covers the previous redundant `is_some() &&`
        // double lookup.
        obj.remove("attachment");
    }
    if let Some(Value::Array(attachments)) = obj.get_mut("attachments") {
        attachments.retain(|entry| {
            entry
                .get("s3key")
                .and_then(Value::as_str)
                .map(str::trim)
                .map(|value| !value.is_empty())
                .unwrap_or(false)
        });
        if attachments.is_empty() {
            obj.remove("attachments");
        }
    }
    Value::Object(obj)
}
fn set_file_path_arg(args: Value, path: String) -> Value {
let mut obj = args.as_object().cloned().unwrap_or_default();
obj.insert("path".to_string(), Value::String(path));
Value::Object(obj)
}
/// Rewrites the `/workspace` alias (forward or back slashes) to the real root:
/// `/workspace` -> root, `/workspace/rel` -> `rel`, anything else -> `None`.
fn normalize_workspace_alias_path(path: &str, workspace_root: &str) -> Option<String> {
    let trimmed = path.trim();
    if trimmed.is_empty() {
        return None;
    }
    let forward = trimmed.replace('\\', "/");
    match forward.strip_prefix("/workspace") {
        Some("") => Some(workspace_root.to_string()),
        Some(rest) if rest.starts_with('/') => {
            let relative = rest[1..].trim();
            if relative.is_empty() {
                Some(workspace_root.to_string())
            } else {
                Some(relative.to_string())
            }
        }
        _ => None,
    }
}
/// For file tools (`read`/`write`/`edit`), rewrites a `/workspace`-aliased
/// path arg into its real form; all other tools and args pass through.
fn rewrite_workspace_alias_tool_args(tool: &str, args: Value, workspace_root: &str) -> Value {
    let canonical = normalize_tool_name(tool);
    if !matches!(canonical.as_str(), "read" | "write" | "edit") {
        return args;
    }
    let rewritten = extract_file_path_arg(&args)
        .and_then(|path| normalize_workspace_alias_path(&path, workspace_root));
    match rewritten {
        Some(path) => set_file_path_arg(args, path),
        None => args,
    }
}
fn set_write_content_arg(args: Value, content: String) -> Value {
let mut obj = args.as_object().cloned().unwrap_or_default();
obj.insert("content".to_string(), Value::String(content));
Value::Object(obj)
}
/// Recovers a file path from arbitrarily shaped tool args; recursion depth is
/// bounded inside the `_internal` helper.
fn extract_file_path_arg(args: &Value) -> Option<String> {
    extract_file_path_arg_internal(args, 0)
}
/// Recovers write content from arbitrarily shaped tool args; recursion depth is
/// bounded inside the `_internal` helper.
fn extract_write_content_arg(args: &Value) -> Option<String> {
    extract_write_content_arg_internal(args, 0)
}
/// Best-effort recovery of a file path from arbitrarily shaped tool args.
///
/// Handles four shapes, recursing at most 5 levels deep:
/// - strings: bare tokens are sanitized directly; JSON-looking strings are
///   parsed and recursed into, falling back to sanitizing the raw text;
/// - arrays: first element that yields a path wins;
/// - objects: known path keys are tried in `FILE_PATH_KEYS` priority order,
///   then known nested-args containers are descended into;
/// - everything else yields `None`.
fn extract_file_path_arg_internal(args: &Value, depth: usize) -> Option<String> {
    // Defensive cap against deeply nested payloads.
    if depth > 5 {
        return None;
    }
    match args {
        Value::String(raw) => {
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                return None;
            }
            // A token that cannot be JSON is treated as a literal path candidate.
            if !(trimmed.starts_with('{') || trimmed.starts_with('[') || trimmed.starts_with('"')) {
                return sanitize_path_candidate(trimmed);
            }
            if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
                return extract_file_path_arg_internal(&parsed, depth + 1);
            }
            // JSON-looking but unparseable: still try it as a literal path.
            sanitize_path_candidate(trimmed)
        }
        Value::Array(items) => items
            .iter()
            .find_map(|item| extract_file_path_arg_internal(item, depth + 1)),
        Value::Object(obj) => {
            // Known path keys first, in priority order.
            for key in FILE_PATH_KEYS {
                if let Some(raw) = obj.get(key).and_then(|v| v.as_str()) {
                    if let Some(path) = sanitize_path_candidate(raw) {
                        return Some(path);
                    }
                }
            }
            // Then descend into common nested-argument containers.
            for container in NESTED_ARGS_KEYS {
                if let Some(nested) = obj.get(container) {
                    if let Some(path) = extract_file_path_arg_internal(nested, depth + 1) {
                        return Some(path);
                    }
                }
            }
            None
        }
        _ => None,
    }
}
/// Best-effort recovery of write-content text from arbitrarily shaped tool
/// args. Mirrors `extract_file_path_arg_internal`, recursing at most 5 levels.
fn extract_write_content_arg_internal(args: &Value, depth: usize) -> Option<String> {
    // Defensive cap against deeply nested payloads.
    if depth > 5 {
        return None;
    }
    match args {
        Value::String(raw) => {
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                return None;
            }
            // JSON-shaped strings are parsed and recursed into.
            if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
                return extract_write_content_arg_internal(&parsed, depth + 1);
            }
            // A short single-line token that also looks like a path is more
            // likely a filename than file content; reject it.
            if sanitize_path_candidate(trimmed).is_some()
                && !trimmed.contains('\n')
                && trimmed.split_whitespace().count() <= 3
            {
                return None;
            }
            Some(trimmed.to_string())
        }
        Value::Array(items) => items
            .iter()
            .find_map(|item| extract_write_content_arg_internal(item, depth + 1)),
        Value::Object(obj) => {
            // Known content keys first, in priority order; non-string values
            // under a content key are recursed into.
            for key in WRITE_CONTENT_KEYS {
                if let Some(value) = obj.get(key) {
                    if let Some(raw) = value.as_str() {
                        if !raw.is_empty() {
                            return Some(raw.to_string());
                        }
                    } else if let Some(recovered) =
                        extract_write_content_arg_internal(value, depth + 1)
                    {
                        return Some(recovered);
                    }
                }
            }
            // Then descend into common nested-argument containers.
            for container in NESTED_ARGS_KEYS {
                if let Some(nested) = obj.get(container) {
                    if let Some(content) = extract_write_content_arg_internal(nested, depth + 1) {
                        return Some(content);
                    }
                }
            }
            None
        }
        _ => None,
    }
}
/// Treats the latest assistant text as recoverable write content, but only if
/// it is substantial (at least 32 bytes after trimming).
fn infer_write_content_from_assistant_context(latest_assistant_context: &str) -> Option<String> {
    let candidate = latest_assistant_context.trim();
    (candidate.len() >= 32).then(|| candidate.to_string())
}
fn set_shell_command(args: Value, command: String) -> Value {
let mut obj = args.as_object().cloned().unwrap_or_default();
obj.insert("command".to_string(), Value::String(command));
Value::Object(obj)
}
/// Recovers a shell command string from arbitrarily shaped tool args; recursion
/// depth is bounded inside the `_internal` helper.
fn extract_shell_command(args: &Value) -> Option<String> {
    extract_shell_command_internal(args, 0)
}
/// Best-effort recovery of a shell command from arbitrarily shaped tool args.
/// Mirrors `extract_file_path_arg_internal`, recursing at most 5 levels deep.
fn extract_shell_command_internal(args: &Value, depth: usize) -> Option<String> {
    // Defensive cap against deeply nested payloads.
    if depth > 5 {
        return None;
    }
    match args {
        Value::String(raw) => {
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                return None;
            }
            // A token that cannot be JSON is treated as a literal command.
            if !(trimmed.starts_with('{') || trimmed.starts_with('[') || trimmed.starts_with('"')) {
                return sanitize_shell_command_candidate(trimmed);
            }
            if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
                return extract_shell_command_internal(&parsed, depth + 1);
            }
            // JSON-looking but unparseable: still try it as a literal command.
            sanitize_shell_command_candidate(trimmed)
        }
        Value::Array(items) => items
            .iter()
            .find_map(|item| extract_shell_command_internal(item, depth + 1)),
        Value::Object(obj) => {
            // Known command keys first, in priority order.
            for key in SHELL_COMMAND_KEYS {
                if let Some(raw) = obj.get(key).and_then(|v| v.as_str()) {
                    if let Some(command) = sanitize_shell_command_candidate(raw) {
                        return Some(command);
                    }
                }
            }
            // Then descend into common nested-argument containers.
            for container in NESTED_ARGS_KEYS {
                if let Some(nested) = obj.get(container) {
                    if let Some(command) = extract_shell_command_internal(nested, depth + 1) {
                        return Some(command);
                    }
                }
            }
            None
        }
        _ => None,
    }
}
/// Heuristically pulls a shell command out of free-form text.
/// Pass 1: backtick-quoted spans that look like commands.
/// Pass 2: lines starting with an imperative prefix ("run ", "execute ", ...).
fn infer_shell_command_from_text(text: &str) -> Option<String> {
    let trimmed = text.trim();
    if trimmed.is_empty() {
        return None;
    }
    // `in_tick` toggles on each backtick; the buffer collects the quoted span.
    let mut in_tick = false;
    let mut tick_buf = String::new();
    for ch in trimmed.chars() {
        if ch == '`' {
            if in_tick {
                // Closing tick: test the collected span as a command.
                if let Some(candidate) = sanitize_shell_command_candidate(&tick_buf) {
                    if looks_like_shell_command(&candidate) {
                        return Some(candidate);
                    }
                }
                tick_buf.clear();
            }
            in_tick = !in_tick;
            continue;
        }
        if in_tick {
            tick_buf.push(ch);
        }
    }
    // Pass 2: lines that begin with an imperative invocation prefix.
    for line in trimmed.lines() {
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let lower = line.to_ascii_lowercase();
        for prefix in [
            "run ",
            "execute ",
            "call ",
            "use bash ",
            "use shell ",
            "bash ",
            "shell ",
            "powershell ",
            "pwsh ",
        ] {
            if lower.starts_with(prefix) {
                // Prefixes are ASCII, so the byte offset is valid in `line` too.
                let candidate = line[prefix.len()..].trim();
                if let Some(command) = sanitize_shell_command_candidate(candidate) {
                    if looks_like_shell_command(&command) {
                        return Some(command);
                    }
                }
            }
        }
    }
    None
}
fn set_websearch_query_and_source(args: Value, query: Option<String>, query_source: &str) -> Value {
let mut obj = args.as_object().cloned().unwrap_or_default();
if let Some(q) = query {
obj.insert("query".to_string(), Value::String(q));
}
obj.insert(
"__query_source".to_string(),
Value::String(query_source.to_string()),
);
Value::Object(obj)
}
fn set_webfetch_url_arg(args: Value, url: String) -> Value {
let mut obj = args.as_object().cloned().unwrap_or_default();
obj.insert("url".to_string(), Value::String(url));
Value::Object(obj)
}
fn set_pack_builder_goal_arg(args: Value, goal: String) -> Value {
let mut obj = args.as_object().cloned().unwrap_or_default();
obj.insert("goal".to_string(), Value::String(goal));
Value::Object(obj)
}
fn set_pack_builder_apply_args(args: Value, plan_id: String) -> Value {
let mut obj = args.as_object().cloned().unwrap_or_default();
obj.insert("mode".to_string(), Value::String("apply".to_string()));
obj.insert("plan_id".to_string(), Value::String(plan_id));
obj.insert(
"approve_connector_registration".to_string(),
Value::Bool(true),
);
obj.insert("approve_pack_install".to_string(), Value::Bool(true));
obj.insert("approve_enable_routines".to_string(), Value::Bool(false));
Value::Object(obj)
}
fn extract_pack_builder_mode_arg(args: &Value) -> Option<String> {
for key in ["mode"] {
if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
let mode = value.trim().to_ascii_lowercase();
if !mode.is_empty() {
return Some(mode);
}
}
}
for container in ["arguments", "args", "input", "params"] {
if let Some(obj) = args.get(container) {
if let Some(value) = obj.get("mode").and_then(|v| v.as_str()) {
let mode = value.trim().to_ascii_lowercase();
if !mode.is_empty() {
return Some(mode);
}
}
}
}
None
}
fn extract_pack_builder_plan_id_arg(args: &Value) -> Option<String> {
for key in ["plan_id", "planId"] {
if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
let plan_id = value.trim();
if !plan_id.is_empty() {
return Some(plan_id.to_string());
}
}
}
for container in ["arguments", "args", "input", "params"] {
if let Some(obj) = args.get(container) {
for key in ["plan_id", "planId"] {
if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
let plan_id = value.trim();
if !plan_id.is_empty() {
return Some(plan_id.to_string());
}
}
}
}
}
None
}
/// Finds the first `plan-<alnum/dash>` token of total length >= 10 in free
/// text. Candidates must carry at least one character after the `plan-` stem.
fn extract_pack_builder_plan_id_from_text(text: &str) -> Option<String> {
    if text.trim().is_empty() {
        return None;
    }
    let mut from = 0usize;
    while let Some(found) = text.get(from..).and_then(|hay| hay.find("plan-")) {
        let start = from + found;
        let body_start = start + 5;
        // Consume the ASCII id body following the stem.
        let body_len = text[body_start..]
            .bytes()
            .take_while(|b| b.is_ascii_alphanumeric() || *b == b'-')
            .count();
        let end = body_start + body_len;
        if body_len > 0 && end - start >= 10 {
            return Some(text[start..end].to_string());
        }
        // Resume after this candidate; a new `plan-` cannot begin before `end`.
        from = end;
    }
    None
}
/// True when the user's message is a bare confirmation of a pack-builder plan
/// (case-insensitive exact match against a fixed set of affirmatives).
fn is_pack_builder_confirmation_text(text: &str) -> bool {
    const CONFIRMATIONS: [&str; 16] = [
        "confirm", "confirmed", "approve", "approved", "yes", "y", "ok", "okay",
        "go", "go ahead", "ship it", "do it", "apply", "run it", "✅", "👍",
    ];
    let lower = text.trim().to_ascii_lowercase();
    !lower.is_empty() && CONFIRMATIONS.contains(&lower.as_str())
}
/// Resolves which plan id an apply should target: a plan id mentioned by the
/// user wins; otherwise, if the user merely confirmed, fall back to the plan
/// id from the assistant's last message.
fn infer_pack_builder_apply_plan_id(
    latest_user_text: &str,
    latest_assistant_context: &str,
) -> Option<String> {
    extract_pack_builder_plan_id_from_text(latest_user_text).or_else(|| {
        if is_pack_builder_confirmation_text(latest_user_text) {
            extract_pack_builder_plan_id_from_text(latest_assistant_context)
        } else {
            None
        }
    })
}
fn ensure_pack_builder_default_mode(args: Value) -> Value {
let mut obj = args.as_object().cloned().unwrap_or_default();
let has_mode = obj
.get("mode")
.and_then(Value::as_str)
.map(str::trim)
.is_some_and(|v| !v.is_empty());
if !has_mode {
obj.insert("mode".to_string(), Value::String("preview".to_string()));
}
Value::Object(obj)
}
fn extract_webfetch_url_arg(args: &Value) -> Option<String> {
const URL_KEYS: [&str; 5] = ["url", "uri", "link", "href", "target_url"];
for key in URL_KEYS {
if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
if let Some(url) = sanitize_url_candidate(value) {
return Some(url);
}
}
}
for container in ["arguments", "args", "input", "params"] {
if let Some(obj) = args.get(container) {
for key in URL_KEYS {
if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
if let Some(url) = sanitize_url_candidate(value) {
return Some(url);
}
}
}
}
}
args.as_str().and_then(sanitize_url_candidate)
}
fn extract_pack_builder_goal_arg(args: &Value) -> Option<String> {
const GOAL_KEYS: [&str; 1] = ["goal"];
for key in GOAL_KEYS {
if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
let trimmed = value.trim();
if !trimmed.is_empty() {
return Some(trimmed.to_string());
}
}
}
for container in ["arguments", "args", "input", "params"] {
if let Some(obj) = args.get(container) {
for key in GOAL_KEYS {
if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
let trimmed = value.trim();
if !trimmed.is_empty() {
return Some(trimmed.to_string());
}
}
}
}
}
args.as_str()
.map(str::trim)
.filter(|v| !v.is_empty())
.map(ToString::to_string)
}
fn extract_websearch_query(args: &Value) -> Option<String> {
const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
for key in QUERY_KEYS {
if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
if let Some(query) = sanitize_websearch_query_candidate(value) {
return Some(query);
}
}
}
for container in ["arguments", "args", "input", "params"] {
if let Some(obj) = args.get(container) {
for key in QUERY_KEYS {
if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
if let Some(query) = sanitize_websearch_query_candidate(value) {
return Some(query);
}
}
}
}
}
args.as_str().and_then(sanitize_websearch_query_candidate)
}
/// Cleans a raw websearch query that may be wrapped in provider XML-ish
/// `<arg_key>`/`<arg_value>` markup.
///
/// Stage order matters: an explicit `<arg_value>...</arg_value>` span wins;
/// then wrapper tags are stripped and whitespace collapsed; finally leading
/// "websearch query " / "query " prefixes are dropped.
fn sanitize_websearch_query_candidate(raw: &str) -> Option<String> {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return None;
    }
    let lower = trimmed.to_ascii_lowercase();
    if let Some(start) = lower.find("<arg_value>") {
        // The tag is ASCII, so the byte offset found in `lower` is a valid
        // index into `trimmed` as well.
        let value_start = start + "<arg_value>".len();
        let tail = &trimmed[value_start..];
        let value = if let Some(end) = tail.to_ascii_lowercase().find("</arg_value>") {
            &tail[..end]
        } else {
            tail
        };
        let cleaned = value.trim();
        if !cleaned.is_empty() {
            return Some(cleaned.to_string());
        }
    }
    // Strip wrapper tags (note: case-sensitive here, unlike the lookup above)
    // and collapse runs of whitespace into single spaces.
    let without_wrappers = trimmed
        .replace("<arg_key>", " ")
        .replace("</arg_key>", " ")
        .replace("<arg_value>", " ")
        .replace("</arg_value>", " ");
    let collapsed = without_wrappers
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ");
    if collapsed.is_empty() {
        return None;
    }
    let collapsed_lower = collapsed.to_ascii_lowercase();
    // `offset` maps the matched prefix length back into the original-case text.
    if let Some(rest) = collapsed_lower.strip_prefix("websearch query ") {
        let offset = collapsed.len() - rest.len();
        let q = collapsed[offset..].trim();
        if !q.is_empty() {
            return Some(q.to_string());
        }
    }
    if let Some(rest) = collapsed_lower.strip_prefix("query ") {
        let offset = collapsed.len() - rest.len();
        let q = collapsed[offset..].trim();
        if !q.is_empty() {
            return Some(q.to_string());
        }
    }
    Some(collapsed)
}
/// Derives a websearch query from free text by stripping a leading search
/// verb ("search for", "look up", ...) and surrounding quotes/punctuation.
/// Returns `None` for blank text or when fewer than two words remain (single
/// words are too ambiguous to treat as a query).
fn infer_websearch_query_from_text(text: &str) -> Option<String> {
    let trimmed = text.trim();
    if trimmed.is_empty() {
        return None;
    }
    let lower = trimmed.to_lowercase();
    // Ordered longest-variant-first so e.g. "search web for" wins over "search".
    const PREFIXES: [&str; 11] = [
        "web search",
        "websearch",
        "search web for",
        "search web",
        "search for",
        "search",
        "look up",
        "lookup",
        "find",
        "web lookup",
        "query",
    ];
    let mut candidate = trimmed;
    for prefix in PREFIXES {
        // `starts_with` already implies the length relation; the former
        // redundant `lower.len() >= prefix.len()` check is dropped.
        if lower.starts_with(prefix) {
            candidate = trimmed[prefix.len()..]
                .trim_start_matches(|c: char| c.is_whitespace() || c == ':' || c == '-');
            break;
        }
    }
    let normalized = candidate
        .trim()
        .trim_matches(|c: char| c == '"' || c == '\'' || c.is_whitespace())
        .trim_matches(|c: char| matches!(c, '.' | ',' | '!' | '?'))
        .trim()
        .to_string();
    if normalized.split_whitespace().count() < 2 {
        return None;
    }
    Some(normalized)
}
fn infer_file_path_from_text(text: &str) -> Option<String> {
let trimmed = text.trim();
if trimmed.is_empty() {
return None;
}
let mut candidates: Vec<String> = Vec::new();
let mut in_tick = false;
let mut tick_buf = String::new();
for ch in trimmed.chars() {
if ch == '`' {
if in_tick {
let cand = sanitize_path_candidate(&tick_buf);
if let Some(path) = cand {
candidates.push(path);
}
tick_buf.clear();
}
in_tick = !in_tick;
continue;
}
if in_tick {
tick_buf.push(ch);
}
}
for raw in trimmed.split_whitespace() {
if let Some(path) = sanitize_path_candidate(raw) {
candidates.push(path);
}
}
let mut deduped = Vec::new();
let mut seen = HashSet::new();
for candidate in candidates {
if seen.insert(candidate.clone()) {
deduped.push(candidate);
}
}
deduped.into_iter().next()
}
/// Scans for a "Workspace:" line and returns its sanitized path value; lines
/// whose value fails sanitization are skipped and the scan continues.
fn infer_workspace_root_from_text(text: &str) -> Option<String> {
    for line in text.lines() {
        if let Some(value) = line.trim().strip_prefix("Workspace:") {
            if let Some(path) = sanitize_path_candidate(value.trim()) {
                return Some(path);
            }
        }
    }
    None
}
/// Extracts the required output target path from prompt text.
///
/// Preferred form: a JSON object following "Required output target:" whose
/// `path` field survives sanitization. Fallback: the backtick-quoted path
/// after "Create or update `".
fn infer_required_output_target_path_from_text(text: &str) -> Option<String> {
    let marker = "Required output target:";
    if let Some(idx) = text.find(marker) {
        let tail = text[idx + marker.len()..].trim_start();
        if let Some(start) = tail.find('{') {
            // Collect lines up to the first blank line. The original guard
            // `!(trimmed.is_empty() && !trimmed.starts_with('{'))` reduces to
            // exactly this, because an empty line can never start with '{'.
            let json_candidate = tail[start..]
                .lines()
                .take_while(|line| !line.trim().is_empty())
                .collect::<Vec<_>>()
                .join("\n");
            if let Ok(parsed) = serde_json::from_str::<Value>(&json_candidate) {
                if let Some(path) = parsed.get("path").and_then(|v| v.as_str()) {
                    if let Some(clean) = sanitize_explicit_output_target_path(path) {
                        return Some(clean);
                    }
                }
            }
        }
    }
    let auto_marker = "Create or update `";
    if let Some(idx) = text.find(auto_marker) {
        let after = &text[idx + auto_marker.len()..];
        if let Some(end) = after.find('`') {
            if let Some(clean) = sanitize_explicit_output_target_path(after[..end].trim()) {
                return Some(clean);
            }
        }
    }
    None
}
/// A write target inferred from text, unless the inferred path is actually
/// the workspace root itself (which is never a valid write destination).
fn infer_write_file_path_from_text(text: &str) -> Option<String> {
    let inferred = infer_file_path_from_text(text)?;
    match infer_workspace_root_from_text(text) {
        Some(root) if root == inferred => None,
        _ => Some(inferred),
    }
}
fn infer_url_from_text(text: &str) -> Option<String> {
let trimmed = text.trim();
if trimmed.is_empty() {
return None;
}
let mut candidates: Vec<String> = Vec::new();
let mut in_tick = false;
let mut tick_buf = String::new();
for ch in trimmed.chars() {
if ch == '`' {
if in_tick {
if let Some(url) = sanitize_url_candidate(&tick_buf) {
candidates.push(url);
}
tick_buf.clear();
}
in_tick = !in_tick;
continue;
}
if in_tick {
tick_buf.push(ch);
}
}
for raw in trimmed.split_whitespace() {
if let Some(url) = sanitize_url_candidate(raw) {
candidates.push(url);
}
}
let mut seen = HashSet::new();
candidates
.into_iter()
.find(|candidate| seen.insert(candidate.clone()))
}
/// Treats the whole (trimmed) user message as the pack-builder goal; blank
/// text yields `None`.
fn infer_pack_builder_goal_from_text(text: &str) -> Option<String> {
    let goal = text.trim();
    (!goal.is_empty()).then(|| goal.to_string())
}
/// Strips markdown/bracket punctuation from a token and accepts it only when
/// an http(s) scheme remains (scheme check is case-insensitive; original
/// casing is preserved in the result).
fn sanitize_url_candidate(raw: &str) -> Option<String> {
    let token = raw
        .trim()
        .trim_matches(|c: char| matches!(c, '`' | '"' | '\'' | '*' | '|'))
        .trim_start_matches(['(', '[', '{', '<'])
        .trim_end_matches([',', ';', ':', ')', ']', '}', '>'])
        .trim_end_matches('.')
        .trim();
    if token.is_empty() {
        return None;
    }
    let scheme_ok = {
        let lower = token.to_ascii_lowercase();
        lower.starts_with("http://") || lower.starts_with("https://")
    };
    scheme_ok.then(|| token.to_string())
}
/// Strips quoting/bracket punctuation around a path-like token; returns `None`
/// when nothing remains.
fn clean_path_candidate_token(raw: &str) -> Option<String> {
    let token = raw
        .trim()
        .trim_matches(|c: char| matches!(c, '`' | '"' | '\'' | '*' | '|'))
        .trim_start_matches(['(', '[', '{', '<'])
        .trim_end_matches([',', ';', ':', ')', ']', '}', '>'])
        .trim_end_matches('.')
        .trim();
    (!token.is_empty()).then(|| token.to_string())
}
/// Validates an explicitly declared output target: rejects URLs, malformed
/// tool-markup tokens, bare roots, placeholder paths, and directory-like
/// tokens ending in a separator.
fn sanitize_explicit_output_target_path(raw: &str) -> Option<String> {
    let token = clean_path_candidate_token(raw)?;
    let lower = token.to_ascii_lowercase();
    let rejected = lower.starts_with("http://")
        || lower.starts_with("https://")
        || is_malformed_tool_path_token(&token)
        || is_root_only_path_token(&token)
        || is_placeholder_path_token(&token)
        || token.ends_with('/')
        || token.ends_with('\\');
    (!rejected).then_some(token)
}
/// Validates a heuristic path candidate: rejects URLs, malformed markup,
/// bare roots, and placeholders, and requires the token to either contain a
/// path separator or end with a recognized file extension.
fn sanitize_path_candidate(raw: &str) -> Option<String> {
    const KNOWN_EXTENSIONS: [&str; 32] = [
        ".md", ".txt", ".json", ".yaml", ".yml", ".toml", ".rs", ".ts", ".tsx", ".js", ".jsx",
        ".py", ".go", ".java", ".cpp", ".c", ".h", ".pdf", ".docx", ".pptx", ".xlsx", ".rtf",
        ".html", ".htm", ".css", ".scss", ".sass", ".less", ".svg", ".xml", ".sql", ".sh",
    ];
    let token = clean_path_candidate_token(raw)?;
    let lower = token.to_ascii_lowercase();
    if lower.starts_with("http://") || lower.starts_with("https://") {
        return None;
    }
    if is_malformed_tool_path_token(&token)
        || is_root_only_path_token(&token)
        || is_placeholder_path_token(&token)
    {
        return None;
    }
    let has_separator = token.contains('/') || token.contains('\\');
    let has_known_ext = KNOWN_EXTENSIONS.iter().any(|ext| lower.ends_with(ext));
    (has_separator || has_known_ext).then_some(token)
}
/// True for template-style placeholder paths that models sometimes echo
/// verbatim (e.g. "path/to/file"), plus blank tokens.
fn is_placeholder_path_token(token: &str) -> bool {
    const PLACEHOLDERS: [&str; 9] = [
        "files/directories",
        "file/directory",
        "relative/or/absolute/path",
        "path/to/file",
        "path/to/your/file",
        "tool/policy",
        "tools/policy",
        "the expected artifact file",
        "workspace/file",
    ];
    let lowered = token.trim().to_ascii_lowercase();
    lowered.is_empty() || PLACEHOLDERS.contains(&lowered.as_str())
}
/// True when a path token is contaminated by tool-call markup, contains
/// newlines, or carries glob metacharacters — none of which belong in a
/// single concrete path.
fn is_malformed_tool_path_token(token: &str) -> bool {
    const MARKUP_FRAGMENTS: [&str; 6] = [
        "<tool_call",
        "</tool_call",
        "<function=",
        "<parameter=",
        "</function>",
        "</parameter>",
    ];
    let lower = token.to_ascii_lowercase();
    if MARKUP_FRAGMENTS.iter().any(|fragment| lower.contains(fragment)) {
        return true;
    }
    token.chars().any(|c| matches!(c, '\n' | '\r' | '*' | '?'))
}
/// True for tokens that name only a filesystem root or self/parent reference:
/// blank, "/", "\", ".", "..", "~", and bare Windows drive specs ("C:", "C:\").
fn is_root_only_path_token(token: &str) -> bool {
    let trimmed = token.trim();
    if trimmed.is_empty() || matches!(trimmed, "/" | "\\" | "." | ".." | "~") {
        return true;
    }
    match trimmed.as_bytes() {
        &[drive, b':'] => drive.is_ascii_alphabetic(),
        &[drive, b':', sep] => drive.is_ascii_alphabetic() && (sep == b'\\' || sep == b'/'),
        _ => false,
    }
}
/// Strips quoting and trailing statement punctuation from a shell command
/// candidate; returns `None` when nothing remains.
fn sanitize_shell_command_candidate(raw: &str) -> Option<String> {
    let command = raw
        .trim()
        .trim_matches(|c: char| matches!(c, '`' | '"' | '\'' | ',' | ';'))
        .trim();
    (!command.is_empty()).then(|| command.to_string())
}
/// Heuristic: does this text plausibly start a shell command? Matches a known
/// binary name, a PowerShell `Get-` verb, a relative-script prefix, or the
/// presence of pipe/chain operators.
fn looks_like_shell_command(candidate: &str) -> bool {
    const COMMON_BINARIES: [&str; 23] = [
        "rg", "git", "cargo", "pnpm", "npm", "node", "python", "pytest", "pwsh",
        "powershell", "cmd", "dir", "ls", "cat", "type", "echo", "cd", "mkdir",
        "cp", "copy", "move", "del", "rm",
    ];
    let lower = candidate.to_ascii_lowercase();
    if lower.is_empty() {
        return false;
    }
    let first = lower.split_whitespace().next().unwrap_or("");
    COMMON_BINARIES.contains(&first)
        || first.starts_with("get-")
        || first.starts_with("./")
        || first.starts_with(".\\")
        || lower.contains(" | ")
        || lower.contains(" && ")
        || lower.contains(" ; ")
}
/// Argument keys checked, in priority order, when recovering a file path.
const FILE_PATH_KEYS: [&str; 10] = [
    "path",
    "file_path",
    "filePath",
    "filepath",
    "filename",
    "file",
    "target",
    "targetFile",
    "absolutePath",
    "uri",
];
/// Argument keys checked, in priority order, when recovering a shell command.
const SHELL_COMMAND_KEYS: [&str; 4] = ["command", "cmd", "script", "line"];
/// Argument keys checked, in priority order, when recovering write content.
const WRITE_CONTENT_KEYS: [&str; 8] = [
    "content",
    "text",
    "body",
    "value",
    "markdown",
    "document",
    "output",
    "file_content",
];
/// Container keys under which providers commonly nest the real tool arguments.
const NESTED_ARGS_KEYS: [&str; 10] = [
    "arguments",
    "args",
    "input",
    "params",
    "payload",
    "data",
    "tool_input",
    "toolInput",
    "tool_args",
    "toolArgs",
];
/// Builds a dedupe signature for a tool call. Websearch gets a normalized
/// field-based signature (query lowercased, defaults filled in) so trivially
/// different argument JSON still collides; every other tool uses its raw
/// serialized args.
fn tool_signature(tool_name: &str, args: &Value) -> String {
    let normalized = normalize_tool_name(tool_name);
    if normalized == "websearch" {
        let query = extract_websearch_query(args)
            .unwrap_or_default()
            .to_lowercase();
        // Accepts the three limit spellings providers emit; 8 is the default
        // result count when none is given.
        let limit = args
            .get("limit")
            .or_else(|| args.get("numResults"))
            .or_else(|| args.get("num_results"))
            .and_then(|v| v.as_u64())
            .unwrap_or(8);
        let domains = args
            .get("domains")
            .or_else(|| args.get("domain"))
            .map(|v| v.to_string())
            .unwrap_or_default();
        let recency = args.get("recency").and_then(|v| v.as_u64()).unwrap_or(0);
        return format!("websearch:q={query}|limit={limit}|domains={domains}|recency={recency}");
    }
    format!("{}:{}", normalized, args)
}
/// Hashes `input` with the std `DefaultHasher` and renders the 64-bit digest
/// as a zero-padded 16-char lowercase hex string. `DefaultHasher::new()` uses
/// fixed keys, so this is deterministic within one toolchain build, but it is
/// not guaranteed stable across Rust releases.
fn stable_hash(input: &str) -> String {
    let mut state = DefaultHasher::new();
    Hash::hash(input, &mut state);
    format!("{:016x}", Hasher::finish(&state))
}
/// Condenses tool outputs for prompt context: at most 6 outputs, each
/// truncated to 600 characters, separated by blank lines.
fn summarize_tool_outputs(outputs: &[String]) -> String {
    let mut summaries = Vec::with_capacity(outputs.len().min(6));
    for output in outputs.iter().take(6) {
        summaries.push(truncate_text(output, 600));
    }
    summaries.join("\n\n")
}
/// True when a tool's output contains a marker of a host-OS/shell mismatch
/// (Windows path errors, missing Unix binaries, blocked shell commands).
fn is_os_mismatch_tool_output(output: &str) -> bool {
    const MISMATCH_MARKERS: [&str; 5] = [
        "os error 3",
        "system cannot find the path specified",
        "command not found",
        "is not recognized as an internal or external command",
        "shell command blocked on windows",
    ];
    let lower = output.to_ascii_lowercase();
    MISMATCH_MARKERS.iter().any(|marker| lower.contains(marker))
}
/// Renders the effective context mode label; `Auto` resolves to compact or
/// standard depending on whether auto-compaction actually kicked in.
fn format_context_mode(requested: &ContextMode, auto_compact: bool) -> &'static str {
    match (requested, auto_compact) {
        (ContextMode::Full, _) => "full",
        (ContextMode::Compact, _) => "compact",
        (ContextMode::Auto, true) => "auto_compact",
        (ContextMode::Auto, false) => "auto_standard",
    }
}
/// Assembles the Tandem runtime system prompt from ordered sections:
/// optional host/OS context, base operating guidance, greeting guidance,
/// OS-specific tool guidance, and (when any exist) a capped catalog of
/// connected MCP integrations. Sections are joined with blank lines.
fn tandem_runtime_system_prompt(host: &HostRuntimeContext, mcp_server_names: &[String]) -> String {
    let mut sections = Vec::new();
    if os_aware_prompts_enabled() {
        sections.push(format!(
            "[Execution Environment]\nHost OS: {}\nShell: {}\nPath style: {}\nArchitecture: {}",
            host_os_label(host.os),
            shell_family_label(host.shell_family),
            path_style_label(host.path_style),
            host.arch
        ));
    }
    sections.push(
        "You are operating inside Tandem (Desktop/TUI) as an engine-backed coding assistant.
Use tool calls to inspect and modify the workspace when needed instead of asking the user
to manually run basic discovery steps. Permission prompts may occur for some tools; if
a tool is denied or blocked, explain what was blocked and suggest a concrete next step."
            .to_string(),
    );
    sections.push(
        "For greetings or simple conversational messages (for example: hi, hello, thanks),
respond directly without calling tools."
            .to_string(),
    );
    // OS-specific guidance: steer Windows hosts away from Unix-only syntax.
    if host.os == HostOs::Windows {
        sections.push(
            "Windows guidance: prefer cross-platform tools (`glob`, `grep`, `read`, `write`, `edit`) and PowerShell-native commands.
Avoid Unix-only shell syntax (`ls -la`, `find ... -type f`, `cat` pipelines) unless translated.
If a shell command fails with a path/shell mismatch, immediately switch to cross-platform tools (`read`, `glob`, `grep`)."
                .to_string(),
        );
    } else {
        sections.push(
            "POSIX guidance: standard shell commands are available.
Use cross-platform tools (`glob`, `grep`, `read`) when they are simpler and safer for codebase exploration."
                .to_string(),
        );
    }
    if !mcp_server_names.is_empty() {
        let cap = mcp_catalog_max_servers();
        // NOTE(review): the cap is applied BEFORE sorting, so the catalog keeps
        // the first `cap` names in their incoming order and then alphabetizes
        // only that subset — confirm sort-then-cap wasn't intended.
        let mut listed = mcp_server_names
            .iter()
            .take(cap)
            .cloned()
            .collect::<Vec<_>>();
        listed.sort();
        let mut catalog = listed
            .iter()
            .map(|name| format!("- {name}"))
            .collect::<Vec<_>>();
        if mcp_server_names.len() > cap {
            catalog.push(format!("- (+{} more)", mcp_server_names.len() - cap));
        }
        sections.push(format!(
            "[Connected Integrations]\nThe following external integrations are currently connected and available:\n{}",
            catalog.join("\n")
        ));
    }
    sections.join("\n\n")
}
/// Feature flag for the OS-aware prompt section; enabled by default, disabled
/// only by TANDEM_OS_AWARE_PROMPTS set to "0"/"false"/"off" (case-insensitive).
fn os_aware_prompts_enabled() -> bool {
    match std::env::var("TANDEM_OS_AWARE_PROMPTS") {
        Ok(raw) => !matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "0" | "false" | "off"
        ),
        Err(_) => true,
    }
}
/// Feature flag for semantic tool retrieval; enabled by default, disabled by
/// TANDEM_SEMANTIC_TOOL_RETRIEVAL set to "0"/"false"/"off"/"no".
fn semantic_tool_retrieval_enabled() -> bool {
    match std::env::var("TANDEM_SEMANTIC_TOOL_RETRIEVAL") {
        Ok(raw) => !matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "0" | "false" | "off" | "no"
        ),
        Err(_) => true,
    }
}
fn semantic_tool_retrieval_k() -> usize {
std::env::var("TANDEM_SEMANTIC_TOOL_RETRIEVAL_K")
.ok()
.and_then(|raw| raw.trim().parse::<usize>().ok())
.filter(|value| *value > 0)
.unwrap_or_else(max_tools_per_call_expanded)
}
/// Feature flag for listing the MCP catalog in the system prompt; enabled by
/// default, disabled by TANDEM_MCP_CATALOG_IN_SYSTEM_PROMPT "0"/"false"/"off"/"no".
fn mcp_catalog_in_system_prompt_enabled() -> bool {
    match std::env::var("TANDEM_MCP_CATALOG_IN_SYSTEM_PROMPT") {
        Ok(raw) => !matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "0" | "false" | "off" | "no"
        ),
        Err(_) => true,
    }
}
/// Maximum MCP servers listed in the prompt catalog; a positive integer from
/// TANDEM_MCP_CATALOG_MAX_SERVERS, defaulting to 20.
fn mcp_catalog_max_servers() -> usize {
    const DEFAULT_CAP: usize = 20;
    match std::env::var("TANDEM_MCP_CATALOG_MAX_SERVERS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(cap) if cap > 0 => cap,
            _ => DEFAULT_CAP,
        },
        Err(_) => DEFAULT_CAP,
    }
}
/// Lowercase host-OS label used in the runtime system prompt.
fn host_os_label(os: HostOs) -> &'static str {
    match os {
        HostOs::Linux => "linux",
        HostOs::Macos => "macos",
        HostOs::Windows => "windows",
    }
}
/// Lowercase shell-family label used in the runtime system prompt.
fn shell_family_label(shell: ShellFamily) -> &'static str {
    match shell {
        ShellFamily::Posix => "posix",
        ShellFamily::Powershell => "powershell",
    }
}
/// Lowercase path-style label used in the runtime system prompt.
fn path_style_label(path_style: PathStyle) -> &'static str {
    match path_style {
        PathStyle::Posix => "posix",
        PathStyle::Windows => "windows",
    }
}
/// Detects the case where the user asked about the project but the assistant
/// claimed it cannot access the workspace — a signal to force a real probe.
///
/// Returns true only when BOTH hold: the user's text contains a
/// project-context request phrase, and the completion contains a no-access
/// claim. The former trailing `asked_for_project_context && ...` expression
/// was always-true after the early return and has been removed; the reply is
/// also only lowercased once the user-side check has passed.
fn should_force_workspace_probe(user_text: &str, completion: &str) -> bool {
    const PROJECT_CONTEXT_NEEDLES: [&str; 16] = [
        "what is this project",
        "what's this project",
        "what project is this",
        "explain this project",
        "analyze this project",
        "inspect this project",
        "look at the project",
        "summarize this project",
        "show me this project",
        "what files are in",
        "show files",
        "list files",
        "read files",
        "browse files",
        "use glob",
        "run glob",
    ];
    const NO_ACCESS_NEEDLES: [&str; 21] = [
        "can't inspect",
        "cannot inspect",
        "unable to inspect",
        "unable to directly inspect",
        "can't access",
        "cannot access",
        "unable to access",
        "can't read files",
        "cannot read files",
        "unable to read files",
        "tool restriction",
        "tool restrictions",
        "don't have visibility",
        "no visibility",
        "haven't been able to inspect",
        "i don't know what this project is",
        "need your help to",
        "sandbox",
        "restriction",
        "system restriction",
        "permissions restrictions",
    ];
    let user = user_text.to_lowercase();
    if !PROJECT_CONTEXT_NEEDLES
        .iter()
        .any(|needle| user.contains(needle))
    {
        return false;
    }
    let reply = completion.to_lowercase();
    NO_ACCESS_NEEDLES.iter().any(|needle| reply.contains(needle))
}
/// Parses a manual "/tool <name> [json-args]" invocation. Missing or
/// unparseable args default to an empty JSON object.
fn parse_tool_invocation(input: &str) -> Option<(String, serde_json::Value)> {
    let raw = input.trim();
    // `strip_prefix` removes the marker exactly once; the previous
    // `trim_start_matches` would also eat repeated "/tool " prefixes, turning
    // "/tool /tool ls" into tool "ls" instead of treating "/tool" as the name.
    let rest = raw.strip_prefix("/tool ")?.trim();
    let mut split = rest.splitn(2, ' ');
    let tool = normalize_tool_name(split.next()?.trim());
    let args = split
        .next()
        .and_then(|v| serde_json::from_str::<serde_json::Value>(v).ok())
        .unwrap_or_else(|| json!({}));
    Some((tool, args))
}
/// Extracts tool calls from a model completion, trying three strategies in
/// order: the whole text as JSON, the first embedded JSON object, and finally
/// function-style `tool(...)` syntax.
fn parse_tool_invocations_from_response(input: &str) -> Vec<(String, serde_json::Value)> {
    let trimmed = input.trim();
    if trimmed.is_empty() {
        return Vec::new();
    }
    if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
        if let Some(call) = extract_tool_call_from_value(&parsed) {
            return vec![call];
        }
    }
    let embedded = extract_first_json_object(trimmed)
        .and_then(|block| serde_json::from_str::<serde_json::Value>(&block).ok())
        .and_then(|parsed| extract_tool_call_from_value(&parsed));
    match embedded {
        Some(call) => vec![call],
        None => parse_function_style_tool_calls(trimmed),
    }
}
#[cfg(test)]
/// Test helper: first tool call extracted from a completion, if any.
fn parse_tool_invocation_from_response(input: &str) -> Option<(String, serde_json::Value)> {
    let mut calls = parse_tool_invocations_from_response(input);
    if calls.is_empty() {
        None
    } else {
        Some(calls.remove(0))
    }
}
/// Scans free text for function-style todo-tool calls like `todo_write(...)`
/// and parses each argument list. Matching is case-insensitive; only the todo
/// aliases listed below are recognized.
fn parse_function_style_tool_calls(input: &str) -> Vec<(String, Value)> {
    let mut calls = Vec::new();
    let lower = input.to_lowercase();
    let names = [
        "todo_write",
        "todowrite",
        "update_todo_list",
        "update_todos",
    ];
    let mut cursor = 0usize;
    while cursor < lower.len() {
        // Find the earliest next occurrence of any alias followed by '('.
        let mut best: Option<(usize, &str)> = None;
        for name in names {
            let needle = format!("{name}(");
            if let Some(rel_idx) = lower[cursor..].find(&needle) {
                let idx = cursor + rel_idx;
                if best.as_ref().is_none_or(|(best_idx, _)| idx < *best_idx) {
                    best = Some((idx, name));
                }
            }
        }
        let Some((tool_start, tool_name)) = best else {
            break;
        };
        // Aliases are ASCII, so byte offsets found in `lower` are valid
        // indices into `input` as well.
        let open_paren = tool_start + tool_name.len();
        if let Some(close_paren) = find_matching_paren(input, open_paren) {
            if let Some(args_text) = input.get(open_paren + 1..close_paren) {
                let args = parse_function_style_args(args_text.trim());
                calls.push((normalize_tool_name(tool_name), Value::Object(args)));
            }
            cursor = close_paren.saturating_add(1);
        } else {
            // Unbalanced parens: skip past the alias and keep scanning.
            cursor = tool_start.saturating_add(tool_name.len());
        }
    }
    calls
}
/// Returns the byte index of the ')' matching the '(' at `open_paren`,
/// honoring single/double quotes and backslash escapes inside quotes.
/// Returns `None` when `open_paren` is not a '(' or no balanced close exists.
fn find_matching_paren(input: &str, open_paren: usize) -> Option<usize> {
    if input.as_bytes().get(open_paren).copied()? != b'(' {
        return None;
    }
    let mut depth = 0usize;
    let mut in_single = false;
    let mut in_double = false;
    let mut escaped = false;
    for (offset, ch) in input.get(open_paren..)?.char_indices() {
        if escaped {
            // The character following a backslash inside quotes is literal.
            escaped = false;
            continue;
        }
        if ch == '\\' && (in_single || in_double) {
            escaped = true;
            continue;
        }
        // Quote toggles: each kind only toggles while the other is closed.
        if ch == '\'' && !in_double {
            in_single = !in_single;
            continue;
        }
        if ch == '"' && !in_single {
            in_double = !in_double;
            continue;
        }
        if in_single || in_double {
            // Parens inside quoted spans do not affect nesting depth.
            continue;
        }
        match ch {
            '(' => depth += 1,
            ')' => {
                depth = depth.saturating_sub(1);
                if depth == 0 {
                    return Some(open_paren + offset);
                }
            }
            _ => {}
        }
    }
    None
}
/// Splits a `key=value, key2=value2` argument list into a JSON map.
///
/// Commas only act as separators at the top nesting level: commas inside
/// quotes, parentheses, brackets, or braces stay part of the current
/// segment. Each segment is split on its first `=` (or, failing that, `:`),
/// the key is stripped of surrounding quote characters, and the value is
/// coerced via `parse_scalar_like_value`. Segments without a separator or
/// with an empty key are ignored.
fn parse_function_style_args(input: &str) -> Map<String, Value> {
    let mut args = Map::new();
    if input.trim().is_empty() {
        return args;
    }
    let mut segments = Vec::<String>::new();
    let mut buf = String::new();
    let mut quote: Option<char> = None;
    let mut skip_escape = false;
    // Nesting depths for (), [], {} respectively.
    let mut nesting = [0usize; 3];
    for ch in input.chars() {
        if skip_escape {
            buf.push(ch);
            skip_escape = false;
            continue;
        }
        if let Some(q) = quote {
            if ch == '\\' {
                skip_escape = true;
            } else if ch == q {
                quote = None;
            }
            buf.push(ch);
            continue;
        }
        match ch {
            '\'' | '"' => quote = Some(ch),
            '(' => nesting[0] += 1,
            ')' => nesting[0] = nesting[0].saturating_sub(1),
            '[' => nesting[1] += 1,
            ']' => nesting[1] = nesting[1].saturating_sub(1),
            '{' => nesting[2] += 1,
            '}' => nesting[2] = nesting[2].saturating_sub(1),
            ',' if nesting.iter().all(|&d| d == 0) => {
                let segment = buf.trim();
                if !segment.is_empty() {
                    segments.push(segment.to_string());
                }
                buf.clear();
                continue;
            }
            _ => {}
        }
        buf.push(ch);
    }
    let trailing = buf.trim();
    if !trailing.is_empty() {
        segments.push(trailing.to_string());
    }
    for segment in segments {
        let Some((raw_key, raw_value)) = segment
            .split_once('=')
            .or_else(|| segment.split_once(':'))
            .map(|(k, v)| (k.trim(), v.trim()))
        else {
            continue;
        };
        let key = raw_key.trim_matches(|c| c == '"' || c == '\'' || c == '`');
        if key.is_empty() {
            continue;
        }
        args.insert(key.to_string(), parse_scalar_like_value(raw_value));
    }
    args
}
/// Coerces a raw argument string into the closest JSON value.
///
/// Recognizes, in order: quoted strings (surrounding quotes stripped),
/// `true`/`false`/`null` keywords (case-insensitive), any valid JSON
/// literal, integers, then floats; anything else becomes a plain string.
/// Empty input maps to `null`.
fn parse_scalar_like_value(raw: &str) -> Value {
    let text = raw.trim();
    if text.is_empty() {
        return Value::Null;
    }
    let double_quoted = text.starts_with('"') && text.ends_with('"');
    let single_quoted = text.starts_with('\'') && text.ends_with('\'');
    if double_quoted || single_quoted {
        // A lone quote character matches both ends; keep it verbatim.
        return if text.len() < 2 {
            Value::String(text.to_string())
        } else {
            Value::String(text[1..text.len() - 1].to_string())
        };
    }
    if text.eq_ignore_ascii_case("true") {
        return Value::Bool(true);
    }
    if text.eq_ignore_ascii_case("false") {
        return Value::Bool(false);
    }
    if text.eq_ignore_ascii_case("null") {
        return Value::Null;
    }
    if let Ok(json_value) = serde_json::from_str::<Value>(text) {
        return json_value;
    }
    if let Ok(int_value) = text.parse::<i64>() {
        return Value::Number(Number::from(int_value));
    }
    if let Some(num) = text.parse::<f64>().ok().and_then(Number::from_f64) {
        return Value::Number(num);
    }
    Value::String(text.to_string())
}
/// Rebuilds `write` tool arguments from JSON too malformed to parse.
///
/// Requires a recoverable `content` string field; includes `path` too when
/// one can be salvaged. Returns `None` if no `content` field is found.
fn recover_write_args_from_malformed_json(raw: &str) -> Option<Value> {
    let content = extract_loose_json_string_field(raw, "content")?;
    let mut recovered = Map::new();
    if let Some(path) = extract_loose_json_string_field(raw, "path") {
        recovered.insert("path".to_string(), Value::String(path));
    }
    recovered.insert("content".to_string(), Value::String(content));
    Some(Value::Object(recovered))
}
/// Locates `"key": "` in loosely structured JSON text and returns the
/// decoded string value that follows, tolerating a missing closing quote.
/// Returns `None` when the key, colon, or opening quote is absent.
fn extract_loose_json_string_field(input: &str, key: &str) -> Option<String> {
    let needle = format!("\"{key}\"");
    let key_at = input.find(&needle)?;
    let after_key = input.get(key_at + needle.len()..)?;
    let after_colon = after_key.get(after_key.find(':')? + 1..)?.trim_start();
    let body = after_colon.strip_prefix('"')?;
    Some(parse_loose_json_string_value(body))
}
/// Decodes a JSON-style string body up to its closing `"`, tolerating
/// truncation.
///
/// Handles the standard JSON escapes plus `\uXXXX` (BMP code points only —
/// invalid or truncated escapes are kept literally). Input that ends before
/// the closing quote simply yields everything decoded so far, which lets
/// callers salvage strings from cut-off provider output.
///
/// Cleanup vs. the previous version: the `closed` flag was dead code (both
/// the `!closed` branch and the fallthrough returned `out`), and the
/// iterator was `.peekable()` without `peek` ever being called; both are
/// removed with no behavior change.
fn parse_loose_json_string_value(input: &str) -> String {
    let mut out = String::new();
    let mut chars = input.chars();
    while let Some(ch) = chars.next() {
        if ch == '"' {
            // Unescaped quote ends the string value.
            break;
        }
        if ch != '\\' {
            out.push(ch);
            continue;
        }
        let Some(escaped) = chars.next() else {
            // Trailing lone backslash: keep it literally.
            out.push('\\');
            break;
        };
        match escaped {
            '"' => out.push('"'),
            '\\' => out.push('\\'),
            '/' => out.push('/'),
            'b' => out.push('\u{0008}'),
            'f' => out.push('\u{000C}'),
            'n' => out.push('\n'),
            'r' => out.push('\r'),
            't' => out.push('\t'),
            'u' => {
                let mut hex = String::new();
                for _ in 0..4 {
                    let Some(next) = chars.next() else {
                        break;
                    };
                    hex.push(next);
                }
                if hex.len() == 4 {
                    if let Ok(codepoint) = u16::from_str_radix(&hex, 16) {
                        if let Some(decoded) = char::from_u32(codepoint as u32) {
                            out.push(decoded);
                            continue;
                        }
                    }
                }
                // Invalid or truncated \u escape: preserve it verbatim.
                out.push('\\');
                out.push('u');
                out.push_str(&hex);
            }
            other => {
                // Unknown escape: keep backslash and character as-is.
                out.push('\\');
                out.push(other);
            }
        }
    }
    out
}
/// Coerces loosely shaped `todo_write` arguments into `{ "todos": [...] }`.
///
/// Status-update payloads (`status` plus an id-ish target) pass through
/// untouched. Arrays and strings are converted directly; objects without a
/// non-empty `todos` array get one populated from the first usable alias
/// key (`tasks`, `items`, `list`, `checklist`) and, failing that, from
/// checklist-like lines in `completion`.
fn normalize_todo_write_args(args: Value, completion: &str) -> Value {
    if is_todo_status_update_args(&args) {
        return args;
    }
    let mut map = match args {
        Value::Object(map) => map,
        Value::Array(items) => return json!({ "todos": normalize_todo_arg_items(items) }),
        Value::String(text) => {
            let candidates = extract_todo_candidates_from_text(&text);
            return if candidates.is_empty() {
                json!({})
            } else {
                json!({ "todos": candidates })
            };
        }
        _ => return json!({}),
    };
    let already_populated = map
        .get("todos")
        .and_then(Value::as_array)
        .is_some_and(|arr| !arr.is_empty());
    if already_populated {
        return Value::Object(map);
    }
    for alias in ["tasks", "items", "list", "checklist"] {
        let Some(items) = map.get(alias).and_then(Value::as_array) else {
            continue;
        };
        let normalized = normalize_todo_arg_items(items.clone());
        if !normalized.is_empty() {
            map.insert("todos".to_string(), Value::Array(normalized));
            return Value::Object(map);
        }
    }
    let fallback = extract_todo_candidates_from_text(completion);
    if !fallback.is_empty() {
        map.insert("todos".to_string(), Value::Array(fallback));
    }
    Value::Object(map)
}
/// Normalizes raw todo entries into objects carrying a non-empty `content`.
///
/// Plain strings become `{"content": ...}`; objects inherit `content` from
/// `text`, `title`, or `name` when it is missing. Entries whose content is
/// absent, non-string, or blank are dropped.
fn normalize_todo_arg_items(items: Vec<Value>) -> Vec<Value> {
    let mut normalized = Vec::new();
    for item in items {
        match item {
            Value::String(text) => {
                let content = text.trim();
                if !content.is_empty() {
                    normalized.push(json!({ "content": content }));
                }
            }
            Value::Object(mut obj) => {
                if !obj.contains_key("content") {
                    let fallback = obj
                        .get("text")
                        .or_else(|| obj.get("title"))
                        .or_else(|| obj.get("name"))
                        .cloned();
                    if let Some(value) = fallback {
                        obj.insert("content".to_string(), value);
                    }
                }
                let usable = obj
                    .get("content")
                    .and_then(Value::as_str)
                    .is_some_and(|s| !s.trim().is_empty());
                if usable {
                    normalized.push(Value::Object(obj));
                }
            }
            _ => {}
        }
    }
    normalized
}
fn is_todo_status_update_args(args: &Value) -> bool {
let Some(obj) = args.as_object() else {
return false;
};
let has_status = obj
.get("status")
.and_then(|v| v.as_str())
.map(|s| !s.trim().is_empty())
.unwrap_or(false);
let has_target =
obj.get("task_id").is_some() || obj.get("todo_id").is_some() || obj.get("id").is_some();
has_status && has_target
}
fn is_empty_todo_write_args(args: &Value) -> bool {
if is_todo_status_update_args(args) {
return false;
}
let Some(obj) = args.as_object() else {
return true;
};
!obj.get("todos")
.and_then(|v| v.as_array())
.map(|arr| !arr.is_empty())
.unwrap_or(false)
}
/// Parses streamed tool-call arguments with progressively lenient fallbacks.
///
/// Order: strict JSON, then (for `write`) malformed-JSON field recovery,
/// then `key=value` parsing, then (for `websearch`) treating the raw text
/// as a query. Anything else is returned as a bare string value; empty
/// input yields an empty object.
fn parse_streamed_tool_args(tool_name: &str, raw_args: &str) -> Value {
    let raw = raw_args.trim();
    if raw.is_empty() {
        return json!({});
    }
    let tool = normalize_tool_name(tool_name);
    if let Ok(parsed) = serde_json::from_str::<Value>(raw) {
        return normalize_streamed_tool_args(&tool, parsed, raw);
    }
    if tool == "write" {
        if let Some(recovered) = recover_write_args_from_malformed_json(raw) {
            return recovered;
        }
    }
    let kv = parse_function_style_args(raw);
    if !kv.is_empty() {
        return normalize_streamed_tool_args(&tool, Value::Object(kv), raw);
    }
    if tool == "websearch" {
        return match sanitize_websearch_query_candidate(raw) {
            Some(query) => json!({ "query": query }),
            None => json!({}),
        };
    }
    Value::String(raw.to_string())
}
/// Post-processes parsed streamed args; only `websearch` needs fixing.
///
/// Ensures a websearch object carries a usable query (backfilling it from
/// the raw text when absent) and converts a bare string into
/// `{"query": ...}`. Other tools, and non-object/non-string values, pass
/// through unchanged.
fn normalize_streamed_tool_args(tool_name: &str, parsed: Value, raw: &str) -> Value {
    if normalize_tool_name(tool_name) != "websearch" {
        return parsed;
    }
    match parsed {
        Value::Object(mut obj) => {
            let needs_query = !has_websearch_query(&obj) && !raw.trim().is_empty();
            if needs_query {
                if let Some(query) = sanitize_websearch_query_candidate(raw) {
                    obj.insert("query".to_string(), Value::String(query));
                }
            }
            Value::Object(obj)
        }
        Value::String(text) => sanitize_websearch_query_candidate(&text)
            .map(|query| json!({ "query": query }))
            .unwrap_or_else(|| json!({})),
        other => other,
    }
}
fn has_websearch_query(obj: &Map<String, Value>) -> bool {
const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
QUERY_KEYS.iter().any(|key| {
obj.get(*key)
.and_then(|v| v.as_str())
.map(|s| !s.trim().is_empty())
.unwrap_or(false)
})
}
fn extract_tool_call_from_value(value: &Value) -> Option<(String, Value)> {
if let Some(obj) = value.as_object() {
if let Some(tool) = obj.get("tool").and_then(|v| v.as_str()) {
return Some((
normalize_tool_name(tool),
obj.get("args").cloned().unwrap_or_else(|| json!({})),
));
}
if let Some(tool) = obj.get("name").and_then(|v| v.as_str()) {
let args = obj
.get("args")
.cloned()
.or_else(|| obj.get("arguments").cloned())
.unwrap_or_else(|| json!({}));
let normalized_tool = normalize_tool_name(tool);
let args = if let Some(raw) = args.as_str() {
parse_streamed_tool_args(&normalized_tool, raw)
} else {
args
};
return Some((normalized_tool, args));
}
for key in [
"tool_call",
"toolCall",
"call",
"function_call",
"functionCall",
] {
if let Some(nested) = obj.get(key) {
if let Some(found) = extract_tool_call_from_value(nested) {
return Some(found);
}
}
}
if let Some(calls) = obj.get("tool_calls").and_then(|v| v.as_array()) {
for call in calls {
if let Some(found) = extract_tool_call_from_value(call) {
return Some(found);
}
}
}
}
if let Some(items) = value.as_array() {
for item in items {
if let Some(found) = extract_tool_call_from_value(item) {
return Some(found);
}
}
}
None
}
/// Returns the first balanced `{ ... }` block found in `input`, if any.
///
/// Brace depth is now tracked only outside double-quoted strings (with `\`
/// escape handling), so braces embedded in JSON string values — e.g.
/// `{"a": "}"}` — no longer truncate or corrupt the extracted block. Quote
/// tracking begins only once a candidate object has started, so unbalanced
/// quotes in leading prose cannot derail matching.
fn extract_first_json_object(input: &str) -> Option<String> {
    let mut start = None;
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;
    for (idx, ch) in input.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        if in_string {
            match ch {
                '\\' => escaped = true,
                '"' => in_string = false,
                _ => {}
            }
            continue;
        }
        match ch {
            // Only honor quotes after the object begins (see doc comment).
            '"' if start.is_some() => in_string = true,
            '{' => {
                if start.is_none() {
                    start = Some(idx);
                }
                depth += 1;
            }
            '}' => {
                if depth == 0 {
                    // Stray closer before any opener: ignore it.
                    continue;
                }
                depth -= 1;
                if depth == 0 {
                    let begin = start?;
                    return Some(input.get(begin..=idx)?.to_string());
                }
            }
            _ => {}
        }
    }
    None
}
/// Harvests todo-like entries from free text (checkbox, bullet, or numbered
/// lines), deduplicating case-insensitively and capping at 25 items.
///
/// Only structurally marked lines count — `- [ ]`/`* [x]` checkboxes,
/// `- `/`* ` bullets, or `1.`/`1)` numbering — so plain prose never becomes
/// a todo. Blank lines, code fences, and `Header:`-style lines are skipped,
/// and entries shorter than 5 or longer than 180 characters are rejected.
fn extract_todo_candidates_from_text(input: &str) -> Vec<Value> {
    const MAX_TODOS: usize = 25;
    let mut dedupe = HashSet::<String>::new();
    let mut todos = Vec::new();
    for raw_line in input.lines() {
        let line = raw_line.trim();
        if line.is_empty() || line.starts_with("```") || line.ends_with(':') {
            continue;
        }
        let Some(body) = strip_list_marker(line) else {
            continue;
        };
        let content = body.trim_matches(|c: char| c.is_whitespace() || c == '-' || c == '*');
        if !(5..=180).contains(&content.len()) {
            continue;
        }
        // `insert` returns false for duplicates (case-insensitive).
        if dedupe.insert(content.to_lowercase()) {
            todos.push(json!({ "content": content }));
            if todos.len() >= MAX_TODOS {
                break;
            }
        }
    }
    return todos;

    // Returns the text after a checkbox, bullet, or `N.`/`N)` marker, or
    // None when the line carries no list marker at all.
    fn strip_list_marker(line: &str) -> Option<&str> {
        for checkbox in ["- [ ]", "* [ ]", "- [x]", "* [x]"] {
            if let Some(rest) = line.strip_prefix(checkbox) {
                return Some(rest.trim());
            }
        }
        for bullet in ["- ", "* "] {
            if let Some(rest) = line.strip_prefix(bullet) {
                return Some(rest.trim());
            }
        }
        let digits = line.bytes().take_while(|b| b.is_ascii_digit()).count();
        let bytes = line.as_bytes();
        if digits > 0 && digits + 1 < bytes.len() && (bytes[digits] == b'.' || bytes[digits] == b')')
        {
            return Some(line[digits + 1..].trim());
        }
        None
    }
}
/// Fallback for plan-style completions that never invoked `todo_write`:
/// derives todos from checklist/numbered lines in `completion`, persists
/// them, and publishes the same tool-invocation/result event sequence a
/// real `todo_write` call would produce, followed by `todo.updated`.
///
/// Does nothing when no todo candidates can be extracted. If persisting
/// fails, the synthetic tool call is published in a "failed" state instead
/// and no `todo.updated` event is emitted.
async fn emit_plan_todo_fallback(
    storage: std::sync::Arc<Storage>,
    bus: &EventBus,
    session_id: &str,
    message_id: &str,
    completion: &str,
) {
    let todos = extract_todo_candidates_from_text(completion);
    if todos.is_empty() {
        return;
    }
    // Synthesize a todo_write invocation part so clients render it like a
    // normal tool call; its id is reused for the matching result part below.
    let invoke_part = WireMessagePart::tool_invocation(
        session_id,
        message_id,
        "todo_write",
        json!({"todos": todos.clone()}),
    );
    let call_id = invoke_part.id.clone();
    bus.publish(EngineEvent::new(
        "message.part.updated",
        json!({"part": invoke_part}),
    ));
    if storage.set_todos(session_id, todos.clone()).await.is_err() {
        // Persisting failed: publish a failed tool result and stop.
        let mut failed_part = WireMessagePart::tool_result(
            session_id,
            message_id,
            "todo_write",
            Some(json!({"todos": todos.clone()})),
            json!(null),
        );
        failed_part.id = call_id;
        failed_part.state = Some("failed".to_string());
        failed_part.error = Some("failed to persist plan todos".to_string());
        bus.publish(EngineEvent::new(
            "message.part.updated",
            json!({"part": failed_part}),
        ));
        return;
    }
    // Re-read so the emitted todos reflect storage-side normalization.
    let normalized = storage.get_todos(session_id).await;
    let mut result_part = WireMessagePart::tool_result(
        session_id,
        message_id,
        "todo_write",
        Some(json!({"todos": todos.clone()})),
        json!({ "todos": normalized }),
    );
    result_part.id = call_id;
    bus.publish(EngineEvent::new(
        "message.part.updated",
        json!({"part": result_part}),
    ));
    bus.publish(EngineEvent::new(
        "todo.updated",
        json!({
            "sessionID": session_id,
            "todos": normalized
        }),
    ));
}
/// Fallback when a planning turn produced no actionable task list: asks the
/// user which tasks to include by publishing a `question.asked` event.
///
/// Option labels are seeded from any todo-like lines found in `completion`
/// (at most 6); when none exist, three generic planning prompts are offered
/// instead. The question request is persisted best-effort — the event is
/// published even if storage rejects it, with a freshly generated id and no
/// tool reference. Does nothing for a blank completion.
async fn emit_plan_question_fallback(
    storage: std::sync::Arc<Storage>,
    bus: &EventBus,
    session_id: &str,
    message_id: &str,
    completion: &str,
) {
    let trimmed = completion.trim();
    if trimmed.is_empty() {
        return;
    }
    // Seed option labels from checklist-like lines in the completion.
    let hints = extract_todo_candidates_from_text(trimmed)
        .into_iter()
        .take(6)
        .filter_map(|v| {
            v.get("content")
                .and_then(|c| c.as_str())
                .map(ToString::to_string)
        })
        .collect::<Vec<_>>();
    let mut options = hints
        .iter()
        .map(|label| json!({"label": label, "description": "Use this as a starting task"}))
        .collect::<Vec<_>>();
    if options.is_empty() {
        // No derivable tasks: fall back to generic planning prompts.
        options = vec![
            json!({"label":"Define scope", "description":"Clarify the intended outcome"}),
            json!({"label":"Provide constraints", "description":"Budget, timeline, and constraints"}),
            json!({"label":"Draft a starter list", "description":"Generate a first-pass task list"}),
        ];
    }
    let question_payload = vec![json!({
        "header":"Planning Input",
        "question":"I couldn't produce a concrete task list yet. Which tasks should I include first?",
        "options": options,
        "multiple": true,
        "custom": true
    })];
    // Best-effort persistence; the event below is published regardless.
    let request = storage
        .add_question_request(session_id, message_id, question_payload.clone())
        .await
        .ok();
    bus.publish(EngineEvent::new(
        "question.asked",
        json!({
            "id": request
                .as_ref()
                .map(|req| req.id.clone())
                .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
            "sessionID": session_id,
            "messageID": message_id,
            "questions": question_payload,
            "tool": request.and_then(|req| {
                req.tool.map(|tool| {
                    json!({
                        "callID": tool.call_id,
                        "messageID": tool.message_id
                    })
                })
            })
        }),
    ));
}
/// Controls how aggressively chat history is compacted before a provider
/// call (budgets are applied by `compact_chat_history`).
#[derive(Debug, Clone, Copy)]
enum ChatHistoryProfile {
    // No limits: every message and all content is kept.
    Full,
    // Up to 40 recent messages / 80k content chars.
    Standard,
    // Up to 12 recent messages / 12k content chars, for tight contexts.
    Compact,
}
/// Converts a stored session's messages into provider `ChatMessage`s.
///
/// Text and reasoning parts are used verbatim; tool invocations are
/// rendered through `summarize_tool_invocation_for_history`. Parts within a
/// message are newline-joined, and the result is compacted per `profile`.
/// An unknown session yields an empty history.
async fn load_chat_history(
    storage: std::sync::Arc<Storage>,
    session_id: &str,
    profile: ChatHistoryProfile,
) -> Vec<ChatMessage> {
    let Some(session) = storage.get_session(session_id).await else {
        return Vec::new();
    };
    let mut history = Vec::new();
    for message in session.messages {
        // Debug-format of MessageRole, lowercased (e.g. User -> "user").
        let role = format!("{:?}", message.role).to_lowercase();
        let mut lines = Vec::new();
        for part in message.parts {
            let rendered = match part {
                MessagePart::Text { text } | MessagePart::Reasoning { text } => text,
                MessagePart::ToolInvocation {
                    tool,
                    args,
                    result,
                    error,
                } => summarize_tool_invocation_for_history(
                    &tool,
                    &args,
                    result.as_ref(),
                    error.as_deref(),
                ),
            };
            lines.push(rendered);
        }
        history.push(ChatMessage {
            role,
            content: lines.join("\n"),
            attachments: Vec::new(),
        });
    }
    compact_chat_history(history, profile)
}
/// Renders a tool invocation as a single history line:
/// `Tool <name> [args=...] [error=...] [result=...]`.
///
/// Null/empty-object/blank-string args, blank errors, and null results are
/// omitted; if nothing else remains, a placeholder `result={}` keeps the
/// line informative.
fn summarize_tool_invocation_for_history(
    tool: &str,
    args: &Value,
    result: Option<&Value>,
    error: Option<&str>,
) -> String {
    let mut segments = vec![format!("Tool {tool}")];
    let args_empty_object = args.as_object().is_some_and(|map| map.is_empty());
    let args_blank_string = args.as_str().is_some_and(|s| s.trim().is_empty());
    if !(args.is_null() || args_empty_object || args_blank_string) {
        segments.push(format!("args={args}"));
    }
    if let Some(error_text) = error.map(str::trim).filter(|t| !t.is_empty()) {
        segments.push(format!("error={error_text}"));
    }
    if let Some(result_value) = result.filter(|v| !v.is_null()) {
        segments.push(format!("result={result_value}"));
    }
    if segments.len() == 1 {
        segments.push("result={}".to_string());
    }
    segments.join(" ")
}
/// Copies `attachments` onto the most recent user message, if one exists.
/// No-op when the attachment list is empty.
fn attach_to_last_user_message(messages: &mut [ChatMessage], attachments: &[ChatAttachment]) {
    if attachments.is_empty() {
        return;
    }
    let last_user = messages.iter_mut().rfind(|m| m.role == "user");
    if let Some(message) = last_user {
        message.attachments = attachments.to_vec();
    }
}
/// Builds image attachments from message file parts for providers that
/// accept them; non-image files and unsupported providers yield nothing.
///
/// Local files are inlined as data URLs by `normalize_attachment_source_url`.
async fn build_runtime_attachments(
    provider_id: &str,
    parts: &[MessagePartInput],
) -> Vec<ChatAttachment> {
    let mut attachments = Vec::new();
    if !supports_image_attachments(provider_id) {
        return attachments;
    }
    for part in parts {
        let MessagePartInput::File { mime, url, .. } = part else {
            continue;
        };
        let is_image = mime.to_ascii_lowercase().starts_with("image/");
        if !is_image {
            continue;
        }
        if let Some(source_url) = normalize_attachment_source_url(url, mime).await {
            attachments.push(ChatAttachment::ImageUrl { url: source_url });
        }
    }
    attachments
}
/// Whether this provider's chat API accepts image attachments.
fn supports_image_attachments(provider_id: &str) -> bool {
    const IMAGE_CAPABLE: [&str; 10] = [
        "openai",
        "openrouter",
        "ollama",
        "groq",
        "mistral",
        "together",
        "azure",
        "bedrock",
        "vertex",
        "copilot",
    ];
    IMAGE_CAPABLE.contains(&provider_id)
}
/// Resolves an attachment URL into something a provider can consume.
///
/// http(s) and data URLs pass through untouched; anything else is treated
/// as a local path (with an optional `file://` prefix), read from disk, and
/// inlined as a base64 data URL of the given `mime`. Files that are
/// missing, unreadable, or larger than the configured cap yield `None`.
async fn normalize_attachment_source_url(url: &str, mime: &str) -> Option<String> {
    let source = url.trim();
    if source.is_empty() {
        return None;
    }
    let passthrough = ["http://", "https://", "data:"]
        .iter()
        .any(|prefix| source.starts_with(prefix));
    if passthrough {
        return Some(source.to_string());
    }
    let path = match source.strip_prefix("file://") {
        Some(stripped) => PathBuf::from(stripped),
        None => PathBuf::from(source),
    };
    if !path.exists() {
        return None;
    }
    // Size cap is env-tunable; default 20 MiB.
    let max_bytes = std::env::var("TANDEM_CHANNEL_MAX_ATTACHMENT_BYTES")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        .unwrap_or(20 * 1024 * 1024);
    let bytes = match tokio::fs::read(&path).await {
        Ok(bytes) => bytes,
        Err(err) => {
            tracing::warn!(
                "failed reading local attachment '{}': {}",
                path.to_string_lossy(),
                err
            );
            return None;
        }
    };
    if bytes.len() > max_bytes {
        tracing::warn!(
            "local attachment '{}' exceeds max bytes ({} > {})",
            path.to_string_lossy(),
            bytes.len(),
            max_bytes
        );
        return None;
    }
    use base64::Engine as _;
    let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
    Some(format!("data:{mime};base64,{encoded}"))
}
/// Borrowed inputs for `emit_tool_side_events`: identifies a finished tool
/// call and carries workspace info echoed into the emitted events.
struct ToolSideEventContext<'a> {
    session_id: &'a str,
    message_id: &'a str,
    // Normalized tool name (e.g. "todo_write", "question").
    tool: &'a str,
    // Arguments the tool was invoked with.
    args: &'a serde_json::Value,
    // Tool-result metadata; may carry "todos", "questions", or "events".
    metadata: &'a serde_json::Value,
    workspace_root: Option<&'a str>,
    effective_cwd: Option<&'a str>,
}
/// Publishes follow-up engine events for a finished tool call.
///
/// Side effects by tool:
/// - `todo_write`: persists todos from result metadata — or, when the
///   result carried none, applies status updates from the call args to the
///   stored list — then publishes `todo.updated`.
/// - `question`: persists the question request (best-effort) and publishes
///   `question.asked`; an empty questions payload is logged and skipped.
///
/// Independently of the tool, any `agent_team.*` entries under
/// `metadata.events` are re-published with session/message/workspace fields
/// backfilled (existing properties are never overwritten).
async fn emit_tool_side_events(
    storage: std::sync::Arc<Storage>,
    bus: &EventBus,
    ctx: ToolSideEventContext<'_>,
) {
    let ToolSideEventContext {
        session_id,
        message_id,
        tool,
        args,
        metadata,
        workspace_root,
        effective_cwd,
    } = ctx;
    if tool == "todo_write" {
        let todos_from_metadata = metadata
            .get("todos")
            .and_then(|v| v.as_array())
            .cloned()
            .unwrap_or_default();
        if !todos_from_metadata.is_empty() {
            // Full list in the result: replace stored todos wholesale.
            let _ = storage.set_todos(session_id, todos_from_metadata).await;
        } else {
            // No list returned: try to apply status updates from the args.
            let current = storage.get_todos(session_id).await;
            if let Some(updated) = apply_todo_updates_from_args(current, args) {
                let _ = storage.set_todos(session_id, updated).await;
            }
        }
        // Re-read so subscribers see the storage-normalized todos.
        let normalized = storage.get_todos(session_id).await;
        bus.publish(EngineEvent::new(
            "todo.updated",
            json!({
                "sessionID": session_id,
                "todos": normalized,
                "workspaceRoot": workspace_root,
                "effectiveCwd": effective_cwd
            }),
        ));
    }
    if tool == "question" {
        let questions = metadata
            .get("questions")
            .and_then(|v| v.as_array())
            .cloned()
            .unwrap_or_default();
        if questions.is_empty() {
            tracing::warn!(
                "question tool produced empty questions payload; skipping question.asked event session_id={} message_id={}",
                session_id,
                message_id
            );
        } else {
            // Persist best-effort; the event is published even on failure,
            // with a freshly generated id and no tool reference.
            let request = storage
                .add_question_request(session_id, message_id, questions.clone())
                .await
                .ok();
            bus.publish(EngineEvent::new(
                "question.asked",
                json!({
                    "id": request
                        .as_ref()
                        .map(|req| req.id.clone())
                        .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
                    "sessionID": session_id,
                    "messageID": message_id,
                    "questions": questions,
                    "tool": request.and_then(|req| {
                        req.tool.map(|tool| {
                            json!({
                                "callID": tool.call_id,
                                "messageID": tool.message_id
                            })
                        })
                    }),
                    "workspaceRoot": workspace_root,
                    "effectiveCwd": effective_cwd
                }),
            ));
        }
    }
    // Re-publish agent_team.* events surfaced by the tool, backfilling
    // identifying fields without clobbering ones the tool already set.
    if let Some(events) = metadata.get("events").and_then(|v| v.as_array()) {
        for event in events {
            let Some(event_type) = event.get("type").and_then(|v| v.as_str()) else {
                continue;
            };
            if !event_type.starts_with("agent_team.") {
                continue;
            }
            let mut properties = event
                .get("properties")
                .and_then(|v| v.as_object())
                .cloned()
                .unwrap_or_default();
            properties
                .entry("sessionID".to_string())
                .or_insert(json!(session_id));
            properties
                .entry("messageID".to_string())
                .or_insert(json!(message_id));
            properties
                .entry("workspaceRoot".to_string())
                .or_insert(json!(workspace_root));
            properties
                .entry("effectiveCwd".to_string())
                .or_insert(json!(effective_cwd));
            bus.publish(EngineEvent::new(event_type, Value::Object(properties)));
        }
    }
}
fn apply_todo_updates_from_args(current: Vec<Value>, args: &Value) -> Option<Vec<Value>> {
let obj = args.as_object()?;
let mut todos = current;
let mut changed = false;
if let Some(items) = obj.get("todos").and_then(|v| v.as_array()) {
for item in items {
let Some(item_obj) = item.as_object() else {
continue;
};
let status = item_obj
.get("status")
.and_then(|v| v.as_str())
.map(normalize_todo_status);
let target = item_obj
.get("task_id")
.or_else(|| item_obj.get("todo_id"))
.or_else(|| item_obj.get("id"));
if let (Some(status), Some(target)) = (status, target) {
changed |= apply_single_todo_status_update(&mut todos, target, &status);
}
}
}
let status = obj
.get("status")
.and_then(|v| v.as_str())
.map(normalize_todo_status);
let target = obj
.get("task_id")
.or_else(|| obj.get("todo_id"))
.or_else(|| obj.get("id"));
if let (Some(status), Some(target)) = (status, target) {
changed |= apply_single_todo_status_update(&mut todos, target, &status);
}
if changed {
Some(todos)
} else {
None
}
}
/// Sets `status` on one todo, addressed by 1-based position or by id.
///
/// Numeric targets — and strings that parse as a number or end in a digit
/// run (e.g. "task-2") — are treated as 1-based indexes. Failing that,
/// string targets are matched against each todo's `id` field. Returns true
/// when a todo object was actually updated.
fn apply_single_todo_status_update(todos: &mut [Value], target: &Value, status: &str) -> bool {
    let position = match target {
        Value::Number(n) => n.as_u64().map(|v| v.saturating_sub(1) as usize),
        Value::String(s) => {
            let text = s.trim();
            text.parse::<usize>()
                .ok()
                .map(|v| v.saturating_sub(1))
                .or_else(|| {
                    // Fall back to the trailing run of ASCII digits.
                    let prefix = text.trim_end_matches(|c: char| c.is_ascii_digit());
                    let digits = &text[prefix.len()..];
                    digits.parse::<usize>().ok().map(|v| v.saturating_sub(1))
                })
        }
        _ => None,
    };
    if let Some(idx) = position {
        if let Some(obj) = todos.get_mut(idx).and_then(Value::as_object_mut) {
            obj.insert("status".to_string(), Value::String(status.to_string()));
            return true;
        }
    }
    let wanted_id = target.as_str().map(str::trim).filter(|s| !s.is_empty());
    if let Some(wanted_id) = wanted_id {
        for todo in todos.iter_mut() {
            let Some(obj) = todo.as_object_mut() else {
                continue;
            };
            if obj.get("id").and_then(Value::as_str) == Some(wanted_id) {
                obj.insert("status".to_string(), Value::String(status.to_string()));
                return true;
            }
        }
    }
    false
}
/// Canonicalizes todo status synonyms to `pending` / `in_progress` /
/// `completed` / `cancelled`; unrecognized values pass through trimmed and
/// lowercased.
fn normalize_todo_status(raw: &str) -> String {
    let lowered = raw.trim().to_lowercase();
    let canonical = match lowered.as_str() {
        "in_progress" | "inprogress" | "running" | "working" => "in_progress",
        "done" | "complete" | "completed" => "completed",
        "cancelled" | "canceled" | "aborted" | "skipped" => "cancelled",
        "open" | "todo" | "pending" => "pending",
        other => other,
    };
    canonical.to_string()
}
/// Trims a chat history to fit the profile's message-count and character
/// budgets, dropping the OLDEST messages first.
///
/// When anything is dropped, a synthetic system message noting how many
/// messages were omitted is prepended. Budgets per profile: `Full` =
/// unlimited, `Standard` = 40 messages / 80k chars, `Compact` = 12
/// messages / 12k chars.
fn compact_chat_history(
    messages: Vec<ChatMessage>,
    profile: ChatHistoryProfile,
) -> Vec<ChatMessage> {
    let (max_context_chars, keep_recent_messages) = match profile {
        ChatHistoryProfile::Full => (usize::MAX, usize::MAX),
        ChatHistoryProfile::Standard => (80_000usize, 40usize),
        ChatHistoryProfile::Compact => (12_000usize, 12usize),
    };
    let mut total_chars = messages.iter().map(|m| m.content.len()).sum::<usize>();
    if messages.len() <= keep_recent_messages && total_chars <= max_context_chars {
        return messages;
    }
    // Count how many leading (oldest) messages must go, then remove them in
    // one pass: the previous per-message `remove(0)` loop was O(n^2).
    let mut kept = messages;
    let mut dropped_count = 0usize;
    while dropped_count < kept.len()
        && (kept.len() - dropped_count > keep_recent_messages || total_chars > max_context_chars)
    {
        total_chars = total_chars.saturating_sub(kept[dropped_count].content.len());
        dropped_count += 1;
    }
    kept.drain(..dropped_count);
    if dropped_count > 0 {
        kept.insert(
            0,
            ChatMessage {
                role: "system".to_string(),
                content: format!(
                    "[history compacted: omitted {} older messages to fit context window]",
                    dropped_count
                ),
                attachments: Vec::new(),
            },
        );
    }
    kept
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{EventBus, Storage};
use std::sync::{Mutex, OnceLock};
use tandem_types::Session;
use uuid::Uuid;
fn env_test_lock() -> std::sync::MutexGuard<'static, ()> {
static ENV_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
ENV_TEST_LOCK
.get_or_init(|| Mutex::new(()))
.lock()
.expect("env test lock")
}
#[tokio::test]
async fn todo_updated_event_is_normalized() {
let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
let session_id = session.id.clone();
storage.save_session(session).await.expect("save session");
let bus = EventBus::new();
let mut rx = bus.subscribe();
emit_tool_side_events(
storage.clone(),
&bus,
ToolSideEventContext {
session_id: &session_id,
message_id: "m1",
tool: "todo_write",
args: &json!({"todos":[{"content":"ship parity"}]}),
metadata: &json!({"todos":[{"content":"ship parity"}]}),
workspace_root: Some("."),
effective_cwd: Some("."),
},
)
.await;
let event = rx.recv().await.expect("event");
assert_eq!(event.event_type, "todo.updated");
let todos = event
.properties
.get("todos")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
assert_eq!(todos.len(), 1);
assert!(todos[0].get("id").and_then(|v| v.as_str()).is_some());
assert_eq!(
todos[0].get("content").and_then(|v| v.as_str()),
Some("ship parity")
);
assert!(todos[0].get("status").and_then(|v| v.as_str()).is_some());
}
#[tokio::test]
async fn question_asked_event_contains_tool_reference() {
let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
let session_id = session.id.clone();
storage.save_session(session).await.expect("save session");
let bus = EventBus::new();
let mut rx = bus.subscribe();
emit_tool_side_events(
storage,
&bus,
ToolSideEventContext {
session_id: &session_id,
message_id: "msg-1",
tool: "question",
args: &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
metadata: &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
workspace_root: Some("."),
effective_cwd: Some("."),
},
)
.await;
let event = rx.recv().await.expect("event");
assert_eq!(event.event_type, "question.asked");
assert_eq!(
event
.properties
.get("sessionID")
.and_then(|v| v.as_str())
.unwrap_or(""),
session_id
);
let tool = event
.properties
.get("tool")
.cloned()
.unwrap_or_else(|| json!({}));
assert!(tool.get("callID").and_then(|v| v.as_str()).is_some());
assert_eq!(
tool.get("messageID").and_then(|v| v.as_str()),
Some("msg-1")
);
}
#[test]
fn compact_chat_history_keeps_recent_and_inserts_summary() {
let mut messages = Vec::new();
for i in 0..60 {
messages.push(ChatMessage {
role: "user".to_string(),
content: format!("message-{i}"),
attachments: Vec::new(),
});
}
let compacted = compact_chat_history(messages, ChatHistoryProfile::Standard);
assert!(compacted.len() <= 41);
assert_eq!(compacted[0].role, "system");
assert!(compacted[0].content.contains("history compacted"));
assert!(compacted.iter().any(|m| m.content.contains("message-59")));
}
#[tokio::test]
async fn load_chat_history_preserves_tool_args_and_error_context() {
let base = std::env::temp_dir().join(format!(
"tandem-core-load-chat-history-error-{}",
uuid::Uuid::new_v4()
));
let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
let session = Session::new(Some("chat history".to_string()), Some(".".to_string()));
let session_id = session.id.clone();
storage.save_session(session).await.expect("save session");
let message = Message::new(
MessageRole::User,
vec![
MessagePart::Text {
text: "build the page".to_string(),
},
MessagePart::ToolInvocation {
tool: "write".to_string(),
args: json!({"path":"game.html","content":"<html>draft</html>"}),
result: None,
error: Some("WRITE_ARGS_EMPTY_FROM_PROVIDER".to_string()),
},
],
);
storage
.append_message(&session_id, message)
.await
.expect("append message");
let history = load_chat_history(storage, &session_id, ChatHistoryProfile::Standard).await;
let content = history
.iter()
.find(|message| message.role == "user")
.map(|message| message.content.clone())
.unwrap_or_default();
assert!(content.contains("build the page"));
assert!(content.contains("Tool write"));
assert!(content.contains(r#"args={"content":"<html>draft</html>","path":"game.html"}"#));
assert!(content.contains("error=WRITE_ARGS_EMPTY_FROM_PROVIDER"));
}
#[tokio::test]
async fn load_chat_history_preserves_tool_args_and_result_context() {
let base = std::env::temp_dir().join(format!(
"tandem-core-load-chat-history-result-{}",
uuid::Uuid::new_v4()
));
let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
let session = Session::new(Some("chat history".to_string()), Some(".".to_string()));
let session_id = session.id.clone();
storage.save_session(session).await.expect("save session");
let message = Message::new(
MessageRole::Assistant,
vec![MessagePart::ToolInvocation {
tool: "glob".to_string(),
args: json!({"pattern":"src/**/*.rs"}),
result: Some(json!({"output":"src/lib.rs\nsrc/main.rs"})),
error: None,
}],
);
storage
.append_message(&session_id, message)
.await
.expect("append message");
let history = load_chat_history(storage, &session_id, ChatHistoryProfile::Standard).await;
let content = history
.iter()
.find(|message| message.role == "assistant")
.map(|message| message.content.clone())
.unwrap_or_default();
assert!(content.contains("Tool glob"));
assert!(content.contains(r#"args={"pattern":"src/**/*.rs"}"#));
assert!(content.contains(r#"result={"output":"src/lib.rs\nsrc/main.rs"}"#));
}
#[test]
fn extracts_todos_from_checklist_and_numbered_lines() {
let input = r#"
Plan:
- [ ] Audit current implementation
- [ ] Add planner fallback
1. Add regression test coverage
"#;
let todos = extract_todo_candidates_from_text(input);
assert_eq!(todos.len(), 3);
assert_eq!(
todos[0].get("content").and_then(|v| v.as_str()),
Some("Audit current implementation")
);
}
#[test]
fn does_not_extract_todos_from_plain_prose_lines() {
let input = r#"
I need more information to proceed.
Can you tell me the event size and budget?
Once I have that, I can provide a detailed plan.
"#;
let todos = extract_todo_candidates_from_text(input);
assert!(todos.is_empty());
}
// A tool_call JSON object wrapped in a markdown code fence should still be parsed.
#[test]
fn parses_wrapped_tool_call_from_markdown_response() {
let input = r#"
Here is the tool call:
```json
{"tool_call":{"name":"todo_write","arguments":{"todos":[{"content":"a"}]}}}
```
"#;
let parsed = parse_tool_invocation_from_response(input).expect("tool call");
assert_eq!(parsed.0, "todo_write");
assert!(parsed.1.get("todos").is_some());
}
// A bare top-level {"name":..., "args":...} object is also a valid invocation shape.
#[test]
fn parses_top_level_name_args_tool_call() {
let input = r#"{"name":"bash","args":{"command":"echo hi"}}"#;
let parsed = parse_tool_invocation_from_response(input).expect("top-level tool call");
assert_eq!(parsed.0, "bash");
assert_eq!(
parsed.1.get("command").and_then(|v| v.as_str()),
Some("echo hi")
);
}
// Function-call syntax like `todowrite(task_id=2, ...)` is recognized and the
// tool name is canonicalized to "todo_write"; kwargs become typed JSON values.
#[test]
fn parses_function_style_todowrite_call() {
let input = r#"Status: Completed
Call: todowrite(task_id=2, status="completed")"#;
let parsed = parse_tool_invocation_from_response(input).expect("function-style tool call");
assert_eq!(parsed.0, "todo_write");
assert_eq!(parsed.1.get("task_id").and_then(|v| v.as_i64()), Some(2));
assert_eq!(
parsed.1.get("status").and_then(|v| v.as_str()),
Some("completed")
);
}
// Multiple function-style calls in one response are all extracted, in order.
#[test]
fn parses_multiple_function_style_todowrite_calls() {
let input = r#"
Call: todowrite(task_id=2, status="completed")
Call: todowrite(task_id=3, status="in_progress")
"#;
let parsed = parse_tool_invocations_from_response(input);
assert_eq!(parsed.len(), 2);
assert_eq!(parsed[0].0, "todo_write");
assert_eq!(parsed[0].1.get("task_id").and_then(|v| v.as_i64()), Some(2));
assert_eq!(
parsed[0].1.get("status").and_then(|v| v.as_str()),
Some("completed")
);
assert_eq!(parsed[1].1.get("task_id").and_then(|v| v.as_i64()), Some(3));
assert_eq!(
parsed[1].1.get("status").and_then(|v| v.as_str()),
Some("in_progress")
);
}
// A {"task_id": N, "status": ...} payload updates the Nth (1-based) todo in place.
#[test]
fn applies_todo_status_update_from_task_id_args() {
let current = vec![
json!({"id":"todo-1","content":"a","status":"pending"}),
json!({"id":"todo-2","content":"b","status":"pending"}),
json!({"id":"todo-3","content":"c","status":"pending"}),
];
let updated =
apply_todo_updates_from_args(current, &json!({"task_id":2, "status":"completed"}))
.expect("status update");
assert_eq!(
updated[1].get("status").and_then(|v| v.as_str()),
Some("completed")
);
}
// "tasks" with "title"/"name" entries is accepted as an alias for "todos" with "content".
#[test]
fn normalizes_todo_write_tasks_alias() {
let normalized = normalize_todo_write_args(
json!({"tasks":[{"title":"Book venue"},{"name":"Send invites"}]}),
"",
);
let todos = normalized
.get("todos")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
assert_eq!(todos.len(), 2);
assert_eq!(
todos[0].get("content").and_then(|v| v.as_str()),
Some("Book venue")
);
assert_eq!(
todos[1].get("content").and_then(|v| v.as_str()),
Some("Send invites")
);
}
// When args are empty, todos are reconstructed from the numbered plan in the completion text.
#[test]
fn normalizes_todo_write_from_completion_when_args_empty() {
let completion = "Plan:\n1. Secure venue\n2. Create playlist\n3. Send invites";
let normalized = normalize_todo_write_args(json!({}), completion);
let todos = normalized
.get("todos")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
assert_eq!(todos.len(), 3);
assert!(!is_empty_todo_write_args(&normalized));
}
// A task_id/status-only payload is a valid status update, not "empty" args.
#[test]
fn empty_todo_write_args_allows_status_updates() {
let args = json!({"task_id": 2, "status":"completed"});
assert!(!is_empty_todo_write_args(&args));
}
// A bare (non-JSON) streamed string becomes the websearch "query" argument.
#[test]
fn streamed_websearch_args_fallback_to_query_string() {
let parsed = parse_streamed_tool_args("websearch", "meaning of life");
assert_eq!(
parsed.get("query").and_then(|v| v.as_str()),
Some("meaning of life")
);
}
// Single stray quote characters are treated as literal strings, not parse errors.
#[test]
fn parse_scalar_like_value_handles_single_quote_character_without_panicking() {
assert_eq!(
parse_scalar_like_value("\""),
Value::String("\"".to_string())
);
assert_eq!(parse_scalar_like_value("'"), Value::String("'".to_string()));
}
// A doubly-encoded JSON string (a quoted string literal) is unwrapped into the query.
#[test]
fn streamed_websearch_stringified_json_args_are_unwrapped() {
let parsed = parse_streamed_tool_args("websearch", r#""donkey gestation period""#);
assert_eq!(
parsed.get("query").and_then(|v| v.as_str()),
Some("donkey gestation period")
);
}
// Provider-specific <arg_key>/<arg_value> markup wrappers are stripped from the payload.
#[test]
fn streamed_websearch_args_strip_arg_key_value_wrappers() {
let parsed = parse_streamed_tool_args(
"websearch",
"query</arg_key><arg_value>taj card what is it benefits how to apply</arg_value>",
);
assert_eq!(
parsed.get("query").and_then(|v| v.as_str()),
Some("taj card what is it benefits how to apply")
);
}
// Empty websearch args are recovered from a "web search ..." user prompt; the
// recovery is reflected in args_source/args_integrity metadata.
#[test]
fn normalize_tool_args_websearch_infers_from_user_text() {
let normalized =
normalize_tool_args("websearch", json!({}), "web search meaning of life", "");
assert_eq!(
normalized.args.get("query").and_then(|v| v.as_str()),
Some("meaning of life")
);
assert_eq!(normalized.args_source, "inferred_from_user");
assert_eq!(normalized.args_integrity, "recovered");
}
// A provider-supplied query is never overridden by the user prompt.
#[test]
fn normalize_tool_args_websearch_keeps_existing_query() {
let normalized = normalize_tool_args(
"websearch",
json!({"query":"already set"}),
"web search should not override",
"",
);
assert_eq!(
normalized.args.get("query").and_then(|v| v.as_str()),
Some("already set")
);
assert_eq!(normalized.args_source, "provider_json");
assert_eq!(normalized.args_integrity, "ok");
}
// With no recoverable query, normalization flags a terminal missing-args state.
#[test]
fn normalize_tool_args_websearch_fails_when_unrecoverable() {
let normalized = normalize_tool_args("websearch", json!({}), "search", "");
assert!(normalized.query.is_none());
assert!(normalized.missing_terminal);
assert_eq!(normalized.args_source, "missing");
assert_eq!(normalized.args_integrity, "empty");
}
// A backticked URL in the user prompt recovers the missing webfetch "url" arg.
#[test]
fn normalize_tool_args_webfetch_infers_url_from_user_prompt() {
let normalized = normalize_tool_args(
"webfetch",
json!({}),
"Please fetch `https://tandem.frumu.ai/docs/` in markdown mode",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("url").and_then(|v| v.as_str()),
Some("https://tandem.frumu.ai/docs/")
);
assert_eq!(normalized.args_source, "inferred_from_user");
assert_eq!(normalized.args_integrity, "recovered");
}
// A nested {"args":{"uri":...}} alias is lifted to the canonical top-level "url".
#[test]
fn normalize_tool_args_webfetch_recovers_nested_url_alias() {
let normalized = normalize_tool_args(
"webfetch",
json!({"args":{"uri":"https://example.com/page"}}),
"",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("url").and_then(|v| v.as_str()),
Some("https://example.com/page")
);
assert_eq!(normalized.args_source, "provider_json");
}
// No URL anywhere yields the WEBFETCH_URL_MISSING terminal reason.
#[test]
fn normalize_tool_args_webfetch_fails_when_url_unrecoverable() {
let normalized = normalize_tool_args("webfetch", json!({}), "fetch the site", "");
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("WEBFETCH_URL_MISSING")
);
}
// With empty args, the whole user prompt becomes the pack_builder "goal" and
// the safe default mode "preview" is applied.
#[test]
fn normalize_tool_args_pack_builder_infers_goal_from_user_prompt() {
let user_text =
"Create a pack that checks latest headline news every day at 8 AM and emails me.";
let normalized = normalize_tool_args("pack_builder", json!({}), user_text, "");
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("goal").and_then(|v| v.as_str()),
Some(user_text)
);
assert_eq!(
normalized.args.get("mode").and_then(|v| v.as_str()),
Some("preview")
);
assert_eq!(normalized.args_source, "inferred_from_user");
assert_eq!(normalized.args_integrity, "recovered");
}
// Provider-supplied goal/mode take precedence over the user prompt.
#[test]
fn normalize_tool_args_pack_builder_keeps_existing_goal_and_mode() {
let normalized = normalize_tool_args(
"pack_builder",
json!({"mode":"apply","goal":"existing goal","plan_id":"plan-1"}),
"new goal should not override",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("goal").and_then(|v| v.as_str()),
Some("existing goal")
);
assert_eq!(
normalized.args.get("mode").and_then(|v| v.as_str()),
Some("apply")
);
assert_eq!(normalized.args_source, "provider_json");
assert_eq!(normalized.args_integrity, "ok");
}
// A bare "confirm" reply recovers the plan id from the assistant's preview
// context, switches to apply mode, and auto-approves the pack install.
#[test]
fn normalize_tool_args_pack_builder_confirm_reuses_plan_from_context() {
let assistant_context =
"Pack Builder Preview\n- Plan ID: plan-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
let normalized =
normalize_tool_args("pack_builder", json!({}), "confirm", assistant_context);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("mode").and_then(|v| v.as_str()),
Some("apply")
);
assert_eq!(
normalized.args.get("plan_id").and_then(|v| v.as_str()),
Some("plan-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")
);
assert_eq!(
normalized
.args
.get("approve_pack_install")
.and_then(|v| v.as_bool()),
Some(true)
);
assert_eq!(normalized.args_source, "recovered_from_context");
}
// An apply call missing its plan_id recovers it from a JSON blob in context.
#[test]
fn normalize_tool_args_pack_builder_apply_recovers_missing_plan_id() {
let assistant_context =
"{\"mode\":\"preview\",\"plan_id\":\"plan-11111111-2222-3333-4444-555555555555\"}";
let normalized = normalize_tool_args(
"pack_builder",
json!({"mode":"apply"}),
"yes",
assistant_context,
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("mode").and_then(|v| v.as_str()),
Some("apply")
);
assert_eq!(
normalized.args.get("plan_id").and_then(|v| v.as_str()),
Some("plan-11111111-2222-3333-4444-555555555555")
);
}
// A short but substantive new goal is treated as a fresh preview request, not
// as confirmation of the previously previewed plan.
#[test]
fn normalize_tool_args_pack_builder_short_new_goal_does_not_force_apply() {
let assistant_context =
"Pack Builder Preview\n- Plan ID: plan-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
let normalized = normalize_tool_args(
"pack_builder",
json!({}),
"create jira sync",
assistant_context,
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("mode").and_then(|v| v.as_str()),
Some("preview")
);
assert_eq!(
normalized.args.get("goal").and_then(|v| v.as_str()),
Some("create jira sync")
);
}
// A write with no path at all is a terminal failure (FILE_PATH_MISSING).
#[test]
fn normalize_tool_args_write_requires_path() {
let normalized = normalize_tool_args("write", json!({}), "", "");
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("FILE_PATH_MISSING")
);
}
// When the raw provider args are empty, the normalized args are persisted instead.
#[test]
fn persisted_failed_tool_args_prefers_normalized_when_raw_is_empty() {
let args = persisted_failed_tool_args(
&json!({}),
&json!({"path":"game.html","content":"<html></html>"}),
);
assert_eq!(args["path"], "game.html");
assert_eq!(args["content"], "<html></html>");
}
// A non-empty raw payload is kept verbatim for diagnostics, even if unparseable.
#[test]
fn persisted_failed_tool_args_keeps_non_empty_raw_payload() {
let args = persisted_failed_tool_args(
&json!("path=game.html content"),
&json!({"path":"game.html"}),
);
assert_eq!(args, json!("path=game.html content"));
}
// The "filePath" alias is canonicalized to "path"; content passes through.
#[test]
fn normalize_tool_args_write_recovers_alias_path_key() {
let normalized = normalize_tool_args(
"write",
json!({"filePath":"docs/CONCEPT.md","content":"hello"}),
"",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("path").and_then(|v| v.as_str()),
Some("docs/CONCEPT.md")
);
assert_eq!(
normalized.args.get("content").and_then(|v| v.as_str()),
Some("hello")
);
}
// In OutputTargetOnly mode, a missing write path may be recovered from the
// "Required output target" JSON block embedded in the prompt.
#[test]
fn normalize_tool_args_write_recovers_html_output_target_path() {
let normalized = normalize_tool_args_with_mode(
"write",
json!({"content":"<html></html>"}),
"Execute task.\n\nRequired output target:\n{\n  \"path\": \"game.html\",\n  \"kind\": \"source\",\n  \"operation\": \"create_or_update\"\n}\n",
"",
WritePathRecoveryMode::OutputTargetOnly,
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("path").and_then(|v| v.as_str()),
Some("game.html")
);
}
// A backticked filename in the user prompt recovers the missing read path.
#[test]
fn normalize_tool_args_read_infers_path_from_user_prompt() {
let normalized = normalize_tool_args(
"read",
json!({}),
"Please inspect `FEATURE_LIST.md` and summarize key sections.",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("path").and_then(|v| v.as_str()),
Some("FEATURE_LIST.md")
);
assert_eq!(normalized.args_source, "inferred_from_user");
assert_eq!(normalized.args_integrity, "recovered");
}
// Path inference for read deliberately ignores assistant context: only the
// user's own text is trusted as a source of paths.
#[test]
fn normalize_tool_args_read_does_not_infer_path_from_assistant_context() {
let normalized = normalize_tool_args(
"read",
json!({}),
"generic instruction",
"I will read src-tauri/src/orchestrator/engine.rs first.",
);
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("FILE_PATH_MISSING")
);
}
// A "file_path" alias nested inside an "args" array is still recovered.
#[test]
fn normalize_tool_args_write_recovers_path_from_nested_array_payload() {
let normalized = normalize_tool_args(
"write",
json!({"args":[{"file_path":"docs/CONCEPT.md"}],"content":"hello"}),
"",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("path").and_then(|v| v.as_str()),
Some("docs/CONCEPT.md")
);
}
// The "body" alias is canonicalized to "content".
#[test]
fn normalize_tool_args_write_recovers_content_alias() {
let normalized = normalize_tool_args(
"write",
json!({"path":"docs/FEATURES.md","body":"feature notes"}),
"",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("content").and_then(|v| v.as_str()),
Some("feature notes")
);
}
// Path present but no content and no recovery source: terminal WRITE_CONTENT_MISSING.
#[test]
fn normalize_tool_args_write_fails_when_content_missing() {
let normalized = normalize_tool_args("write", json!({"path":"docs/FEATURES.md"}), "", "");
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("WRITE_CONTENT_MISSING")
);
}
// In OutputTargetOnly mode, free-form prose must NOT be mined for a path guess.
#[test]
fn normalize_tool_args_write_output_target_only_rejects_freeform_guess() {
let normalized = normalize_tool_args_with_mode(
"write",
json!({}),
"Please implement the screen/state structure in the workspace.",
"",
WritePathRecoveryMode::OutputTargetOnly,
);
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("FILE_PATH_MISSING")
);
}
// Missing write content can be recovered from the assistant's drafted text
// (note the trailing newline is trimmed in the recovered content).
#[test]
fn normalize_tool_args_write_recovers_content_from_assistant_context() {
let normalized = normalize_tool_args(
"write",
json!({"path":"docs/FEATURES.md"}),
"",
"## Features\n\n- Neon arcade gameplay\n- Single-file HTML structure\n",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("path").and_then(|v| v.as_str()),
Some("docs/FEATURES.md")
);
assert_eq!(
normalized.args.get("content").and_then(|v| v.as_str()),
Some("## Features\n\n- Neon arcade gameplay\n- Single-file HTML structure")
);
assert_eq!(normalized.args_source, "recovered_from_context");
assert_eq!(normalized.args_integrity, "recovered");
}
// A raw string stuffed into a nested "args" key is promoted to "content".
#[test]
fn normalize_tool_args_write_recovers_raw_nested_string_content() {
let normalized = normalize_tool_args(
"write",
json!({"path":"docs/FEATURES.md","args":"Line 1\nLine 2"}),
"",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("path").and_then(|v| v.as_str()),
Some("docs/FEATURES.md")
);
assert_eq!(
normalized.args.get("content").and_then(|v| v.as_str()),
Some("Line 1\nLine 2")
);
}
// A bare path string is interpreted as the path, never as file content.
#[test]
fn normalize_tool_args_write_does_not_treat_path_as_content() {
let normalized = normalize_tool_args("write", json!("docs/FEATURES.md"), "", "");
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("WRITE_CONTENT_MISSING")
);
}
// An attachment with an empty s3key is dropped entirely so the MCP call does
// not fail on a bogus attachment reference; the sanitation is recorded.
#[test]
fn normalize_tool_args_gmail_send_email_omits_empty_attachment() {
let normalized = normalize_tool_args(
"mcp.composio_1.gmail_send_email",
json!({
"to": "user123@example.com",
"subject": "Test",
"body": "Hello",
"attachment": {
"s3key": ""
}
}),
"",
"",
);
assert!(normalized.args.get("attachment").is_none());
assert_eq!(normalized.args_source, "sanitized_attachment");
}
// A populated attachment is left untouched.
#[test]
fn normalize_tool_args_gmail_send_email_keeps_valid_attachment() {
let normalized = normalize_tool_args(
"mcp.composio_1.gmail_send_email",
json!({
"to": "user123@example.com",
"subject": "Test",
"body": "Hello",
"attachment": {
"s3key": "file_123"
}
}),
"",
"",
);
assert_eq!(
normalized
.args
.get("attachment")
.and_then(|value| value.get("s3key"))
.and_then(|value| value.as_str()),
Some("file_123")
);
}
// The WRITE_ARGS_EMPTY_FROM_PROVIDER marker maps to its dedicated failure kind.
#[test]
fn classify_required_tool_failure_detects_empty_provider_write_args() {
let reason = classify_required_tool_failure(
&[String::from("WRITE_ARGS_EMPTY_FROM_PROVIDER")],
true,
1,
false,
false,
);
assert_eq!(reason, RequiredToolFailureKind::WriteArgsEmptyFromProvider);
}
// **bold** markdown is accepted as a filename marker, like backticks.
#[test]
fn normalize_tool_args_read_infers_path_from_bold_markdown() {
let normalized = normalize_tool_args(
"read",
json!({}),
"Please read **FEATURE_LIST.md** and summarize.",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("path").and_then(|v| v.as_str()),
Some("FEATURE_LIST.md")
);
}
// A backticked shell command in the user prompt recovers the missing bash command.
#[test]
fn normalize_tool_args_shell_infers_command_from_user_prompt() {
let normalized = normalize_tool_args("bash", json!({}), "Run `rg -n \"TODO\" .`", "");
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("command").and_then(|v| v.as_str()),
Some("rg -n \"TODO\" .")
);
assert_eq!(normalized.args_source, "inferred_from_user");
assert_eq!(normalized.args_integrity, "recovered");
}
// A bare "/" is not a usable file path.
#[test]
fn normalize_tool_args_read_rejects_root_only_path() {
let normalized = normalize_tool_args("read", json!({"path":"/"}), "", "");
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("FILE_PATH_MISSING")
);
}
// A root-only provider path is discarded and replaced from the user prompt.
#[test]
fn normalize_tool_args_read_recovers_when_provider_path_is_root_only() {
let normalized =
normalize_tool_args("read", json!({"path":"/"}), "Please open `CONCEPT.md`", "");
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("path").and_then(|v| v.as_str()),
Some("CONCEPT.md")
);
assert_eq!(normalized.args_source, "inferred_from_user");
assert_eq!(normalized.args_integrity, "recovered");
}
// Stray tool-call XML markup leaked into the path field is rejected.
#[test]
fn normalize_tool_args_read_rejects_tool_call_markup_path() {
let normalized = normalize_tool_args(
"read",
json!({
"path":"<tool_call>\n<function=glob>\n<parameter=pattern>**/*</parameter>\n</function>\n</tool_call>"
}),
"",
"",
);
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("FILE_PATH_MISSING")
);
}
// Glob patterns are not concrete file paths for `read`.
#[test]
fn normalize_tool_args_read_rejects_glob_pattern_path() {
let normalized = normalize_tool_args("read", json!({"path":"**/*"}), "", "");
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("FILE_PATH_MISSING")
);
}
// Generic placeholder text ("files/directories") is rejected as a path.
#[test]
fn normalize_tool_args_read_rejects_placeholder_path() {
let normalized = normalize_tool_args("read", json!({"path":"files/directories"}), "", "");
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("FILE_PATH_MISSING")
);
}
// "tool/policy" is a known placeholder emitted by some providers, not a path.
#[test]
fn normalize_tool_args_read_rejects_tool_policy_placeholder_path() {
let normalized = normalize_tool_args("read", json!({"path":"tool/policy"}), "", "");
assert!(normalized.missing_terminal);
assert_eq!(
normalized.missing_terminal_reason.as_deref(),
Some("FILE_PATH_MISSING")
);
}
// Recovery from the user prompt handles filenames with spaces and non-ASCII
// characters (here a Hungarian PDF title).
#[test]
fn normalize_tool_args_read_recovers_pdf_path_from_user_text() {
let normalized = normalize_tool_args(
"read",
json!({"path":"tool/policy"}),
"Read `T1011U kitöltési útmutató.pdf` and summarize.",
"",
);
assert!(!normalized.missing_terminal);
assert_eq!(
normalized.args.get("path").and_then(|v| v.as_str()),
Some("T1011U kitöltési útmutató.pdf")
);
assert_eq!(normalized.args_source, "inferred_from_user");
assert_eq!(normalized.args_integrity, "recovered");
}
// Provider namespaces ("default_api:", "functions.") are stripped and aliases
// like "shell" are mapped to canonical tool names.
#[test]
fn normalize_tool_name_strips_default_api_namespace() {
assert_eq!(normalize_tool_name("default_api:read"), "read");
assert_eq!(normalize_tool_name("functions.shell"), "bash");
}
// "mcp.<server>.<tool>" yields the server segment; other shapes yield None.
#[test]
fn mcp_server_from_tool_name_parses_server_segment() {
assert_eq!(
mcp_server_from_tool_name("mcp.arcade.jira_getboards"),
Some("arcade")
);
assert_eq!(mcp_server_from_tool_name("read"), None);
assert_eq!(mcp_server_from_tool_name("mcp"), None);
}
// When "tool" is only the wrapper ("default_api"), the real tool name comes
// from the "name" field or the namespaced tool string.
#[test]
fn batch_helpers_use_name_when_tool_is_wrapper() {
let args = json!({
"tool_calls":[
{"tool":"default_api","name":"read","args":{"path":"CONCEPT.md"}},
{"tool":"default_api:glob","args":{"pattern":"*.md"}}
]
});
let calls = extract_batch_calls(&args);
assert_eq!(calls.len(), 2);
assert_eq!(calls[0].0, "read");
assert_eq!(calls[1].0, "glob");
assert!(is_read_only_batch_call(&args));
let sig = batch_tool_signature(&args).unwrap_or_default();
assert!(sig.contains("read:"));
assert!(sig.contains("glob:"));
}
// A nested {"function":{"name":...}} shape also resolves the real tool name.
#[test]
fn batch_helpers_resolve_nested_function_name() {
let args = json!({
"tool_calls":[
{"tool":"default_api","function":{"name":"read"},"args":{"path":"CONCEPT.md"}}
]
});
let calls = extract_batch_calls(&args);
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].0, "read");
assert!(is_read_only_batch_call(&args));
}
// A batch where every entry failed with "Unknown tool" is non-productive.
#[test]
fn batch_output_classifier_detects_non_productive_unknown_results() {
let output = r#"
[
{"tool":"default_api","output":"Unknown tool: default_api","metadata":{}},
{"tool":"default_api","output":"Unknown tool: default_api","metadata":{}}
]
"#;
assert!(is_non_productive_batch_output(output));
}
// The runtime system prompt advertises the host OS/shell/path-style so the
// model emits platform-appropriate commands.
#[test]
fn runtime_prompt_includes_execution_environment_block() {
let prompt = tandem_runtime_system_prompt(
&HostRuntimeContext {
os: HostOs::Windows,
arch: "x86_64".to_string(),
shell_family: ShellFamily::Powershell,
path_style: PathStyle::Windows,
},
&[],
);
assert!(prompt.contains("[Execution Environment]"));
assert!(prompt.contains("Host OS: windows"));
assert!(prompt.contains("Shell: powershell"));
assert!(prompt.contains("Path style: windows"));
}
// Connected integrations are listed by name in their own prompt block.
#[test]
fn runtime_prompt_includes_connected_integrations_block() {
let prompt = tandem_runtime_system_prompt(
&HostRuntimeContext {
os: HostOs::Linux,
arch: "x86_64".to_string(),
shell_family: ShellFamily::Posix,
path_style: PathStyle::Posix,
},
&["notion".to_string(), "github".to_string()],
);
assert!(prompt.contains("[Connected Integrations]"));
assert!(prompt.contains("- notion"));
assert!(prompt.contains("- github"));
}
// "research ... news ... links" style prompts trigger the web-research gate;
// summarization prompts do not.
#[test]
fn detects_web_research_prompt_keywords() {
assert!(requires_web_research_prompt(
"research todays top news stories and include links"
));
assert!(!requires_web_research_prompt(
"say hello and summarize this text"
));
}
// "send ... to <email>" triggers the email-delivery gate; drafting does not.
#[test]
fn detects_email_delivery_prompt_keywords() {
assert!(requires_email_delivery_prompt(
"send a full report with links to user123@example.com"
));
assert!(!requires_email_delivery_prompt("draft a summary for later"));
}
// "Sent" language is detected; negated phrasing ("could not send") is not.
#[test]
fn completion_claim_detector_flags_sent_language() {
assert!(completion_claims_email_sent(
"Email Status: Sent to user123@example.com."
));
assert!(!completion_claims_email_sent(
"I could not send email in this run."
));
}
#[test]
fn email_tool_detector_finds_mcp_gmail_tools() {
let schemas = vec![
ToolSchema {
name: "read".to_string(),
description: String::new(),
input_schema: json!({}),
},
ToolSchema {
name: "mcp.composio.gmail_send_email".to_string(),
description: String::new(),
input_schema: json!({}),
},
];
assert!(has_email_action_tools(&schemas));
}
#[test]
fn extract_mcp_auth_required_metadata_parses_expected_shape() {
let metadata = json!({
"server": "arcade",
"mcpAuth": {
"required": true,
"challengeId": "abc123",
"authorizationUrl": "https://example.com/oauth",
"message": "Authorize first",
"pending": true,
"blocked": true,
"retryAfterMs": 8000
}
});
let parsed = extract_mcp_auth_required_metadata(&metadata).expect("expected metadata");
assert_eq!(parsed.challenge_id, "abc123");
assert_eq!(parsed.authorization_url, "https://example.com/oauth");
assert_eq!(parsed.message, "Authorize first");
assert_eq!(parsed.server.as_deref(), Some("arcade"));
assert!(parsed.pending);
assert!(parsed.blocked);
assert_eq!(parsed.retry_after_ms, Some(8000));
}
#[test]
fn auth_required_output_detector_matches_auth_text() {
assert!(is_auth_required_tool_output(
"Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com"
));
assert!(is_auth_required_tool_output(
"Authorization pending for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com\nRetry after 8s."
));
assert!(!is_auth_required_tool_output("Tool `read` result: ok"));
}
#[test]
fn productive_tool_output_detector_rejects_missing_terminal_write_errors() {
assert!(!is_productive_tool_output("write", "WRITE_CONTENT_MISSING"));
assert!(!is_productive_tool_output("write", "FILE_PATH_MISSING"));
assert!(!is_productive_tool_output(
"write",
"Tool `write` result:\nWRITE_CONTENT_MISSING"
));
assert!(!is_productive_tool_output(
"edit",
"Tool `edit` result:\nFILE_PATH_MISSING"
));
assert!(!is_productive_tool_output(
"write",
"Tool `write` result:\ninvalid_function_parameters"
));
}
#[test]
fn productive_tool_output_detector_accepts_real_tool_results() {
assert!(is_productive_tool_output(
"write",
"Tool `write` result:\nWrote /tmp/probe.html"
));
assert!(!is_productive_tool_output(
"write",
"Authorization required for `write`.\nAuthorize here: https://example.com"
));
}
#[test]
fn guard_budget_output_detector_matches_expected_text() {
assert!(is_guard_budget_tool_output(
"Tool `mcp.arcade.gmail_sendemail` call skipped: per-run guard budget exceeded (10)."
));
assert!(!is_guard_budget_tool_output("Tool `read` result: ok"));
}
#[test]
fn summarize_guard_budget_outputs_returns_run_scoped_message() {
let outputs = vec![
"Tool `mcp.arcade.gmail_sendemail` call skipped: per-run guard budget exceeded (10)."
.to_string(),
"Tool `mcp.arcade.jira_getboards` call skipped: per-run guard budget exceeded (10)."
.to_string(),
];
let summary = summarize_guard_budget_outputs(&outputs).expect("expected summary");
assert!(summary.contains("per-run tool guard budget"));
assert!(summary.contains("fresh run"));
}
#[test]
fn duplicate_signature_output_detector_matches_expected_text() {
assert!(is_duplicate_signature_limit_output(
"Tool `bash` call skipped: duplicate call signature retry limit reached (2)."
));
assert!(!is_duplicate_signature_limit_output(
"Tool `read` result: ok"
));
}
#[test]
fn summarize_duplicate_signature_outputs_returns_run_scoped_message() {
let outputs = vec![
"Tool `bash` call skipped: duplicate call signature retry limit reached (2)."
.to_string(),
"Tool `bash` call skipped: duplicate call signature retry limit reached (2)."
.to_string(),
];
let summary =
summarize_duplicate_signature_outputs(&outputs).expect("expected duplicate summary");
assert!(summary.contains("same tool call kept repeating"));
assert!(summary.contains("clearer command target"));
}
#[test]
fn required_tool_mode_unsatisfied_completion_includes_marker() {
let message =
required_tool_mode_unsatisfied_completion(RequiredToolFailureKind::NoToolCallEmitted);
assert!(message.contains(REQUIRED_TOOL_MODE_UNSATISFIED_REASON));
assert!(message.contains("NO_TOOL_CALL_EMITTED"));
assert!(message.contains("tool_mode=required"));
}
// In Required mode a final narrative needs at least one successful tool call;
// Auto mode always allows one.
#[test]
fn post_tool_final_narrative_generation_is_allowed_after_required_tools_succeed() {
assert!(should_generate_post_tool_final_narrative(
ToolMode::Required,
1
));
assert!(!should_generate_post_tool_final_narrative(
ToolMode::Required,
0
));
assert!(should_generate_post_tool_final_narrative(ToolMode::Auto, 0));
}
// The narrative prompt must restate the caller's output contract so the model
// does not stop at a bare tool summary.
#[test]
fn post_tool_final_narrative_prompt_preserves_structured_response_requirements() {
let prompt = build_post_tool_final_narrative_prompt(&[String::from(
"Tool `glob` result:\n/home/user123/marketing-tandem/tandem-reference/SOURCES.md",
)]);
assert!(prompt.contains("Preserve any requested output contract"));
assert!(prompt.contains("required JSON structure"));
assert!(prompt.contains("required handoff fields"));
assert!(prompt.contains("required final status object"));
assert!(prompt.contains("Do not stop at a tool summary"));
}
// The invalid-args retry context names the failure code and the write tools.
#[test]
fn required_tool_retry_context_mentions_offered_tools() {
let prompt = build_required_tool_retry_context(
"read, write, apply_patch",
RequiredToolFailureKind::ToolCallInvalidArgs,
);
assert!(prompt.contains("Tool access is mandatory"));
assert!(prompt.contains("TOOL_CALL_INVALID_ARGS"));
assert!(prompt.contains("full `content`"));
assert!(prompt.contains("write, edit, or apply_patch"));
}
// After a read-only pass the retry context demands an actual write.
#[test]
fn required_tool_retry_context_requires_write_after_read_only_pass() {
let prompt = build_required_tool_retry_context(
"glob, read, write, edit, apply_patch",
RequiredToolFailureKind::WriteRequiredNotSatisfied,
);
assert!(prompt.contains("WRITE_REQUIRED_NOT_SATISFIED"));
assert!(prompt.contains("Inspection is complete"));
assert!(prompt.contains("write, edit, or apply_patch"));
}
// A WRITE_CONTENT_MISSING marker classifies as invalid args, not as a
// provider-side empty-args failure.
#[test]
fn classify_required_tool_failure_detects_invalid_args() {
let reason = classify_required_tool_failure(
&[String::from("WRITE_CONTENT_MISSING")],
true,
1,
false,
false,
);
assert_eq!(reason, RequiredToolFailureKind::ToolCallInvalidArgs);
}
// Raw tool-call JSON leaked into completion text is detected; plain prose is not.
#[test]
fn looks_like_unparsed_tool_payload_detects_tool_call_json() {
assert!(looks_like_unparsed_tool_payload(
r#"{"content":[{"type":"tool_call","name":"write"}]}"#
));
assert!(!looks_like_unparsed_tool_payload("Updated README.md"));
}
#[test]
fn workspace_write_tool_detection_is_limited_to_mutations() {
    // Tools that mutate workspace files must be classified as writes.
    for mutating in ["write", "edit", "apply_patch"] {
        assert!(is_workspace_write_tool(mutating));
    }
    // Inspection-only tools must never be classified as writes.
    for read_only in ["read", "glob"] {
        assert!(!is_workspace_write_tool(read_only));
    }
}
// The write gate holds only while: requirements exist, no write has happened,
// and prewrite coverage is not yet satisfied.
#[test]
fn proactive_write_gate_applies_only_before_prewrite_is_satisfied() {
assert!(should_gate_write_until_prewrite_satisfied(true, 0, false));
assert!(!should_gate_write_until_prewrite_satisfied(true, 1, false));
assert!(!should_gate_write_until_prewrite_satisfied(true, 0, true));
assert!(!should_gate_write_until_prewrite_satisfied(false, 0, false));
}
// Prewrite repair may start before any write, but not once prewrite is
// satisfied or when there are no requirements.
#[test]
fn prewrite_repair_can_start_before_any_write_attempt() {
assert!(should_start_prewrite_repair_before_first_write(
true, 0, false
));
assert!(!should_start_prewrite_repair_before_first_write(
true, 0, true
));
assert!(!should_start_prewrite_repair_before_first_write(
false, 0, false
));
}
// Once one or more writes have landed, prewrite repair never fires.
#[test]
fn prewrite_repair_does_not_fire_after_first_write() {
assert!(!should_start_prewrite_repair_before_first_write(
true, 1, false
));
assert!(!should_start_prewrite_repair_before_first_write(
true, 2, false
));
}
// The removal gate only trips at >= 3 productive writes, so a single
// productive write must leave mutation tools on offer. (The previous test name
// said "removed", contradicting the assertion below, which checks the list is
// unchanged.)
#[test]
fn write_tool_kept_after_first_productive_write() {
    let mut offered = vec!["glob", "read", "websearch", "write", "edit"];
    let repair_on_unmet_requirements = true;
    let productive_write_tool_calls_total = 1usize;
    // Mirrors the engine's gating condition: strip write-capable tools only
    // once three or more productive writes have landed in this run.
    if repair_on_unmet_requirements && productive_write_tool_calls_total >= 3 {
        offered.retain(|tool| !is_workspace_write_tool(tool));
    }
    // Below the threshold the offered tool list must be unchanged.
    assert_eq!(offered, vec!["glob", "read", "websearch", "write", "edit"]);
}
// At the >= 3 threshold, write-capable tools are stripped from the offer list.
#[test]
fn write_tool_removed_after_third_productive_write() {
let mut offered = vec!["glob", "read", "websearch", "write", "edit"];
let repair_on_unmet_requirements = true;
let productive_write_tool_calls_total = 3usize;
if repair_on_unmet_requirements && productive_write_tool_calls_total >= 3 {
offered.retain(|tool| !is_workspace_write_tool(tool));
}
assert_eq!(offered, vec!["glob", "read", "websearch"]);
}
// While prewrite repair is active, the force-write-only retry path must stay
// disabled even though every other precondition holds.
#[test]
fn force_write_only_retry_disabled_for_prewrite_repair_nodes() {
let requested_write_required = true;
let required_write_retry_count = 1usize;
let productive_write_tool_calls_total = 0usize;
let prewrite_satisfied = true;
let prewrite_gate_write = false;
let repair_on_unmet_requirements = true;
let force_write_only_retry = requested_write_required
&& required_write_retry_count > 0
&& (productive_write_tool_calls_total == 0 || prewrite_satisfied)
&& !prewrite_gate_write
&& !repair_on_unmet_requirements;
assert!(!force_write_only_retry);
}
// The "Required output target" JSON block in a prompt yields its "path" field.
#[test]
fn infer_required_output_target_path_reads_prompt_json_block() {
let prompt = r#"Execute task.
Required output target:
{
"path": "src/game.html",
"kind": "source",
"operation": "create"
}
"#;
assert_eq!(
infer_required_output_target_path_from_text(prompt).as_deref(),
Some("src/game.html")
);
}
// Extensionless targets like "Dockerfile" are still accepted as paths.
#[test]
fn infer_required_output_target_path_accepts_extensionless_target() {
let prompt = r#"Execute task.
Required output target:
{
"path": "Dockerfile",
"kind": "source",
"operation": "create"
}
"#;
assert_eq!(
infer_required_output_target_path_from_text(prompt).as_deref(),
Some("Dockerfile")
);
}
// The workspace root directory mentioned in a prompt is not a write target.
#[test]
fn infer_write_file_path_from_text_rejects_workspace_root() {
let prompt = "Workspace: /home/user123/game\nCreate the scaffold in the workspace now.";
assert_eq!(infer_write_file_path_from_text(prompt), None);
}
// With no env override, every tool gets the default duplicate-signature limit
// of 200. env_test_lock() serializes tests that mutate process env vars.
#[test]
fn duplicate_signature_limit_defaults_to_200_for_all_tools() {
let _guard = env_test_lock();
// SAFETY: env mutation is serialized by env_test_lock().
unsafe {
std::env::remove_var("TANDEM_TOOL_LOOP_DUPLICATE_SIGNATURE_LIMIT");
}
assert_eq!(duplicate_signature_limit_for("pack_builder"), 200);
assert_eq!(duplicate_signature_limit_for("bash"), 200);
assert_eq!(duplicate_signature_limit_for("write"), 200);
}
// An unparseable streamed write payload is preserved rather than dropped to {}.
#[test]
fn parse_streamed_tool_args_preserves_unparseable_write_payload() {
let parsed = parse_streamed_tool_args("write", "path=game.html content");
assert_ne!(parsed, json!({}));
}
// Large (4 KiB content) but well-formed write args survive parsing intact.
#[test]
fn parse_streamed_tool_args_preserves_large_write_payload() {
let content = "x".repeat(4096);
let raw_args = format!(r#"{{"path":"game.html","content":"{}"}}"#, content);
let parsed = parse_streamed_tool_args("write", &raw_args);
assert_eq!(
parsed.get("path").and_then(|value| value.as_str()),
Some("game.html")
);
assert_eq!(
parsed.get("content").and_then(|value| value.as_str()),
Some(content.as_str())
);
}
// JSON truncated mid-stream (unterminated content string) is repaired into
// path + content, with escapes decoded.
#[test]
fn parse_streamed_tool_args_recovers_truncated_write_json() {
let raw_args = concat!(
r#"{"path":"game.html","allow_empty":false,"content":"<!DOCTYPE html>\n"#,
r#"<html lang=\"en\"><body>Neon Drift"#
);
let parsed = parse_streamed_tool_args("write", raw_args);
assert_eq!(
parsed,
json!({
"path": "game.html",
"content": "<!DOCTYPE html>\n<html lang=\"en\"><body>Neon Drift"
})
);
}
// The same truncation recovery works without a path field and must not invent one.
#[test]
fn parse_streamed_tool_args_recovers_truncated_write_json_without_path() {
let raw_args = concat!(
r#"{"allow_empty":false,"content":"<!DOCTYPE html>\n"#,
r#"<html lang=\"en\"><body>Neon Drift"#
);
let parsed = parse_streamed_tool_args("write", raw_args);
assert_eq!(parsed.get("path"), None);
assert_eq!(
parsed.get("content").and_then(|value| value.as_str()),
Some("<!DOCTYPE html>\n<html lang=\"en\"><body>Neon Drift")
);
}
// Env overrides below the 200 floor are clamped up to 200; overrides above
// the floor are honoured verbatim.
#[test]
fn duplicate_signature_limit_env_override_respects_minimum_floor() {
    let _guard = env_test_lock();
    const KEY: &str = "TANDEM_TOOL_LOOP_DUPLICATE_SIGNATURE_LIMIT";
    unsafe {
        std::env::set_var(KEY, "9");
    }
    assert_eq!(duplicate_signature_limit_for("write"), 200);
    assert_eq!(duplicate_signature_limit_for("bash"), 200);
    unsafe {
        std::env::set_var(KEY, "250");
    }
    assert_eq!(duplicate_signature_limit_for("bash"), 250);
    unsafe {
        // Restore the unset state for later tests.
        std::env::remove_var(KEY);
    }
}
// With no env override, websearch carries no dedicated duplicate-signature
// limit of its own.
#[test]
fn websearch_duplicate_signature_limit_is_unset_by_default() {
    let _guard = env_test_lock();
    unsafe {
        std::env::remove_var("TANDEM_WEBSEARCH_DUPLICATE_SIGNATURE_LIMIT");
    }
    assert!(websearch_duplicate_signature_limit().is_none());
}
// Reads the websearch duplicate-signature limit from the environment.
// NOTE(review): setting "5" is expected to return Some(200) — presumably the
// same 200 minimum floor that `duplicate_signature_limit_for` applies (see
// `duplicate_signature_limit_env_override_respects_minimum_floor`); values at
// or above the floor ("300") come back verbatim. Confirm the floor is intended
// here, since the test name only says "reads_env".
#[test]
fn websearch_duplicate_signature_limit_reads_env() {
    let _guard = env_test_lock();
    unsafe {
        std::env::set_var("TANDEM_WEBSEARCH_DUPLICATE_SIGNATURE_LIMIT", "5");
    }
    // Below-floor override is clamped up to 200.
    assert_eq!(websearch_duplicate_signature_limit(), Some(200));
    unsafe {
        std::env::set_var("TANDEM_WEBSEARCH_DUPLICATE_SIGNATURE_LIMIT", "300");
    }
    assert_eq!(websearch_duplicate_signature_limit(), Some(300));
    unsafe {
        // Clean up so later tests observe an unset variable.
        std::env::remove_var("TANDEM_WEBSEARCH_DUPLICATE_SIGNATURE_LIMIT");
    }
}
// When every tool output is an authorization prompt, a single consolidated
// summary naming each pending tool is produced.
#[test]
fn summarize_auth_pending_outputs_returns_summary_when_all_are_auth_related() {
    let outputs = vec![
        String::from("Authorization pending for `mcp.arcade.gmail_sendemail`.\nAuthorize here: https://example.com/a"),
        String::from("Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com/b"),
    ];
    let summary = summarize_auth_pending_outputs(&outputs).expect("summary expected");
    for fragment in [
        "Authorization is required before I can continue",
        "gmail_sendemail",
        "gmail_whoami",
    ] {
        assert!(summary.contains(fragment), "summary missing {fragment:?}");
    }
}
// A mix of auth prompts and ordinary tool results must not be summarized as
// auth-pending — only a fully auth-blocked turn qualifies.
#[test]
fn summarize_auth_pending_outputs_returns_none_for_mixed_outputs() {
    let outputs = vec![
        String::from("Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com"),
        String::from("Tool `read` result:\nok"),
    ];
    assert!(summarize_auth_pending_outputs(&outputs).is_none());
}
// A bash call rejected for a missing `command` field yields retry guidance
// that names the field and steers toward safer read-only tools.
#[test]
fn invalid_tool_args_retry_context_handles_missing_bash_command() {
    let outputs = vec![String::from("Tool `bash` result:\nBASH_COMMAND_MISSING")];
    let retry = build_invalid_tool_args_retry_context_from_outputs(&outputs, 0)
        .expect("retry expected");
    assert!(retry.contains("required `command` field"));
    assert!(retry.contains("Prefer `ls`, `glob`, `search`, and `read`"));
}
// A repeated empty bash call (retry count > 0) escalates the retry wording.
#[test]
fn invalid_tool_args_retry_context_escalates_on_repeat_bash_failure() {
    let outputs = vec![String::from("Tool `bash` result:\nBASH_COMMAND_MISSING")];
    let retry = build_invalid_tool_args_retry_context_from_outputs(&outputs, 1)
        .expect("retry expected");
    assert!(retry.contains("Do not repeat an empty bash call"));
}
// Healthy tool outputs produce no invalid-args retry context at all.
#[test]
fn invalid_tool_args_retry_context_ignores_unrelated_outputs() {
    let outputs = vec![String::from("Tool `read` result:\nok")];
    assert!(build_invalid_tool_args_retry_context_from_outputs(&outputs, 0).is_none());
}
// When the write requirement fails while research requirements are also
// unmet, the repair prompt must steer the model back to research tools
// instead of demanding an immediate `write` or permitting a blocked result.
#[test]
fn prewrite_repair_retry_context_prioritizes_research_tools_before_write() {
    let reqs = PrewriteRequirements {
        workspace_inspection_required: true,
        web_research_required: true,
        concrete_read_required: true,
        successful_web_research_required: true,
        repair_on_unmet_requirements: true,
        coverage_mode: PrewriteCoverageMode::ResearchCorpus,
    };
    let repair_prompt = build_prewrite_repair_retry_context(
        "glob, read, websearch, write",
        RequiredToolFailureKind::WriteRequiredNotSatisfied,
        r#"Required output target:
{
"path": "marketing-brief.md",
"kind": "artifact"
}"#,
        &reqs,
        true,
        false,
        false,
        false,
    );
    for expected in [
        "requires concrete `read` calls",
        "call `websearch` with a concrete query now",
        "Use `read` and `websearch` now to gather evidence",
        "Do not declare the output blocked",
    ] {
        assert!(repair_prompt.contains(expected), "missing {expected:?}");
    }
    for forbidden in [
        "blocked-but-substantive artifact",
        "Your next response must be a `write` tool call",
        "Do not call `glob`, `read`, or `websearch` again",
    ] {
        assert!(!repair_prompt.contains(forbidden), "unexpected {forbidden:?}");
    }
}
// Once prewrite requirements are satisfied, an empty completion must be
// answered with a hard requirement to emit the `write` call for the target.
#[test]
fn empty_completion_retry_context_requires_write_when_prewrite_is_satisfied() {
    let reqs = PrewriteRequirements {
        workspace_inspection_required: true,
        web_research_required: false,
        concrete_read_required: true,
        successful_web_research_required: false,
        repair_on_unmet_requirements: true,
        coverage_mode: PrewriteCoverageMode::ResearchCorpus,
    };
    let retry = build_empty_completion_retry_context(
        "glob, read, write",
        "Create or update `marketing-brief.md` relative to the workspace root.",
        &reqs,
        true,
        true,
        false,
        false,
    );
    for fragment in [
        "returned no final output",
        "marketing-brief.md",
        "must be a `write` tool call",
    ] {
        assert!(retry.contains(fragment), "missing {fragment:?}");
    }
}
// An empty completion with unmet prewrite work must call out the missing
// `read`/`websearch` steps before asking for the write.
#[test]
fn empty_completion_retry_context_mentions_missing_prewrite_work() {
    let reqs = PrewriteRequirements {
        workspace_inspection_required: true,
        web_research_required: true,
        concrete_read_required: true,
        successful_web_research_required: true,
        repair_on_unmet_requirements: true,
        coverage_mode: PrewriteCoverageMode::ResearchCorpus,
    };
    let retry = build_empty_completion_retry_context(
        "glob, read, websearch, write",
        "Create or update `marketing-brief.md` relative to the workspace root.",
        &reqs,
        true,
        false,
        false,
        false,
    );
    for fragment in [
        "still need to use `read`",
        "use `websearch`",
        "After completing the missing requirement",
    ] {
        assert!(retry.contains(fragment), "missing {fragment:?}");
    }
}
// A synthesized completion for a finished artifact write reports the target
// file, a completed status, and the pending runtime validation.
#[test]
fn synthesize_artifact_write_completion_from_tool_state_marks_completed() {
    let completion = synthesize_artifact_write_completion_from_tool_state(
        "Create or update `marketing-brief.md` relative to the workspace root.",
        true,
        false,
    );
    for fragment in [
        "wrote `marketing-brief.md`",
        "\"status\":\"completed\"",
        "Runtime validation will verify",
    ] {
        assert!(completion.contains(fragment), "missing {fragment:?}");
    }
}
// When evidence requirements were waived mid-run, the synthesized completion
// still reports completed but discloses the waiver.
#[test]
fn synthesize_artifact_write_completion_from_tool_state_mentions_waived_evidence() {
    let completion = synthesize_artifact_write_completion_from_tool_state(
        "Create or update `marketing-brief.md` relative to the workspace root.",
        false,
        true,
    );
    assert!(completion.contains("waived in-run"));
    assert!(completion.contains("\"status\":\"completed\""));
}
// The prewrite repair loop is budgeted at exactly five attempts.
#[test]
fn prewrite_repair_retry_budget_allows_five_repair_attempts() {
    let budget = prewrite_repair_retry_max_attempts();
    assert_eq!(budget, 5);
}
// While inspection/read/research requirements remain unmet, `write` and
// `edit` are filtered out of the offered tool set.
#[test]
fn prewrite_repair_tool_filter_removes_write_until_evidence_is_satisfied() {
    let unmet = [
        "workspace_inspection_required",
        "concrete_read_required",
        "successful_web_research_required",
    ];
    let mut filtered = Vec::new();
    for tool in ["glob", "read", "websearch", "write", "edit"] {
        if tool_matches_unmet_prewrite_repair_requirement(tool, &unmet) {
            filtered.push(tool);
        }
    }
    assert_eq!(filtered, vec!["glob", "read", "websearch"]);
}
// A lone unmet concrete-read requirement restricts repair tools to `glob`
// and `read` only — no search, no write.
#[test]
fn prewrite_repair_tool_filter_restricts_to_glob_and_read_for_concrete_reads() {
    let mut filtered = Vec::new();
    for tool in ["glob", "read", "search", "write"] {
        if tool_matches_unmet_prewrite_repair_requirement(tool, &["concrete_read_required"]) {
            filtered.push(tool);
        }
    }
    assert_eq!(filtered, vec!["glob", "read"]);
}
// `websearch` is only offered during repair when web research (not just
// workspace inspection) is among the unmet requirements.
#[test]
fn prewrite_repair_tool_filter_allows_glob_only_for_workspace_inspection() {
    let offered = ["glob", "read", "websearch", "write"];
    let filter_with = |unmet: &[&str]| {
        offered
            .iter()
            .copied()
            .filter(|tool| tool_matches_unmet_prewrite_repair_requirement(tool, unmet))
            .collect::<Vec<_>>()
    };
    assert_eq!(
        filter_with(&["workspace_inspection_required", "concrete_read_required"]),
        vec!["glob", "read"]
    );
    assert_eq!(
        filter_with(&["concrete_read_required", "web_research_required"]),
        vec!["glob", "read", "websearch"]
    );
}
// After the glob phase, repair with read/research/coverage unmet still
// offers glob, read, and websearch but nothing write-capable.
#[test]
fn prewrite_repair_after_glob_restricts_to_glob_read_and_websearch() {
    let unmet = [
        "concrete_read_required",
        "successful_web_research_required",
        "coverage_mode",
    ];
    let mut filtered = Vec::new();
    for tool in ["glob", "read", "websearch", "write", "edit"] {
        if tool_matches_unmet_prewrite_repair_requirement(tool, &unmet) {
            filtered.push(tool);
        }
    }
    assert_eq!(filtered, vec!["glob", "read", "websearch"]);
}
// Exhausted repair attempts produce a structured blocked-status report that
// downstream consumers can parse for the remaining budget and unmet codes.
#[test]
fn prewrite_requirements_exhausted_completion_reports_structured_repair_state() {
    let completion = prewrite_requirements_exhausted_completion(
        &["concrete_read_required", "successful_web_research_required"],
        2,
        0,
    );
    for fragment in [
        "PREWRITE_REQUIREMENTS_EXHAUSTED",
        "\"status\":\"blocked\"",
        "\"repairAttempt\":2",
        "\"repairAttemptsRemaining\":0",
        "\"repairExhausted\":true",
        "\"unmetRequirements\":[\"concrete_read_required\", \"successful_web_research_required\"]",
    ] {
        assert!(completion.contains(fragment), "missing {fragment:?}");
    }
}
// The waived-write context lists every unmet requirement code and still
// forbids a blocked/placeholder file.
#[test]
fn prewrite_waived_write_context_includes_unmet_codes() {
    let ctx = build_prewrite_waived_write_context(
        "Some task text without output target marker.",
        &vec!["concrete_read_required", "coverage_mode"],
    );
    for fragment in [
        "could not be fully satisfied",
        "concrete_read_required",
        "coverage_mode",
        "write",
        "Do not write a blocked or placeholder file",
    ] {
        assert!(ctx.contains(fragment), "missing {fragment:?}");
    }
}
// When the task text carries a required-output-target marker, the waived
// context names the concrete path and the `write` tool.
#[test]
fn prewrite_waived_write_context_includes_output_path_when_present() {
    let ctx = build_prewrite_waived_write_context(
        "Required output target: {\"path\": \"marketing-brief.md\"}",
        &vec!["concrete_read_required"],
    );
    assert!(ctx.contains("marketing-brief.md"));
    assert!(ctx.contains("`write`"));
}
// An active prewrite write-gate must switch off once the waiver flag is set.
#[test]
fn prewrite_gate_waived_disables_prewrite_gate_write() {
    // repair-on-unmet enabled, zero productive writes, prewrite not satisfied.
    let gate = should_gate_write_until_prewrite_satisfied(true, 0, false);
    assert!(gate, "gate should be active before waiver");
    let waived = true;
    let gate_after = gate && !waived;
    assert!(!gate_after, "gate should be off after waiver");
}
// The repair-tool allowance must be a function of the waiver flag: active
// before the prewrite gate is waived, disabled afterwards.
#[test]
fn prewrite_gate_waived_disables_allow_repair_tools() {
    let requested_write_required = true;
    let unmet_prewrite_repair_retry_count = 5usize;
    let prewrite_satisfied = false;
    // Same boolean expression as the engine loop, parameterised on the waiver.
    let allow_repair_with = |prewrite_gate_waived: bool| {
        requested_write_required
            && unmet_prewrite_repair_retry_count > 0
            && !prewrite_satisfied
            && !prewrite_gate_waived
    };
    assert!(
        allow_repair_with(false),
        "repair tools should be active before waiver"
    );
    assert!(
        !allow_repair_with(true),
        "repair tools should be disabled after waiver"
    );
}
// With a write retry pending and the prewrite gate waived, the loop must
// enter force-write-only mode.
#[test]
fn force_write_only_enabled_after_prewrite_waiver() {
    let requested_write_required = true;
    let required_write_retry_count = 1usize;
    let productive_write_tool_calls_total = 0usize;
    let prewrite_satisfied = false;
    let prewrite_gate_write = false;
    let repair_on_unmet_requirements = true;
    // Mirrors the engine-loop predicate with the waiver set.
    let force_write_only = |prewrite_gate_waived: bool| {
        requested_write_required
            && required_write_retry_count > 0
            && (productive_write_tool_calls_total == 0 || prewrite_satisfied)
            && !prewrite_gate_write
            && (!repair_on_unmet_requirements || prewrite_gate_waived)
    };
    assert!(
        force_write_only(true),
        "force_write_only should be active after prewrite waiver + write retry"
    );
}
// Before the waiver, a prewrite node with repair enabled must not enter
// force-write-only mode even with a write retry pending.
#[test]
fn force_write_only_disabled_before_prewrite_waiver() {
    let requested_write_required = true;
    let required_write_retry_count = 1usize;
    let productive_write_tool_calls_total = 0usize;
    let prewrite_satisfied = false;
    let prewrite_gate_write = false;
    let repair_on_unmet_requirements = true;
    // Mirrors the engine-loop predicate with the waiver still unset.
    let force_write_only = |prewrite_gate_waived: bool| {
        requested_write_required
            && required_write_retry_count > 0
            && (productive_write_tool_calls_total == 0 || prewrite_satisfied)
            && !prewrite_gate_write
            && (!repair_on_unmet_requirements || prewrite_gate_waived)
    };
    assert!(
        !force_write_only(false),
        "force_write_only should be disabled before waiver for prewrite nodes"
    );
}
// An explicit `0` budget override means "unlimited" (usize::MAX), not zero calls.
#[test]
fn parse_budget_override_zero_disables_budget() {
    // FIX: acquire the env lock. This test mutates TANDEM_TOOL_BUDGET_DEFAULT,
    // which sibling tests (e.g. tool_budget_defaults_to_200_calls and
    // tool_budget_env_override_respects_minimum_floor) read and write while
    // holding env_test_lock(); process env vars are global, so mutating them
    // without the lock makes parallel test runs flaky.
    let _guard = env_test_lock();
    unsafe {
        std::env::set_var("TANDEM_TOOL_BUDGET_DEFAULT", "0");
    }
    assert_eq!(
        parse_budget_override("TANDEM_TOOL_BUDGET_DEFAULT"),
        Some(usize::MAX)
    );
    unsafe {
        // Restore the unset state so later tests observe the default.
        std::env::remove_var("TANDEM_TOOL_BUDGET_DEFAULT");
    }
}
// The global kill-switch env var lifts every tool budget to usize::MAX.
#[test]
fn disable_tool_guard_budgets_env_overrides_all_budgets() {
    // FIX: acquire the env lock. Sibling tests remove
    // TANDEM_DISABLE_TOOL_GUARD_BUDGETS while holding env_test_lock()
    // (see tool_budget_defaults_to_200_calls); setting it here without the
    // lock races with them and makes parallel test runs flaky.
    let _guard = env_test_lock();
    unsafe {
        std::env::set_var("TANDEM_DISABLE_TOOL_GUARD_BUDGETS", "1");
    }
    assert_eq!(tool_budget_for("mcp.arcade.gmail_sendemail"), usize::MAX);
    assert_eq!(tool_budget_for("websearch"), usize::MAX);
    unsafe {
        // Restore the unset state so later tests observe normal budgets.
        std::env::remove_var("TANDEM_DISABLE_TOOL_GUARD_BUDGETS");
    }
}
// With every override cleared, all tools fall back to the default budget of 200.
#[test]
fn tool_budget_defaults_to_200_calls() {
    let _guard = env_test_lock();
    unsafe {
        std::env::remove_var("TANDEM_DISABLE_TOOL_GUARD_BUDGETS");
        std::env::remove_var("TANDEM_TOOL_BUDGET_DEFAULT");
        std::env::remove_var("TANDEM_TOOL_BUDGET_WEBSEARCH");
        std::env::remove_var("TANDEM_TOOL_BUDGET_READ");
    }
    for tool in ["bash", "websearch", "read"] {
        assert_eq!(tool_budget_for(tool), 200);
    }
}
// Budget overrides below the 200 floor are clamped up; larger per-tool
// overrides are honoured verbatim.
#[test]
fn tool_budget_env_override_respects_minimum_floor() {
    let _guard = env_test_lock();
    const DISABLE_KEY: &str = "TANDEM_DISABLE_TOOL_GUARD_BUDGETS";
    const DEFAULT_KEY: &str = "TANDEM_TOOL_BUDGET_DEFAULT";
    const WEBSEARCH_KEY: &str = "TANDEM_TOOL_BUDGET_WEBSEARCH";
    unsafe {
        std::env::remove_var(DISABLE_KEY);
        std::env::set_var(DEFAULT_KEY, "17");
        std::env::set_var(WEBSEARCH_KEY, "250");
    }
    assert_eq!(tool_budget_for("bash"), 200);
    assert_eq!(tool_budget_for("websearch"), 250);
    unsafe {
        // Restore the unset state for later tests.
        std::env::remove_var(DEFAULT_KEY);
        std::env::remove_var(WEBSEARCH_KEY);
    }
}
}