1use chrono::Utc;
2use futures::future::BoxFuture;
3use futures::StreamExt;
4use serde_json::{json, Map, Number, Value};
5use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
6use std::hash::{Hash, Hasher};
7use std::path::{Path, PathBuf};
8use std::time::Duration;
9use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
10use tandem_providers::{ChatAttachment, ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
11use tandem_tools::{validate_tool_schemas, ToolRegistry};
12use tandem_types::{
13 ContextMode, EngineEvent, HostOs, HostRuntimeContext, Message, MessagePart, MessagePartInput,
14 MessageRole, ModelSpec, PathStyle, SendMessageRequest, ShellFamily, ToolMode, ToolSchema,
15};
16use tandem_wire::WireMessagePart;
17use tokio_util::sync::CancellationToken;
18use tracing::Level;
19
20mod loop_guards;
21
22#[cfg(test)]
23use loop_guards::parse_budget_override;
24use loop_guards::{
25 duplicate_signature_limit_for, tool_budget_for, websearch_duplicate_signature_limit,
26};
27
28use crate::tool_router::{
29 classify_intent, default_mode_name, is_short_simple_prompt, max_tools_per_call_expanded,
30 select_tool_subset, should_escalate_auto_tools, tool_router_enabled, ToolIntent,
31 ToolRoutingDecision,
32};
33use crate::{
34 any_policy_matches, derive_session_title_from_prompt, title_needs_repair,
35 tool_name_matches_policy, AgentDefinition, AgentRegistry, CancellationRegistry, EventBus,
36 PermissionAction, PermissionManager, PluginRegistry, Storage,
37};
38use tokio::sync::RwLock;
39
/// Accumulator for one tool call that a provider streams in pieces.
///
/// The engine loop keeps one of these per streamed call id: the name is filled
/// in from the call-start chunk (only while it is still empty) and each
/// argument delta is appended to `args` in arrival order.
#[derive(Default)]
struct StreamedToolCall {
    /// Tool name as reported by the provider; empty until a start chunk names it.
    name: String,
    /// Concatenation of every argument fragment received so far.
    args: String,
}
45
/// Three-way classification with a stable lowercase label per variant.
///
/// NOTE(review): judging by the name this describes the raw argument payload
/// of a tool call; the code that produces these values is outside the visible
/// part of the file, so confirm before relying on that reading.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RawToolArgsState {
    Present,
    Empty,
    Unparseable,
}

impl RawToolArgsState {
    /// Returns the fixed lowercase label for this state
    /// (`"present"`, `"empty"` or `"unparseable"`).
    fn as_str(self) -> &'static str {
        let label = match self {
            RawToolArgsState::Present => "present",
            RawToolArgsState::Empty => "empty",
            RawToolArgsState::Unparseable => "unparseable",
        };
        label
    }
}
62
/// Selects between two write-path recovery strategies.
///
/// NOTE(review): the behaviour attached to each variant lives outside the
/// visible part of the file; the names suggest a heuristic recovery versus one
/// restricted to an explicit output target — confirm against the call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WritePathRecoveryMode {
    Heuristic,
    OutputTargetOnly,
}
68
69#[derive(Debug, Clone)]
70pub struct SpawnAgentToolContext {
71 pub session_id: String,
72 pub message_id: String,
73 pub tool_call_id: Option<String>,
74 pub args: Value,
75}
76
77#[derive(Debug, Clone)]
78pub struct SpawnAgentToolResult {
79 pub output: String,
80 pub metadata: Value,
81}
82
83#[derive(Debug, Clone)]
84pub struct ToolPolicyContext {
85 pub session_id: String,
86 pub message_id: String,
87 pub tool: String,
88 pub args: Value,
89}
90
/// Verdict returned by [`ToolPolicyHook::evaluate_tool`].
#[derive(Debug, Clone)]
pub struct ToolPolicyDecision {
    /// `true` when the hook permits the tool call.
    pub allowed: bool,
    /// Optional human-readable explanation for the verdict.
    pub reason: Option<String>,
}
96
97pub trait SpawnAgentHook: Send + Sync {
98 fn spawn_agent(
99 &self,
100 ctx: SpawnAgentToolContext,
101 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>>;
102}
103
104pub trait ToolPolicyHook: Send + Sync {
105 fn evaluate_tool(
106 &self,
107 ctx: ToolPolicyContext,
108 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>>;
109}
110
/// Describes the provider call for which a [`PromptContextHook`] is invoked.
///
/// The engine loop builds a fresh value before every provider call.
#[derive(Debug, Clone)]
pub struct PromptContextHookContext {
    /// Session the prompt is running in.
    pub session_id: String,
    /// Id of the user message that started the run.
    pub message_id: String,
    /// Provider selected for the run.
    pub provider_id: String,
    /// Model selected for the run.
    pub model_id: String,
    /// Iteration counter of the engine loop for this provider call.
    pub iteration: usize,
}
119
120pub trait PromptContextHook: Send + Sync {
121 fn augment_provider_messages(
122 &self,
123 ctx: PromptContextHookContext,
124 messages: Vec<ChatMessage>,
125 ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>>;
126}
127
128#[derive(Clone)]
129pub struct EngineLoop {
130 storage: std::sync::Arc<Storage>,
131 event_bus: EventBus,
132 providers: ProviderRegistry,
133 plugins: PluginRegistry,
134 agents: AgentRegistry,
135 permissions: PermissionManager,
136 tools: ToolRegistry,
137 cancellations: CancellationRegistry,
138 host_runtime_context: HostRuntimeContext,
139 workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
140 session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
141 session_auto_approve_permissions: std::sync::Arc<RwLock<HashMap<String, bool>>>,
142 spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
143 tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
144 prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
145}
146
147impl EngineLoop {
148 #[allow(clippy::too_many_arguments)]
149 pub fn new(
150 storage: std::sync::Arc<Storage>,
151 event_bus: EventBus,
152 providers: ProviderRegistry,
153 plugins: PluginRegistry,
154 agents: AgentRegistry,
155 permissions: PermissionManager,
156 tools: ToolRegistry,
157 cancellations: CancellationRegistry,
158 host_runtime_context: HostRuntimeContext,
159 ) -> Self {
160 Self {
161 storage,
162 event_bus,
163 providers,
164 plugins,
165 agents,
166 permissions,
167 tools,
168 cancellations,
169 host_runtime_context,
170 workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
171 session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
172 session_auto_approve_permissions: std::sync::Arc::new(RwLock::new(HashMap::new())),
173 spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
174 tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
175 prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
176 }
177 }
178
179 pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
180 *self.spawn_agent_hook.write().await = Some(hook);
181 }
182
183 pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
184 *self.tool_policy_hook.write().await = Some(hook);
185 }
186
187 pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
188 *self.prompt_context_hook.write().await = Some(hook);
189 }
190
191 pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
192 let normalized = allowed_tools
193 .into_iter()
194 .map(|tool| normalize_tool_name(&tool))
195 .filter(|tool| !tool.trim().is_empty())
196 .collect::<Vec<_>>();
197 self.session_allowed_tools
198 .write()
199 .await
200 .insert(session_id.to_string(), normalized);
201 }
202
203 pub async fn clear_session_allowed_tools(&self, session_id: &str) {
204 self.session_allowed_tools.write().await.remove(session_id);
205 }
206
207 pub async fn set_session_auto_approve_permissions(&self, session_id: &str, enabled: bool) {
208 if enabled {
209 self.session_auto_approve_permissions
210 .write()
211 .await
212 .insert(session_id.to_string(), true);
213 } else {
214 self.session_auto_approve_permissions
215 .write()
216 .await
217 .remove(session_id);
218 }
219 }
220
221 pub async fn clear_session_auto_approve_permissions(&self, session_id: &str) {
222 self.session_auto_approve_permissions
223 .write()
224 .await
225 .remove(session_id);
226 }
227
228 pub async fn grant_workspace_override_for_session(
229 &self,
230 session_id: &str,
231 ttl_seconds: u64,
232 ) -> u64 {
233 let expires_at = chrono::Utc::now()
234 .timestamp_millis()
235 .max(0)
236 .saturating_add((ttl_seconds as i64).saturating_mul(1000))
237 as u64;
238 self.workspace_overrides
239 .write()
240 .await
241 .insert(session_id.to_string(), expires_at);
242 expires_at
243 }
244
245 pub async fn run_prompt_async(
246 &self,
247 session_id: String,
248 req: SendMessageRequest,
249 ) -> anyhow::Result<()> {
250 self.run_prompt_async_with_context(session_id, req, None)
251 .await
252 }
253
254 pub async fn run_prompt_async_with_context(
255 &self,
256 session_id: String,
257 req: SendMessageRequest,
258 correlation_id: Option<String>,
259 ) -> anyhow::Result<()> {
260 let session_model = self
261 .storage
262 .get_session(&session_id)
263 .await
264 .and_then(|s| s.model);
265 let (provider_id, model_id_value) =
266 resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
267 anyhow::anyhow!(
268 "MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
269 )
270 })?;
271 let correlation_ref = correlation_id.as_deref();
272 let model_id = Some(model_id_value.as_str());
273 let cancel = self.cancellations.create(&session_id).await;
274 emit_event(
275 Level::INFO,
276 ProcessKind::Engine,
277 ObservabilityEvent {
278 event: "provider.call.start",
279 component: "engine.loop",
280 correlation_id: correlation_ref,
281 session_id: Some(&session_id),
282 run_id: None,
283 message_id: None,
284 provider_id: Some(provider_id.as_str()),
285 model_id,
286 status: Some("start"),
287 error_code: None,
288 detail: Some("run_prompt_async dispatch"),
289 },
290 );
291 self.event_bus.publish(EngineEvent::new(
292 "session.status",
293 json!({"sessionID": session_id, "status":"running"}),
294 ));
295 let request_parts = req.parts.clone();
296 let requested_tool_mode = req.tool_mode.clone().unwrap_or(ToolMode::Auto);
297 let requested_context_mode = req.context_mode.clone().unwrap_or(ContextMode::Auto);
298 let requested_write_required = req.write_required.unwrap_or(false);
299 let request_tool_allowlist = req
300 .tool_allowlist
301 .clone()
302 .unwrap_or_default()
303 .into_iter()
304 .map(|tool| normalize_tool_name(&tool))
305 .filter(|tool| !tool.trim().is_empty())
306 .collect::<HashSet<_>>();
307 let text = req
308 .parts
309 .iter()
310 .map(|p| match p {
311 MessagePartInput::Text { text } => text.clone(),
312 MessagePartInput::File {
313 mime,
314 filename,
315 url,
316 } => format!(
317 "[file mime={} name={} url={}]",
318 mime,
319 filename.clone().unwrap_or_else(|| "unknown".to_string()),
320 url
321 ),
322 })
323 .collect::<Vec<_>>()
324 .join("\n");
325 let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
326 self.auto_rename_session_from_user_text(&session_id, &text)
327 .await;
328 let active_agent = self.agents.get(req.agent.as_deref()).await;
329 let mut user_message_id = self
330 .find_recent_matching_user_message_id(&session_id, &text)
331 .await;
332 if user_message_id.is_none() {
333 let user_message = Message::new(
334 MessageRole::User,
335 vec![MessagePart::Text { text: text.clone() }],
336 );
337 let created_message_id = user_message.id.clone();
338 self.storage
339 .append_message(&session_id, user_message)
340 .await?;
341
342 let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
343 self.event_bus.publish(EngineEvent::new(
344 "message.part.updated",
345 json!({
346 "part": user_part,
347 "delta": text,
348 "agent": active_agent.name
349 }),
350 ));
351 user_message_id = Some(created_message_id);
352 }
353 let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
354
355 if cancel.is_cancelled() {
356 self.event_bus.publish(EngineEvent::new(
357 "session.status",
358 json!({"sessionID": session_id, "status":"cancelled"}),
359 ));
360 self.cancellations.remove(&session_id).await;
361 return Ok(());
362 }
363
364 let mut question_tool_used = false;
365 let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
366 if normalize_tool_name(&tool) == "question" {
367 question_tool_used = true;
368 }
369 if !agent_can_use_tool(&active_agent, &tool) {
370 format!(
371 "Tool `{tool}` is not enabled for agent `{}`.",
372 active_agent.name
373 )
374 } else {
375 self.execute_tool_with_permission(
376 &session_id,
377 &user_message_id,
378 tool.clone(),
379 args,
380 active_agent.skills.as_deref(),
381 &text,
382 requested_write_required,
383 None,
384 cancel.clone(),
385 )
386 .await?
387 .unwrap_or_default()
388 }
389 } else {
390 let mut completion = String::new();
391 let mut max_iterations = max_tool_iterations();
392 let mut followup_context: Option<String> = None;
393 let mut last_tool_outputs: Vec<String> = Vec::new();
394 let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
395 let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
396 let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
397 let mut mutable_signature_counts: HashMap<String, usize> = HashMap::new();
398 let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
399 let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
400 let mut websearch_query_blocked = false;
401 let websearch_duplicate_signature_limit = websearch_duplicate_signature_limit();
402 let mut pack_builder_executed = false;
403 let mut auto_workspace_probe_attempted = false;
404 let mut productive_tool_calls_total = 0usize;
405 let mut productive_write_tool_calls_total = 0usize;
406 let mut required_tool_retry_count = 0usize;
407 let mut required_write_retry_count = 0usize;
408 let strict_write_retry_max_attempts = strict_write_retry_max_attempts();
409 let mut required_tool_unsatisfied_emitted = false;
410 let mut latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
411 let email_delivery_requested = requires_email_delivery_prompt(&text);
412 let web_research_requested = requires_web_research_prompt(&text);
413 let mut email_action_executed = false;
414 let mut latest_email_action_note: Option<String> = None;
415 let intent = classify_intent(&text);
416 let router_enabled = tool_router_enabled();
417 let retrieval_enabled = semantic_tool_retrieval_enabled();
418 let retrieval_k = semantic_tool_retrieval_k();
419 let mcp_server_names = if mcp_catalog_in_system_prompt_enabled() {
420 self.tools.mcp_server_names().await
421 } else {
422 Vec::new()
423 };
424 let mut auto_tools_escalated = matches!(requested_tool_mode, ToolMode::Required);
425 let context_is_auto_compact = matches!(requested_context_mode, ContextMode::Auto)
426 && runtime_attachments.is_empty()
427 && is_short_simple_prompt(&text)
428 && matches!(intent, ToolIntent::Chitchat | ToolIntent::Knowledge);
429
430 while max_iterations > 0 && !cancel.is_cancelled() {
431 let iteration = 26usize.saturating_sub(max_iterations);
432 max_iterations -= 1;
433 let context_profile = if matches!(requested_context_mode, ContextMode::Full) {
434 ChatHistoryProfile::Full
435 } else if matches!(requested_context_mode, ContextMode::Compact)
436 || context_is_auto_compact
437 {
438 ChatHistoryProfile::Compact
439 } else {
440 ChatHistoryProfile::Standard
441 };
442 let mut messages =
443 load_chat_history(self.storage.clone(), &session_id, context_profile).await;
444 if iteration == 1 && !runtime_attachments.is_empty() {
445 attach_to_last_user_message(&mut messages, &runtime_attachments);
446 }
447 let history_char_count = messages.iter().map(|m| m.content.len()).sum::<usize>();
448 self.event_bus.publish(EngineEvent::new(
449 "context.profile.selected",
450 json!({
451 "sessionID": session_id,
452 "messageID": user_message_id,
453 "iteration": iteration,
454 "contextMode": format_context_mode(&requested_context_mode, context_is_auto_compact),
455 "historyMessageCount": messages.len(),
456 "historyCharCount": history_char_count,
457 "memoryInjected": false
458 }),
459 ));
460 let mut system_parts = vec![tandem_runtime_system_prompt(
461 &self.host_runtime_context,
462 &mcp_server_names,
463 )];
464 if let Some(system) = active_agent.system_prompt.as_ref() {
465 system_parts.push(system.clone());
466 }
467 messages.insert(
468 0,
469 ChatMessage {
470 role: "system".to_string(),
471 content: system_parts.join("\n\n"),
472 attachments: Vec::new(),
473 },
474 );
475 if let Some(extra) = followup_context.take() {
476 messages.push(ChatMessage {
477 role: "user".to_string(),
478 content: extra,
479 attachments: Vec::new(),
480 });
481 }
482 if let Some(hook) = self.prompt_context_hook.read().await.clone() {
483 let ctx = PromptContextHookContext {
484 session_id: session_id.clone(),
485 message_id: user_message_id.clone(),
486 provider_id: provider_id.clone(),
487 model_id: model_id_value.clone(),
488 iteration,
489 };
490 let hook_timeout =
491 Duration::from_millis(prompt_context_hook_timeout_ms() as u64);
492 match tokio::time::timeout(
493 hook_timeout,
494 hook.augment_provider_messages(ctx, messages.clone()),
495 )
496 .await
497 {
498 Ok(Ok(augmented)) => {
499 messages = augmented;
500 }
501 Ok(Err(err)) => {
502 self.event_bus.publish(EngineEvent::new(
503 "memory.context.error",
504 json!({
505 "sessionID": session_id,
506 "messageID": user_message_id,
507 "iteration": iteration,
508 "error": truncate_text(&err.to_string(), 500),
509 }),
510 ));
511 }
512 Err(_) => {
513 self.event_bus.publish(EngineEvent::new(
514 "memory.context.error",
515 json!({
516 "sessionID": session_id,
517 "messageID": user_message_id,
518 "iteration": iteration,
519 "error": format!(
520 "prompt context hook timeout after {} ms",
521 hook_timeout.as_millis()
522 ),
523 }),
524 ));
525 }
526 }
527 }
528 let all_tools = self.tools.list().await;
529 let mut retrieval_fallback_reason: Option<&'static str> = None;
530 let mut candidate_tools = if retrieval_enabled {
531 self.tools.retrieve(&text, retrieval_k).await
532 } else {
533 all_tools.clone()
534 };
535 if retrieval_enabled {
536 if candidate_tools.is_empty() && !all_tools.is_empty() {
537 candidate_tools = all_tools.clone();
538 retrieval_fallback_reason = Some("retrieval_empty_result");
539 } else if web_research_requested
540 && has_web_research_tools(&all_tools)
541 && !has_web_research_tools(&candidate_tools)
542 {
543 candidate_tools = all_tools.clone();
544 retrieval_fallback_reason = Some("missing_web_tools_for_research_prompt");
545 } else if email_delivery_requested
546 && has_email_action_tools(&all_tools)
547 && !has_email_action_tools(&candidate_tools)
548 {
549 candidate_tools = all_tools.clone();
550 retrieval_fallback_reason = Some("missing_email_tools_for_delivery_prompt");
551 }
552 }
553 let mut tool_schemas = if !router_enabled {
554 candidate_tools
555 } else {
556 match requested_tool_mode {
557 ToolMode::None => Vec::new(),
558 ToolMode::Required => select_tool_subset(
559 candidate_tools,
560 intent,
561 &request_tool_allowlist,
562 iteration > 1,
563 ),
564 ToolMode::Auto => {
565 if !auto_tools_escalated {
566 Vec::new()
567 } else {
568 select_tool_subset(
569 candidate_tools,
570 intent,
571 &request_tool_allowlist,
572 iteration > 1,
573 )
574 }
575 }
576 }
577 };
578 let mut policy_patterns =
579 request_tool_allowlist.iter().cloned().collect::<Vec<_>>();
580 if let Some(agent_tools) = active_agent.tools.as_ref() {
581 policy_patterns
582 .extend(agent_tools.iter().map(|tool| normalize_tool_name(tool)));
583 }
584 let session_allowed_tools = self
585 .session_allowed_tools
586 .read()
587 .await
588 .get(&session_id)
589 .cloned()
590 .unwrap_or_default();
591 policy_patterns.extend(session_allowed_tools.iter().cloned());
592 if !policy_patterns.is_empty() {
593 let mut included = tool_schemas
594 .iter()
595 .map(|schema| normalize_tool_name(&schema.name))
596 .collect::<HashSet<_>>();
597 for schema in &all_tools {
598 let normalized = normalize_tool_name(&schema.name);
599 if policy_patterns
600 .iter()
601 .any(|pattern| tool_name_matches_policy(pattern, &normalized))
602 && included.insert(normalized)
603 {
604 tool_schemas.push(schema.clone());
605 }
606 }
607 }
608 if !request_tool_allowlist.is_empty() {
609 tool_schemas.retain(|schema| {
610 let tool = normalize_tool_name(&schema.name);
611 request_tool_allowlist
612 .iter()
613 .any(|pattern| tool_name_matches_policy(pattern, &tool))
614 });
615 }
616 if requested_write_required && required_write_retry_count > 0 {
617 tool_schemas.retain(|schema| is_workspace_write_tool(&schema.name));
618 }
619 if active_agent.tools.is_some() {
620 tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
621 }
622 tool_schemas.retain(|schema| {
623 let normalized = normalize_tool_name(&schema.name);
624 if let Some(server) = mcp_server_from_tool_name(&normalized) {
625 !blocked_mcp_servers.contains(server)
626 } else {
627 true
628 }
629 });
630 if let Some(allowed_tools) = self
631 .session_allowed_tools
632 .read()
633 .await
634 .get(&session_id)
635 .cloned()
636 {
637 if !allowed_tools.is_empty() {
638 tool_schemas.retain(|schema| {
639 let normalized = normalize_tool_name(&schema.name);
640 any_policy_matches(&allowed_tools, &normalized)
641 });
642 }
643 }
644 if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
645 let detail = validation_err.to_string();
646 emit_event(
647 Level::ERROR,
648 ProcessKind::Engine,
649 ObservabilityEvent {
650 event: "provider.call.error",
651 component: "engine.loop",
652 correlation_id: correlation_ref,
653 session_id: Some(&session_id),
654 run_id: None,
655 message_id: Some(&user_message_id),
656 provider_id: Some(provider_id.as_str()),
657 model_id,
658 status: Some("failed"),
659 error_code: Some("TOOL_SCHEMA_INVALID"),
660 detail: Some(&detail),
661 },
662 );
663 anyhow::bail!("{detail}");
664 }
665 let routing_decision = ToolRoutingDecision {
666 pass: if auto_tools_escalated { 2 } else { 1 },
667 mode: match requested_tool_mode {
668 ToolMode::Auto => default_mode_name(),
669 ToolMode::None => "none",
670 ToolMode::Required => "required",
671 },
672 intent,
673 selected_count: tool_schemas.len(),
674 total_available_count: all_tools.len(),
675 mcp_included: tool_schemas
676 .iter()
677 .any(|schema| normalize_tool_name(&schema.name).starts_with("mcp.")),
678 };
679 self.event_bus.publish(EngineEvent::new(
680 "tool.routing.decision",
681 json!({
682 "sessionID": session_id,
683 "messageID": user_message_id,
684 "iteration": iteration,
685 "pass": routing_decision.pass,
686 "mode": routing_decision.mode,
687 "intent": format!("{:?}", routing_decision.intent).to_ascii_lowercase(),
688 "selectedToolCount": routing_decision.selected_count,
689 "totalAvailableTools": routing_decision.total_available_count,
690 "mcpIncluded": routing_decision.mcp_included,
691 "retrievalEnabled": retrieval_enabled,
692 "retrievalK": retrieval_k,
693 "fallbackToFullTools": retrieval_fallback_reason.is_some(),
694 "fallbackReason": retrieval_fallback_reason
695 }),
696 ));
697 let allowed_tool_names = tool_schemas
698 .iter()
699 .map(|schema| normalize_tool_name(&schema.name))
700 .collect::<HashSet<_>>();
701 let offered_tool_preview = tool_schemas
702 .iter()
703 .take(8)
704 .map(|schema| normalize_tool_name(&schema.name))
705 .collect::<Vec<_>>()
706 .join(", ");
707 self.event_bus.publish(EngineEvent::new(
708 "provider.call.iteration.start",
709 json!({
710 "sessionID": session_id,
711 "messageID": user_message_id,
712 "iteration": iteration,
713 "selectedToolCount": allowed_tool_names.len(),
714 }),
715 ));
716 let provider_connect_timeout =
717 Duration::from_millis(provider_stream_connect_timeout_ms() as u64);
718 let stream_result = tokio::time::timeout(
719 provider_connect_timeout,
720 self.providers.stream_for_provider(
721 Some(provider_id.as_str()),
722 Some(model_id_value.as_str()),
723 messages,
724 requested_tool_mode.clone(),
725 Some(tool_schemas),
726 cancel.clone(),
727 ),
728 )
729 .await
730 .map_err(|_| {
731 anyhow::anyhow!(
732 "provider stream connect timeout after {} ms",
733 provider_connect_timeout.as_millis()
734 )
735 })
736 .and_then(|result| result);
737 let stream = match stream_result {
738 Ok(stream) => stream,
739 Err(err) => {
740 let error_text = err.to_string();
741 let error_code = provider_error_code(&error_text);
742 let detail = truncate_text(&error_text, 500);
743 emit_event(
744 Level::ERROR,
745 ProcessKind::Engine,
746 ObservabilityEvent {
747 event: "provider.call.error",
748 component: "engine.loop",
749 correlation_id: correlation_ref,
750 session_id: Some(&session_id),
751 run_id: None,
752 message_id: Some(&user_message_id),
753 provider_id: Some(provider_id.as_str()),
754 model_id,
755 status: Some("failed"),
756 error_code: Some(error_code),
757 detail: Some(&detail),
758 },
759 );
760 self.event_bus.publish(EngineEvent::new(
761 "provider.call.iteration.error",
762 json!({
763 "sessionID": session_id,
764 "messageID": user_message_id,
765 "iteration": iteration,
766 "error": detail,
767 }),
768 ));
769 return Err(err);
770 }
771 };
772 tokio::pin!(stream);
773 completion.clear();
774 let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
775 let mut provider_usage: Option<TokenUsage> = None;
776 let mut accepted_tool_calls_in_cycle = 0usize;
777 let provider_idle_timeout =
778 Duration::from_millis(provider_stream_idle_timeout_ms() as u64);
779 loop {
780 let next_chunk_result =
781 tokio::time::timeout(provider_idle_timeout, stream.next())
782 .await
783 .map_err(|_| {
784 anyhow::anyhow!(
785 "provider stream idle timeout after {} ms",
786 provider_idle_timeout.as_millis()
787 )
788 });
789 let next_chunk = match next_chunk_result {
790 Ok(next_chunk) => next_chunk,
791 Err(err) => {
792 self.event_bus.publish(EngineEvent::new(
793 "provider.call.iteration.error",
794 json!({
795 "sessionID": session_id,
796 "messageID": user_message_id,
797 "iteration": iteration,
798 "error": truncate_text(&err.to_string(), 500),
799 }),
800 ));
801 return Err(err);
802 }
803 };
804 let Some(chunk) = next_chunk else {
805 break;
806 };
807 let chunk = match chunk {
808 Ok(chunk) => chunk,
809 Err(err) => {
810 let error_text = err.to_string();
811 let error_code = provider_error_code(&error_text);
812 let detail = truncate_text(&error_text, 500);
813 emit_event(
814 Level::ERROR,
815 ProcessKind::Engine,
816 ObservabilityEvent {
817 event: "provider.call.error",
818 component: "engine.loop",
819 correlation_id: correlation_ref,
820 session_id: Some(&session_id),
821 run_id: None,
822 message_id: Some(&user_message_id),
823 provider_id: Some(provider_id.as_str()),
824 model_id,
825 status: Some("failed"),
826 error_code: Some(error_code),
827 detail: Some(&detail),
828 },
829 );
830 self.event_bus.publish(EngineEvent::new(
831 "provider.call.iteration.error",
832 json!({
833 "sessionID": session_id,
834 "messageID": user_message_id,
835 "iteration": iteration,
836 "error": detail,
837 }),
838 ));
839 return Err(anyhow::anyhow!(
840 "provider stream chunk error: {error_text}"
841 ));
842 }
843 };
844 match chunk {
845 StreamChunk::TextDelta(delta) => {
846 let delta = strip_model_control_markers(&delta);
847 if delta.trim().is_empty() {
848 continue;
849 }
850 if completion.is_empty() {
851 emit_event(
852 Level::INFO,
853 ProcessKind::Engine,
854 ObservabilityEvent {
855 event: "provider.call.first_byte",
856 component: "engine.loop",
857 correlation_id: correlation_ref,
858 session_id: Some(&session_id),
859 run_id: None,
860 message_id: Some(&user_message_id),
861 provider_id: Some(provider_id.as_str()),
862 model_id,
863 status: Some("streaming"),
864 error_code: None,
865 detail: Some("first text delta"),
866 },
867 );
868 }
869 completion.push_str(&delta);
870 let delta = truncate_text(&delta, 4_000);
871 let delta_part =
872 WireMessagePart::text(&session_id, &user_message_id, delta.clone());
873 self.event_bus.publish(EngineEvent::new(
874 "message.part.updated",
875 json!({"part": delta_part, "delta": delta}),
876 ));
877 }
878 StreamChunk::ReasoningDelta(_reasoning) => {}
879 StreamChunk::Done {
880 finish_reason: _,
881 usage,
882 } => {
883 if usage.is_some() {
884 provider_usage = usage;
885 }
886 break;
887 }
888 StreamChunk::ToolCallStart { id, name } => {
889 let entry = streamed_tool_calls.entry(id).or_default();
890 if entry.name.is_empty() {
891 entry.name = name;
892 }
893 }
894 StreamChunk::ToolCallDelta { id, args_delta } => {
895 let entry = streamed_tool_calls.entry(id.clone()).or_default();
896 entry.args.push_str(&args_delta);
897 let tool_name = if entry.name.trim().is_empty() {
898 "tool".to_string()
899 } else {
900 normalize_tool_name(&entry.name)
901 };
902 let parsed_preview = if entry.name.trim().is_empty() {
903 Value::String(truncate_text(&entry.args, 1_000))
904 } else {
905 parse_streamed_tool_args(&tool_name, &entry.args)
906 };
907 let mut tool_part = WireMessagePart::tool_invocation(
908 &session_id,
909 &user_message_id,
910 tool_name.clone(),
911 parsed_preview.clone(),
912 );
913 tool_part.id = Some(id.clone());
914 if tool_name == "write" {
915 tracing::info!(
916 session_id = %session_id,
917 message_id = %user_message_id,
918 tool_call_id = %id,
919 args_delta_len = args_delta.len(),
920 accumulated_args_len = entry.args.len(),
921 parsed_preview_empty = parsed_preview.is_null()
922 || parsed_preview.as_object().is_some_and(|value| value.is_empty())
923 || parsed_preview
924 .as_str()
925 .map(|value| value.trim().is_empty())
926 .unwrap_or(false),
927 "streamed write tool args delta received"
928 );
929 }
930 self.event_bus.publish(EngineEvent::new(
931 "message.part.updated",
932 json!({
933 "part": tool_part,
934 "toolCallDelta": {
935 "id": id,
936 "tool": tool_name,
937 "argsDelta": truncate_text(&args_delta, 1_000),
938 "parsedArgsPreview": parsed_preview
939 }
940 }),
941 ));
942 }
943 StreamChunk::ToolCallEnd { id: _ } => {}
944 }
945 if cancel.is_cancelled() {
946 break;
947 }
948 }
949
950 let streamed_tool_call_count = streamed_tool_calls.len();
951 let streamed_tool_call_parse_failed = streamed_tool_calls
952 .values()
953 .any(|call| !call.args.trim().is_empty() && call.name.trim().is_empty());
954 let mut tool_calls = streamed_tool_calls
955 .into_values()
956 .filter_map(|call| {
957 if call.name.trim().is_empty() {
958 return None;
959 }
960 let tool_name = normalize_tool_name(&call.name);
961 let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
962 Some((tool_name, parsed_args))
963 })
964 .collect::<Vec<_>>();
965 if tool_calls.is_empty() {
966 tool_calls = parse_tool_invocations_from_response(&completion);
967 }
968 let provider_tool_parse_failed = tool_calls.is_empty()
969 && (streamed_tool_call_parse_failed
970 || (streamed_tool_call_count > 0
971 && looks_like_unparsed_tool_payload(&completion))
972 || looks_like_unparsed_tool_payload(&completion));
973 if provider_tool_parse_failed {
974 latest_required_tool_failure_kind =
975 RequiredToolFailureKind::ToolCallParseFailed;
976 } else if tool_calls.is_empty() {
977 latest_required_tool_failure_kind = RequiredToolFailureKind::NoToolCallEmitted;
978 }
979 if router_enabled
980 && matches!(requested_tool_mode, ToolMode::Auto)
981 && !auto_tools_escalated
982 && iteration == 1
983 && should_escalate_auto_tools(intent, &text, &completion)
984 {
985 auto_tools_escalated = true;
986 followup_context = Some(
987 "Tool access is now enabled for this request. Use only necessary tools and then answer concisely."
988 .to_string(),
989 );
990 self.event_bus.publish(EngineEvent::new(
991 "provider.call.iteration.finish",
992 json!({
993 "sessionID": session_id,
994 "messageID": user_message_id,
995 "iteration": iteration,
996 "finishReason": "auto_escalate",
997 "acceptedToolCalls": accepted_tool_calls_in_cycle,
998 "rejectedToolCalls": 0,
999 }),
1000 ));
1001 continue;
1002 }
1003 if tool_calls.is_empty()
1004 && !auto_workspace_probe_attempted
1005 && should_force_workspace_probe(&text, &completion)
1006 && allowed_tool_names.contains("glob")
1007 {
1008 auto_workspace_probe_attempted = true;
1009 tool_calls = vec![("glob".to_string(), json!({ "pattern": "*" }))];
1010 }
1011 if !tool_calls.is_empty() {
1012 let saw_tool_call_candidate = true;
1013 let mut outputs = Vec::new();
1014 let mut executed_productive_tool = false;
1015 let mut write_tool_attempted_in_cycle = false;
1016 let mut auth_required_hit_in_cycle = false;
1017 let mut guard_budget_hit_in_cycle = false;
1018 let mut duplicate_signature_hit_in_cycle = false;
1019 let mut rejected_tool_call_in_cycle = false;
1020 for (tool, args) in tool_calls {
1021 if !agent_can_use_tool(&active_agent, &tool) {
1022 rejected_tool_call_in_cycle = true;
1023 continue;
1024 }
1025 let tool_key = normalize_tool_name(&tool);
1026 if is_workspace_write_tool(&tool_key) {
1027 write_tool_attempted_in_cycle = true;
1028 }
1029 if !allowed_tool_names.contains(&tool_key) {
1030 rejected_tool_call_in_cycle = true;
1031 let note = if offered_tool_preview.is_empty() {
1032 format!(
1033 "Tool `{}` call skipped: it is not available in this turn.",
1034 tool_key
1035 )
1036 } else {
1037 format!(
1038 "Tool `{}` call skipped: it is not available in this turn. Available tools: {}.",
1039 tool_key, offered_tool_preview
1040 )
1041 };
1042 self.event_bus.publish(EngineEvent::new(
1043 "tool.call.rejected_unoffered",
1044 json!({
1045 "sessionID": session_id,
1046 "messageID": user_message_id,
1047 "iteration": iteration,
1048 "tool": tool_key,
1049 "offeredToolCount": allowed_tool_names.len()
1050 }),
1051 ));
1052 if tool_name_looks_like_email_action(&tool_key) {
1053 latest_email_action_note = Some(note.clone());
1054 }
1055 outputs.push(note);
1056 continue;
1057 }
1058 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
1059 if blocked_mcp_servers.contains(server) {
1060 rejected_tool_call_in_cycle = true;
1061 outputs.push(format!(
1062 "Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
1063 tool_key, server
1064 ));
1065 continue;
1066 }
1067 }
1068 if tool_key == "question" {
1069 question_tool_used = true;
1070 }
1071 if tool_key == "pack_builder" && pack_builder_executed {
1072 rejected_tool_call_in_cycle = true;
1073 outputs.push(
1074 "Tool `pack_builder` call skipped: already executed in this run. Provide a final response or ask any required follow-up question."
1075 .to_string(),
1076 );
1077 continue;
1078 }
1079 if websearch_query_blocked && tool_key == "websearch" {
1080 rejected_tool_call_in_cycle = true;
1081 outputs.push(
1082 "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
1083 .to_string(),
1084 );
1085 continue;
1086 }
1087 let mut effective_args = args.clone();
1088 if tool_key == "todo_write" {
1089 effective_args = normalize_todo_write_args(effective_args, &completion);
1090 if is_empty_todo_write_args(&effective_args) {
1091 rejected_tool_call_in_cycle = true;
1092 outputs.push(
1093 "Tool `todo_write` call skipped: empty todo payload."
1094 .to_string(),
1095 );
1096 continue;
1097 }
1098 }
1099 let signature = if tool_key == "batch" {
1100 batch_tool_signature(&args)
1101 .unwrap_or_else(|| tool_signature(&tool_key, &args))
1102 } else {
1103 tool_signature(&tool_key, &args)
1104 };
1105 if is_shell_tool_name(&tool_key)
1106 && shell_mismatch_signatures.contains(&signature)
1107 {
1108 rejected_tool_call_in_cycle = true;
1109 outputs.push(
1110 "Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
1111 .to_string(),
1112 );
1113 continue;
1114 }
1115 let mut signature_count = 1usize;
1116 if is_read_only_tool(&tool_key)
1117 || (tool_key == "batch" && is_read_only_batch_call(&args))
1118 {
1119 let count = readonly_signature_counts
1120 .entry(signature.clone())
1121 .and_modify(|v| *v = v.saturating_add(1))
1122 .or_insert(1);
1123 signature_count = *count;
1124 if tool_key == "websearch" {
1125 if let Some(limit) = websearch_duplicate_signature_limit {
1126 if *count > limit {
1127 rejected_tool_call_in_cycle = true;
1128 self.event_bus.publish(EngineEvent::new(
1129 "tool.loop_guard.triggered",
1130 json!({
1131 "sessionID": session_id,
1132 "messageID": user_message_id,
1133 "tool": tool_key,
1134 "reason": "duplicate_signature_retry_exhausted",
1135 "duplicateLimit": limit,
1136 "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
1137 "loop_guard_triggered": true
1138 }),
1139 ));
1140 outputs.push(
1141 "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
1142 .to_string(),
1143 );
1144 continue;
1145 }
1146 }
1147 }
1148 if tool_key != "websearch" && *count > 1 {
1149 rejected_tool_call_in_cycle = true;
1150 if let Some(cached) = readonly_tool_cache.get(&signature) {
1151 outputs.push(cached.clone());
1152 } else {
1153 outputs.push(format!(
1154 "Tool `{}` call skipped: duplicate call signature detected.",
1155 tool_key
1156 ));
1157 }
1158 continue;
1159 }
1160 }
1161 let is_read_only_signature = is_read_only_tool(&tool_key)
1162 || (tool_key == "batch" && is_read_only_batch_call(&args));
1163 if !is_read_only_signature {
1164 let duplicate_limit = duplicate_signature_limit_for(&tool_key);
1165 let seen = mutable_signature_counts
1166 .entry(signature.clone())
1167 .and_modify(|v| *v = v.saturating_add(1))
1168 .or_insert(1);
1169 if *seen > duplicate_limit {
1170 rejected_tool_call_in_cycle = true;
1171 self.event_bus.publish(EngineEvent::new(
1172 "tool.loop_guard.triggered",
1173 json!({
1174 "sessionID": session_id,
1175 "messageID": user_message_id,
1176 "tool": tool_key,
1177 "reason": "duplicate_signature_retry_exhausted",
1178 "signatureHash": stable_hash(&signature),
1179 "duplicateLimit": duplicate_limit,
1180 "loop_guard_triggered": true
1181 }),
1182 ));
1183 outputs.push(format!(
1184 "Tool `{}` call skipped: duplicate call signature retry limit reached ({}).",
1185 tool_key, duplicate_limit
1186 ));
1187 duplicate_signature_hit_in_cycle = true;
1188 continue;
1189 }
1190 }
1191 let budget = tool_budget_for(&tool_key);
1192 let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
1193 if *entry >= budget {
1194 rejected_tool_call_in_cycle = true;
1195 outputs.push(format!(
1196 "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
1197 tool_key, budget
1198 ));
1199 guard_budget_hit_in_cycle = true;
1200 continue;
1201 }
1202 let mut finalized_part = WireMessagePart::tool_invocation(
1203 &session_id,
1204 &user_message_id,
1205 tool.clone(),
1206 effective_args.clone(),
1207 );
1208 finalized_part.state = Some("pending".to_string());
1209 self.event_bus.publish(EngineEvent::new(
1210 "message.part.updated",
1211 json!({"part": finalized_part}),
1212 ));
1213 *entry += 1;
1214 accepted_tool_calls_in_cycle =
1215 accepted_tool_calls_in_cycle.saturating_add(1);
1216 if let Some(output) = self
1217 .execute_tool_with_permission(
1218 &session_id,
1219 &user_message_id,
1220 tool,
1221 effective_args,
1222 active_agent.skills.as_deref(),
1223 &text,
1224 requested_write_required,
1225 Some(&completion),
1226 cancel.clone(),
1227 )
1228 .await?
1229 {
1230 let productive = is_productive_tool_output(&tool_key, &output);
1231 if output.contains("WEBSEARCH_QUERY_MISSING") {
1232 websearch_query_blocked = true;
1233 }
1234 if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
1235 {
1236 shell_mismatch_signatures.insert(signature.clone());
1237 }
1238 if is_read_only_tool(&tool_key)
1239 && tool_key != "websearch"
1240 && signature_count == 1
1241 {
1242 readonly_tool_cache.insert(signature, output.clone());
1243 }
1244 if productive {
1245 productive_tool_calls_total =
1246 productive_tool_calls_total.saturating_add(1);
1247 if is_workspace_write_tool(&tool_key) {
1248 productive_write_tool_calls_total =
1249 productive_write_tool_calls_total.saturating_add(1);
1250 }
1251 executed_productive_tool = true;
1252 if tool_key == "pack_builder" {
1253 pack_builder_executed = true;
1254 }
1255 }
1256 if tool_name_looks_like_email_action(&tool_key) {
1257 if productive {
1258 email_action_executed = true;
1259 } else {
1260 latest_email_action_note =
1261 Some(truncate_text(&output, 280).replace('\n', " "));
1262 }
1263 }
1264 if is_auth_required_tool_output(&output) {
1265 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
1266 blocked_mcp_servers.insert(server.to_string());
1267 }
1268 auth_required_hit_in_cycle = true;
1269 }
1270 outputs.push(output);
1271 if auth_required_hit_in_cycle {
1272 break;
1273 }
1274 if guard_budget_hit_in_cycle {
1275 break;
1276 }
1277 }
1278 }
1279 if !outputs.is_empty() {
1280 last_tool_outputs = outputs.clone();
1281 if matches!(requested_tool_mode, ToolMode::Required)
1282 && productive_tool_calls_total == 0
1283 {
1284 latest_required_tool_failure_kind = classify_required_tool_failure(
1285 &outputs,
1286 saw_tool_call_candidate,
1287 accepted_tool_calls_in_cycle,
1288 provider_tool_parse_failed,
1289 rejected_tool_call_in_cycle,
1290 );
1291 if requested_write_required
1292 && write_tool_attempted_in_cycle
1293 && productive_write_tool_calls_total == 0
1294 && is_write_invalid_args_failure_kind(
1295 latest_required_tool_failure_kind,
1296 )
1297 {
1298 if required_write_retry_count + 1 < strict_write_retry_max_attempts
1299 {
1300 required_write_retry_count += 1;
1301 required_tool_retry_count += 1;
1302 followup_context = Some(build_write_required_retry_context(
1303 &offered_tool_preview,
1304 latest_required_tool_failure_kind,
1305 &text,
1306 ));
1307 self.event_bus.publish(EngineEvent::new(
1308 "provider.call.iteration.finish",
1309 json!({
1310 "sessionID": session_id,
1311 "messageID": user_message_id,
1312 "iteration": iteration,
1313 "finishReason": "required_write_invalid_retry",
1314 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1315 "rejectedToolCalls": 0,
1316 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1317 }),
1318 ));
1319 continue;
1320 }
1321 }
1322 if !requested_write_required && required_tool_retry_count == 0 {
1323 required_tool_retry_count += 1;
1324 followup_context = Some(build_required_tool_retry_context(
1325 &offered_tool_preview,
1326 latest_required_tool_failure_kind,
1327 ));
1328 self.event_bus.publish(EngineEvent::new(
1329 "provider.call.iteration.finish",
1330 json!({
1331 "sessionID": session_id,
1332 "messageID": user_message_id,
1333 "iteration": iteration,
1334 "finishReason": "required_tool_retry",
1335 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1336 "rejectedToolCalls": 0,
1337 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1338 }),
1339 ));
1340 continue;
1341 }
1342 completion = required_tool_mode_unsatisfied_completion(
1343 latest_required_tool_failure_kind,
1344 );
1345 if !required_tool_unsatisfied_emitted {
1346 required_tool_unsatisfied_emitted = true;
1347 self.event_bus.publish(EngineEvent::new(
1348 "tool.mode.required.unsatisfied",
1349 json!({
1350 "sessionID": session_id,
1351 "messageID": user_message_id,
1352 "iteration": iteration,
1353 "selectedToolCount": allowed_tool_names.len(),
1354 "offeredToolsPreview": offered_tool_preview,
1355 "reason": latest_required_tool_failure_kind.code(),
1356 }),
1357 ));
1358 }
1359 self.event_bus.publish(EngineEvent::new(
1360 "provider.call.iteration.finish",
1361 json!({
1362 "sessionID": session_id,
1363 "messageID": user_message_id,
1364 "iteration": iteration,
1365 "finishReason": "required_tool_unsatisfied",
1366 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1367 "rejectedToolCalls": 0,
1368 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1369 }),
1370 ));
1371 break;
1372 }
1373 if requested_write_required
1374 && productive_tool_calls_total > 0
1375 && productive_write_tool_calls_total == 0
1376 {
1377 latest_required_tool_failure_kind =
1378 RequiredToolFailureKind::WriteRequiredNotSatisfied;
1379 if required_write_retry_count + 1 < strict_write_retry_max_attempts {
1380 required_write_retry_count += 1;
1381 followup_context = Some(build_write_required_retry_context(
1382 &offered_tool_preview,
1383 latest_required_tool_failure_kind,
1384 &text,
1385 ));
1386 self.event_bus.publish(EngineEvent::new(
1387 "provider.call.iteration.finish",
1388 json!({
1389 "sessionID": session_id,
1390 "messageID": user_message_id,
1391 "iteration": iteration,
1392 "finishReason": "required_write_retry",
1393 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1394 "rejectedToolCalls": 0,
1395 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1396 }),
1397 ));
1398 continue;
1399 }
1400 completion = required_tool_mode_unsatisfied_completion(
1401 latest_required_tool_failure_kind,
1402 );
1403 if !required_tool_unsatisfied_emitted {
1404 required_tool_unsatisfied_emitted = true;
1405 self.event_bus.publish(EngineEvent::new(
1406 "tool.mode.required.unsatisfied",
1407 json!({
1408 "sessionID": session_id,
1409 "messageID": user_message_id,
1410 "iteration": iteration,
1411 "selectedToolCount": allowed_tool_names.len(),
1412 "offeredToolsPreview": offered_tool_preview,
1413 "reason": latest_required_tool_failure_kind.code(),
1414 }),
1415 ));
1416 }
1417 self.event_bus.publish(EngineEvent::new(
1418 "provider.call.iteration.finish",
1419 json!({
1420 "sessionID": session_id,
1421 "messageID": user_message_id,
1422 "iteration": iteration,
1423 "finishReason": "required_write_unsatisfied",
1424 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1425 "rejectedToolCalls": 0,
1426 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1427 }),
1428 ));
1429 break;
1430 }
1431 let guard_budget_hit =
1432 outputs.iter().any(|o| is_guard_budget_tool_output(o));
1433 if executed_productive_tool {
1434 followup_context = Some(format!(
1435 "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
1436 summarize_tool_outputs(&outputs)
1437 ));
1438 self.event_bus.publish(EngineEvent::new(
1439 "provider.call.iteration.finish",
1440 json!({
1441 "sessionID": session_id,
1442 "messageID": user_message_id,
1443 "iteration": iteration,
1444 "finishReason": "tool_followup",
1445 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1446 "rejectedToolCalls": 0,
1447 }),
1448 ));
1449 continue;
1450 }
1451 if guard_budget_hit {
1452 completion = summarize_guard_budget_outputs(&outputs)
1453 .unwrap_or_else(|| {
1454 "This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
1455 });
1456 } else if duplicate_signature_hit_in_cycle {
1457 completion = summarize_duplicate_signature_outputs(&outputs)
1458 .unwrap_or_else(|| {
1459 "This run paused because the same tool call kept repeating. Rephrase the request or provide a different command target and retry.".to_string()
1460 });
1461 } else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
1462 completion = summary;
1463 } else {
1464 completion.clear();
1465 }
1466 self.event_bus.publish(EngineEvent::new(
1467 "provider.call.iteration.finish",
1468 json!({
1469 "sessionID": session_id,
1470 "messageID": user_message_id,
1471 "iteration": iteration,
1472 "finishReason": "tool_summary",
1473 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1474 "rejectedToolCalls": 0,
1475 }),
1476 ));
1477 break;
1478 } else if matches!(requested_tool_mode, ToolMode::Required) {
1479 latest_required_tool_failure_kind = classify_required_tool_failure(
1480 &outputs,
1481 saw_tool_call_candidate,
1482 accepted_tool_calls_in_cycle,
1483 provider_tool_parse_failed,
1484 rejected_tool_call_in_cycle,
1485 );
1486 }
1487 }
1488
1489 if let Some(usage) = provider_usage {
1490 self.event_bus.publish(EngineEvent::new(
1491 "provider.usage",
1492 json!({
1493 "sessionID": session_id,
1494 "messageID": user_message_id,
1495 "promptTokens": usage.prompt_tokens,
1496 "completionTokens": usage.completion_tokens,
1497 "totalTokens": usage.total_tokens,
1498 }),
1499 ));
1500 }
1501
1502 if matches!(requested_tool_mode, ToolMode::Required)
1503 && productive_tool_calls_total == 0
1504 {
1505 if requested_write_required
1506 && required_write_retry_count > 0
1507 && productive_write_tool_calls_total == 0
1508 && !is_write_invalid_args_failure_kind(latest_required_tool_failure_kind)
1509 {
1510 latest_required_tool_failure_kind =
1511 RequiredToolFailureKind::WriteRequiredNotSatisfied;
1512 }
1513 if requested_write_required
1514 && required_write_retry_count + 1 < strict_write_retry_max_attempts
1515 {
1516 required_write_retry_count += 1;
1517 followup_context = Some(build_write_required_retry_context(
1518 &offered_tool_preview,
1519 latest_required_tool_failure_kind,
1520 &text,
1521 ));
1522 continue;
1523 }
1524 if !requested_write_required && required_tool_retry_count == 0 {
1525 required_tool_retry_count += 1;
1526 followup_context = Some(build_required_tool_retry_context(
1527 &offered_tool_preview,
1528 latest_required_tool_failure_kind,
1529 ));
1530 continue;
1531 }
1532 completion = required_tool_mode_unsatisfied_completion(
1533 latest_required_tool_failure_kind,
1534 );
1535 if !required_tool_unsatisfied_emitted {
1536 required_tool_unsatisfied_emitted = true;
1537 self.event_bus.publish(EngineEvent::new(
1538 "tool.mode.required.unsatisfied",
1539 json!({
1540 "sessionID": session_id,
1541 "messageID": user_message_id,
1542 "iteration": iteration,
1543 "selectedToolCount": allowed_tool_names.len(),
1544 "offeredToolsPreview": offered_tool_preview,
1545 "reason": latest_required_tool_failure_kind.code(),
1546 }),
1547 ));
1548 }
1549 self.event_bus.publish(EngineEvent::new(
1550 "provider.call.iteration.finish",
1551 json!({
1552 "sessionID": session_id,
1553 "messageID": user_message_id,
1554 "iteration": iteration,
1555 "finishReason": "required_tool_unsatisfied",
1556 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1557 "rejectedToolCalls": 0,
1558 "requiredToolFailureReason": latest_required_tool_failure_kind.code(),
1559 }),
1560 ));
1561 } else {
1562 self.event_bus.publish(EngineEvent::new(
1563 "provider.call.iteration.finish",
1564 json!({
1565 "sessionID": session_id,
1566 "messageID": user_message_id,
1567 "iteration": iteration,
1568 "finishReason": "provider_completion",
1569 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1570 "rejectedToolCalls": 0,
1571 }),
1572 ));
1573 }
1574 break;
1575 }
1576 if matches!(requested_tool_mode, ToolMode::Required) && productive_tool_calls_total == 0
1577 {
1578 completion =
1579 required_tool_mode_unsatisfied_completion(latest_required_tool_failure_kind);
1580 if !required_tool_unsatisfied_emitted {
1581 self.event_bus.publish(EngineEvent::new(
1582 "tool.mode.required.unsatisfied",
1583 json!({
1584 "sessionID": session_id,
1585 "messageID": user_message_id,
1586 "selectedToolCount": tool_call_counts.len(),
1587 "reason": latest_required_tool_failure_kind.code(),
1588 }),
1589 ));
1590 }
1591 }
1592 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
1593 if let Some(narrative) = self
1594 .generate_final_narrative_without_tools(
1595 &session_id,
1596 &active_agent,
1597 Some(provider_id.as_str()),
1598 Some(model_id_value.as_str()),
1599 cancel.clone(),
1600 &last_tool_outputs,
1601 )
1602 .await
1603 {
1604 completion = narrative;
1605 }
1606 }
1607 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
1608 let preview = last_tool_outputs
1609 .iter()
1610 .take(3)
1611 .map(|o| truncate_text(o, 240))
1612 .collect::<Vec<_>>()
1613 .join("\n");
1614 completion = format!(
1615 "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
1616 preview
1617 );
1618 }
1619 if completion.trim().is_empty() {
1620 completion =
1621 "I couldn't produce a final response for that run. Please retry your request."
1622 .to_string();
1623 }
1624 if email_delivery_requested
1625 && !email_action_executed
1626 && completion_claims_email_sent(&completion)
1627 {
1628 let mut fallback = "I could not verify that an email was sent in this run. I did not complete the delivery action."
1629 .to_string();
1630 if let Some(note) = latest_email_action_note.as_ref() {
1631 fallback.push_str("\n\nLast email tool status: ");
1632 fallback.push_str(note);
1633 }
1634 fallback.push_str(
1635 "\n\nPlease retry with an explicit available email tool (for example a draft, reply, or send MCP tool in your current connector set).",
1636 );
1637 completion = fallback;
1638 }
1639 completion = strip_model_control_markers(&completion);
1640 truncate_text(&completion, 16_000)
1641 };
1642 emit_event(
1643 Level::INFO,
1644 ProcessKind::Engine,
1645 ObservabilityEvent {
1646 event: "provider.call.finish",
1647 component: "engine.loop",
1648 correlation_id: correlation_ref,
1649 session_id: Some(&session_id),
1650 run_id: None,
1651 message_id: Some(&user_message_id),
1652 provider_id: Some(provider_id.as_str()),
1653 model_id,
1654 status: Some("ok"),
1655 error_code: None,
1656 detail: Some("provider stream complete"),
1657 },
1658 );
1659 if active_agent.name.eq_ignore_ascii_case("plan") {
1660 emit_plan_todo_fallback(
1661 self.storage.clone(),
1662 &self.event_bus,
1663 &session_id,
1664 &user_message_id,
1665 &completion,
1666 )
1667 .await;
1668 let todos_after_fallback = self.storage.get_todos(&session_id).await;
1669 if todos_after_fallback.is_empty() && !question_tool_used {
1670 emit_plan_question_fallback(
1671 self.storage.clone(),
1672 &self.event_bus,
1673 &session_id,
1674 &user_message_id,
1675 &completion,
1676 )
1677 .await;
1678 }
1679 }
1680 if cancel.is_cancelled() {
1681 self.event_bus.publish(EngineEvent::new(
1682 "session.status",
1683 json!({"sessionID": session_id, "status":"cancelled"}),
1684 ));
1685 self.cancellations.remove(&session_id).await;
1686 return Ok(());
1687 }
1688 let assistant = Message::new(
1689 MessageRole::Assistant,
1690 vec![MessagePart::Text {
1691 text: completion.clone(),
1692 }],
1693 );
1694 let assistant_message_id = assistant.id.clone();
1695 self.storage.append_message(&session_id, assistant).await?;
1696 let final_part = WireMessagePart::text(
1697 &session_id,
1698 &assistant_message_id,
1699 truncate_text(&completion, 16_000),
1700 );
1701 self.event_bus.publish(EngineEvent::new(
1702 "message.part.updated",
1703 json!({"part": final_part}),
1704 ));
1705 self.event_bus.publish(EngineEvent::new(
1706 "session.updated",
1707 json!({"sessionID": session_id, "status":"idle"}),
1708 ));
1709 self.event_bus.publish(EngineEvent::new(
1710 "session.status",
1711 json!({"sessionID": session_id, "status":"idle"}),
1712 ));
1713 self.cancellations.remove(&session_id).await;
1714 Ok(())
1715 }
1716
1717 pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
1718 self.providers.default_complete(&prompt).await
1719 }
1720
1721 pub async fn run_oneshot_for_provider(
1722 &self,
1723 prompt: String,
1724 provider_id: Option<&str>,
1725 ) -> anyhow::Result<String> {
1726 self.providers
1727 .complete_for_provider(provider_id, &prompt, None)
1728 .await
1729 }
1730
1731 #[allow(clippy::too_many_arguments)]
1732 async fn execute_tool_with_permission(
1733 &self,
1734 session_id: &str,
1735 message_id: &str,
1736 tool: String,
1737 args: Value,
1738 equipped_skills: Option<&[String]>,
1739 latest_user_text: &str,
1740 write_required: bool,
1741 latest_assistant_context: Option<&str>,
1742 cancel: CancellationToken,
1743 ) -> anyhow::Result<Option<String>> {
1744 let tool = normalize_tool_name(&tool);
1745 let raw_args = args.clone();
1746 let normalized = normalize_tool_args_with_mode(
1747 &tool,
1748 args,
1749 latest_user_text,
1750 latest_assistant_context.unwrap_or_default(),
1751 if write_required {
1752 WritePathRecoveryMode::OutputTargetOnly
1753 } else {
1754 WritePathRecoveryMode::Heuristic
1755 },
1756 );
1757 self.event_bus.publish(EngineEvent::new(
1758 "tool.args.normalized",
1759 json!({
1760 "sessionID": session_id,
1761 "messageID": message_id,
1762 "tool": tool,
1763 "argsSource": normalized.args_source,
1764 "argsIntegrity": normalized.args_integrity,
1765 "rawArgsState": normalized.raw_args_state.as_str(),
1766 "query": normalized.query,
1767 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
1768 "requestID": Value::Null
1769 }),
1770 ));
1771 if normalized.args_integrity == "recovered" {
1772 self.event_bus.publish(EngineEvent::new(
1773 "tool.args.recovered",
1774 json!({
1775 "sessionID": session_id,
1776 "messageID": message_id,
1777 "tool": tool,
1778 "argsSource": normalized.args_source,
1779 "query": normalized.query,
1780 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
1781 "requestID": Value::Null
1782 }),
1783 ));
1784 }
1785 if normalized.missing_terminal {
1786 let missing_reason = normalized
1787 .missing_terminal_reason
1788 .clone()
1789 .unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
1790 let raw_args_preview = truncate_text(&raw_args.to_string(), 2_000);
1791 let normalized_args_preview = truncate_text(&normalized.args.to_string(), 2_000);
1792 let latest_user_preview = truncate_text(latest_user_text, 500);
1793 let latest_assistant_preview =
1794 truncate_text(latest_assistant_context.unwrap_or_default(), 500);
1795 self.event_bus.publish(EngineEvent::new(
1796 "tool.args.missing_terminal",
1797 json!({
1798 "sessionID": session_id,
1799 "messageID": message_id,
1800 "tool": tool,
1801 "argsSource": normalized.args_source,
1802 "argsIntegrity": normalized.args_integrity,
1803 "rawArgsState": normalized.raw_args_state.as_str(),
1804 "requestID": Value::Null,
1805 "error": missing_reason,
1806 "rawArgsPreview": raw_args_preview,
1807 "normalizedArgsPreview": normalized_args_preview,
1808 "latestUserPreview": latest_user_preview,
1809 "latestAssistantPreview": latest_assistant_preview,
1810 }),
1811 ));
1812 if tool == "write" {
1813 tracing::warn!(
1814 session_id = %session_id,
1815 message_id = %message_id,
1816 tool = %tool,
1817 reason = %missing_reason,
1818 args_source = %normalized.args_source,
1819 args_integrity = %normalized.args_integrity,
1820 raw_args_state = %normalized.raw_args_state.as_str(),
1821 raw_args = %raw_args_preview,
1822 normalized_args = %normalized_args_preview,
1823 latest_user = %latest_user_preview,
1824 latest_assistant = %latest_assistant_preview,
1825 "write tool arguments missing terminal field"
1826 );
1827 }
1828 let mut failed_part = WireMessagePart::tool_result(
1829 session_id,
1830 message_id,
1831 tool.clone(),
1832 Some(raw_args.clone()),
1833 json!(null),
1834 );
1835 failed_part.state = Some("failed".to_string());
1836 let surfaced_reason =
1837 provider_specific_write_reason(&tool, &missing_reason, normalized.raw_args_state)
1838 .unwrap_or_else(|| missing_reason.clone());
1839 failed_part.error = Some(surfaced_reason.clone());
1840 self.event_bus.publish(EngineEvent::new(
1841 "message.part.updated",
1842 json!({"part": failed_part}),
1843 ));
1844 return Ok(Some(surfaced_reason));
1845 }
1846
1847 let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
1848 Ok(args) => args,
1849 Err(message) => return Ok(Some(message)),
1850 };
1851 if let Some(allowed_tools) = self
1852 .session_allowed_tools
1853 .read()
1854 .await
1855 .get(session_id)
1856 .cloned()
1857 {
1858 if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &tool) {
1859 return Ok(Some(format!("Tool `{tool}` is not allowed for this run.")));
1860 }
1861 }
1862 if let Some(hook) = self.tool_policy_hook.read().await.clone() {
1863 let decision = hook
1864 .evaluate_tool(ToolPolicyContext {
1865 session_id: session_id.to_string(),
1866 message_id: message_id.to_string(),
1867 tool: tool.clone(),
1868 args: args.clone(),
1869 })
1870 .await?;
1871 if !decision.allowed {
1872 let reason = decision
1873 .reason
1874 .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
1875 let mut blocked_part = WireMessagePart::tool_result(
1876 session_id,
1877 message_id,
1878 tool.clone(),
1879 Some(args.clone()),
1880 json!(null),
1881 );
1882 blocked_part.state = Some("failed".to_string());
1883 blocked_part.error = Some(reason.clone());
1884 self.event_bus.publish(EngineEvent::new(
1885 "message.part.updated",
1886 json!({"part": blocked_part}),
1887 ));
1888 return Ok(Some(reason));
1889 }
1890 }
1891 let mut tool_call_id: Option<String> = None;
1892 if let Some(violation) = self
1893 .workspace_sandbox_violation(session_id, &tool, &args)
1894 .await
1895 {
1896 let mut blocked_part = WireMessagePart::tool_result(
1897 session_id,
1898 message_id,
1899 tool.clone(),
1900 Some(args.clone()),
1901 json!(null),
1902 );
1903 blocked_part.state = Some("failed".to_string());
1904 blocked_part.error = Some(violation.clone());
1905 self.event_bus.publish(EngineEvent::new(
1906 "message.part.updated",
1907 json!({"part": blocked_part}),
1908 ));
1909 return Ok(Some(violation));
1910 }
1911 let rule = self
1912 .plugins
1913 .permission_override(&tool)
1914 .await
1915 .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
1916 if matches!(rule, PermissionAction::Deny) {
1917 return Ok(Some(format!(
1918 "Permission denied for tool `{tool}` by policy."
1919 )));
1920 }
1921
1922 let mut effective_args = args.clone();
1923 if matches!(rule, PermissionAction::Ask) {
1924 let auto_approve_permissions = self
1925 .session_auto_approve_permissions
1926 .read()
1927 .await
1928 .get(session_id)
1929 .copied()
1930 .unwrap_or(false);
1931 if auto_approve_permissions {
1932 self.event_bus.publish(EngineEvent::new(
1933 "permission.auto_approved",
1934 json!({
1935 "sessionID": session_id,
1936 "messageID": message_id,
1937 "tool": tool,
1938 }),
1939 ));
1940 effective_args = args;
1941 } else {
1942 let pending = self
1943 .permissions
1944 .ask_for_session_with_context(
1945 Some(session_id),
1946 &tool,
1947 args.clone(),
1948 Some(crate::PermissionArgsContext {
1949 args_source: normalized.args_source.clone(),
1950 args_integrity: normalized.args_integrity.clone(),
1951 query: normalized.query.clone(),
1952 }),
1953 )
1954 .await;
1955 let mut pending_part = WireMessagePart::tool_invocation(
1956 session_id,
1957 message_id,
1958 tool.clone(),
1959 args.clone(),
1960 );
1961 pending_part.id = Some(pending.id.clone());
1962 tool_call_id = Some(pending.id.clone());
1963 pending_part.state = Some("pending".to_string());
1964 self.event_bus.publish(EngineEvent::new(
1965 "message.part.updated",
1966 json!({"part": pending_part}),
1967 ));
1968 let reply = self
1969 .permissions
1970 .wait_for_reply_with_timeout(
1971 &pending.id,
1972 cancel.clone(),
1973 Some(Duration::from_millis(permission_wait_timeout_ms() as u64)),
1974 )
1975 .await;
1976 let (reply, timed_out) = reply;
1977 if cancel.is_cancelled() {
1978 return Ok(None);
1979 }
1980 if timed_out {
1981 let timeout_ms = permission_wait_timeout_ms();
1982 self.event_bus.publish(EngineEvent::new(
1983 "permission.wait.timeout",
1984 json!({
1985 "sessionID": session_id,
1986 "messageID": message_id,
1987 "tool": tool,
1988 "requestID": pending.id,
1989 "timeoutMs": timeout_ms,
1990 }),
1991 ));
1992 let mut timeout_part = WireMessagePart::tool_result(
1993 session_id,
1994 message_id,
1995 tool.clone(),
1996 Some(args.clone()),
1997 json!(null),
1998 );
1999 timeout_part.id = Some(pending.id);
2000 timeout_part.state = Some("failed".to_string());
2001 timeout_part.error = Some(format!(
2002 "Permission request timed out after {} ms",
2003 timeout_ms
2004 ));
2005 self.event_bus.publish(EngineEvent::new(
2006 "message.part.updated",
2007 json!({"part": timeout_part}),
2008 ));
2009 return Ok(Some(format!(
2010 "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
2011 )));
2012 }
2013 let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
2014 if !approved {
2015 let mut denied_part = WireMessagePart::tool_result(
2016 session_id,
2017 message_id,
2018 tool.clone(),
2019 Some(args.clone()),
2020 json!(null),
2021 );
2022 denied_part.id = Some(pending.id);
2023 denied_part.state = Some("denied".to_string());
2024 denied_part.error = Some("Permission denied by user".to_string());
2025 self.event_bus.publish(EngineEvent::new(
2026 "message.part.updated",
2027 json!({"part": denied_part}),
2028 ));
2029 return Ok(Some(format!(
2030 "Permission denied for tool `{tool}` by user."
2031 )));
2032 }
2033 effective_args = args;
2034 }
2035 }
2036
2037 let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
2038 let tool_context = self.resolve_tool_execution_context(session_id).await;
2039 if let Some((workspace_root, effective_cwd, project_id)) = tool_context.as_ref() {
2040 if let Some(obj) = args.as_object_mut() {
2041 obj.insert(
2042 "__workspace_root".to_string(),
2043 Value::String(workspace_root.clone()),
2044 );
2045 obj.insert(
2046 "__effective_cwd".to_string(),
2047 Value::String(effective_cwd.clone()),
2048 );
2049 obj.insert(
2050 "__session_id".to_string(),
2051 Value::String(session_id.to_string()),
2052 );
2053 if let Some(project_id) = project_id.clone() {
2054 obj.insert("__project_id".to_string(), Value::String(project_id));
2055 }
2056 }
2057 tracing::info!(
2058 "tool execution context session_id={} tool={} workspace_root={} effective_cwd={} project_id={}",
2059 session_id,
2060 tool,
2061 workspace_root,
2062 effective_cwd,
2063 project_id.clone().unwrap_or_default()
2064 );
2065 }
2066 let mut invoke_part =
2067 WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
2068 if let Some(call_id) = tool_call_id.clone() {
2069 invoke_part.id = Some(call_id);
2070 }
2071 let invoke_part_id = invoke_part.id.clone();
2072 self.event_bus.publish(EngineEvent::new(
2073 "message.part.updated",
2074 json!({"part": invoke_part}),
2075 ));
2076 let args_for_side_events = args.clone();
2077 if tool == "spawn_agent" {
2078 let hook = self.spawn_agent_hook.read().await.clone();
2079 if let Some(hook) = hook {
2080 let spawned = hook
2081 .spawn_agent(SpawnAgentToolContext {
2082 session_id: session_id.to_string(),
2083 message_id: message_id.to_string(),
2084 tool_call_id: invoke_part_id.clone(),
2085 args: args_for_side_events.clone(),
2086 })
2087 .await?;
2088 let output = self.plugins.transform_tool_output(spawned.output).await;
2089 let output = truncate_text(&output, 16_000);
2090 emit_tool_side_events(
2091 self.storage.clone(),
2092 &self.event_bus,
2093 ToolSideEventContext {
2094 session_id,
2095 message_id,
2096 tool: &tool,
2097 args: &args_for_side_events,
2098 metadata: &spawned.metadata,
2099 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
2100 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
2101 },
2102 )
2103 .await;
2104 let mut result_part = WireMessagePart::tool_result(
2105 session_id,
2106 message_id,
2107 tool.clone(),
2108 Some(args_for_side_events.clone()),
2109 json!(output.clone()),
2110 );
2111 result_part.id = invoke_part_id;
2112 self.event_bus.publish(EngineEvent::new(
2113 "message.part.updated",
2114 json!({"part": result_part}),
2115 ));
2116 return Ok(Some(truncate_text(
2117 &format!("Tool `{tool}` result:\n{output}"),
2118 16_000,
2119 )));
2120 }
2121 let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
2122 let mut failed_part = WireMessagePart::tool_result(
2123 session_id,
2124 message_id,
2125 tool.clone(),
2126 Some(args_for_side_events.clone()),
2127 json!(null),
2128 );
2129 failed_part.id = invoke_part_id.clone();
2130 failed_part.state = Some("failed".to_string());
2131 failed_part.error = Some(output.to_string());
2132 self.event_bus.publish(EngineEvent::new(
2133 "message.part.updated",
2134 json!({"part": failed_part}),
2135 ));
2136 return Ok(Some(output.to_string()));
2137 }
2138 let result = match self
2139 .execute_tool_with_timeout(&tool, args, cancel.clone())
2140 .await
2141 {
2142 Ok(result) => result,
2143 Err(err) => {
2144 let err_text = err.to_string();
2145 if err_text.contains("TOOL_EXEC_TIMEOUT_MS_EXCEEDED(") {
2146 let timeout_ms = tool_exec_timeout_ms();
2147 let timeout_output = format!(
2148 "Tool `{tool}` timed out after {timeout_ms} ms. It was stopped to keep this run responsive."
2149 );
2150 let mut failed_part = WireMessagePart::tool_result(
2151 session_id,
2152 message_id,
2153 tool.clone(),
2154 Some(args_for_side_events.clone()),
2155 json!(null),
2156 );
2157 failed_part.id = invoke_part_id.clone();
2158 failed_part.state = Some("failed".to_string());
2159 failed_part.error = Some(timeout_output.clone());
2160 self.event_bus.publish(EngineEvent::new(
2161 "message.part.updated",
2162 json!({"part": failed_part}),
2163 ));
2164 return Ok(Some(timeout_output));
2165 }
2166 if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
2167 self.event_bus.publish(EngineEvent::new(
2168 "mcp.auth.required",
2169 json!({
2170 "sessionID": session_id,
2171 "messageID": message_id,
2172 "tool": tool.clone(),
2173 "server": auth.server,
2174 "authorizationUrl": auth.authorization_url,
2175 "message": auth.message,
2176 "challengeId": auth.challenge_id
2177 }),
2178 ));
2179 let auth_output = format!(
2180 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
2181 tool, auth.message, auth.authorization_url
2182 );
2183 let mut result_part = WireMessagePart::tool_result(
2184 session_id,
2185 message_id,
2186 tool.clone(),
2187 Some(args_for_side_events.clone()),
2188 json!(auth_output.clone()),
2189 );
2190 result_part.id = invoke_part_id.clone();
2191 self.event_bus.publish(EngineEvent::new(
2192 "message.part.updated",
2193 json!({"part": result_part}),
2194 ));
2195 return Ok(Some(truncate_text(
2196 &format!("Tool `{tool}` result:\n{auth_output}"),
2197 16_000,
2198 )));
2199 }
2200 let mut failed_part = WireMessagePart::tool_result(
2201 session_id,
2202 message_id,
2203 tool.clone(),
2204 Some(args_for_side_events.clone()),
2205 json!(null),
2206 );
2207 failed_part.id = invoke_part_id.clone();
2208 failed_part.state = Some("failed".to_string());
2209 failed_part.error = Some(err_text.clone());
2210 self.event_bus.publish(EngineEvent::new(
2211 "message.part.updated",
2212 json!({"part": failed_part}),
2213 ));
2214 return Err(err);
2215 }
2216 };
2217 if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
2218 let event_name = if auth.pending && auth.blocked {
2219 "mcp.auth.pending"
2220 } else {
2221 "mcp.auth.required"
2222 };
2223 self.event_bus.publish(EngineEvent::new(
2224 event_name,
2225 json!({
2226 "sessionID": session_id,
2227 "messageID": message_id,
2228 "tool": tool.clone(),
2229 "server": auth.server,
2230 "authorizationUrl": auth.authorization_url,
2231 "message": auth.message,
2232 "challengeId": auth.challenge_id,
2233 "pending": auth.pending,
2234 "blocked": auth.blocked,
2235 "retryAfterMs": auth.retry_after_ms
2236 }),
2237 ));
2238 }
2239 emit_tool_side_events(
2240 self.storage.clone(),
2241 &self.event_bus,
2242 ToolSideEventContext {
2243 session_id,
2244 message_id,
2245 tool: &tool,
2246 args: &args_for_side_events,
2247 metadata: &result.metadata,
2248 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
2249 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
2250 },
2251 )
2252 .await;
2253 let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
2254 if auth.pending && auth.blocked {
2255 let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
2256 format!(
2257 "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
2258 tool, auth.message, auth.authorization_url, retry_after_secs
2259 )
2260 } else {
2261 format!(
2262 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
2263 tool, auth.message, auth.authorization_url
2264 )
2265 }
2266 } else {
2267 self.plugins.transform_tool_output(result.output).await
2268 };
2269 let output = truncate_text(&output, 16_000);
2270 let mut result_part = WireMessagePart::tool_result(
2271 session_id,
2272 message_id,
2273 tool.clone(),
2274 Some(args_for_side_events.clone()),
2275 json!(output.clone()),
2276 );
2277 result_part.id = invoke_part_id;
2278 self.event_bus.publish(EngineEvent::new(
2279 "message.part.updated",
2280 json!({"part": result_part}),
2281 ));
2282 Ok(Some(truncate_text(
2283 &format!("Tool `{tool}` result:\n{output}"),
2284 16_000,
2285 )))
2286 }
2287
2288 async fn execute_tool_with_timeout(
2289 &self,
2290 tool: &str,
2291 args: Value,
2292 cancel: CancellationToken,
2293 ) -> anyhow::Result<tandem_types::ToolResult> {
2294 let timeout_ms = tool_exec_timeout_ms() as u64;
2295 match tokio::time::timeout(
2296 Duration::from_millis(timeout_ms),
2297 self.tools.execute_with_cancel(tool, args, cancel),
2298 )
2299 .await
2300 {
2301 Ok(result) => result,
2302 Err(_) => anyhow::bail!("TOOL_EXEC_TIMEOUT_MS_EXCEEDED({timeout_ms})"),
2303 }
2304 }
2305
2306 async fn find_recent_matching_user_message_id(
2307 &self,
2308 session_id: &str,
2309 text: &str,
2310 ) -> Option<String> {
2311 let session = self.storage.get_session(session_id).await?;
2312 let last = session.messages.last()?;
2313 if !matches!(last.role, MessageRole::User) {
2314 return None;
2315 }
2316 let age_ms = (Utc::now() - last.created_at).num_milliseconds().max(0) as u64;
2317 if age_ms > 10_000 {
2318 return None;
2319 }
2320 let last_text = last
2321 .parts
2322 .iter()
2323 .filter_map(|part| match part {
2324 MessagePart::Text { text } => Some(text.clone()),
2325 _ => None,
2326 })
2327 .collect::<Vec<_>>()
2328 .join("\n");
2329 if last_text == text {
2330 return Some(last.id.clone());
2331 }
2332 None
2333 }
2334
2335 async fn auto_rename_session_from_user_text(&self, session_id: &str, fallback_text: &str) {
2336 let Some(mut session) = self.storage.get_session(session_id).await else {
2337 return;
2338 };
2339 if !title_needs_repair(&session.title) {
2340 return;
2341 }
2342
2343 let first_user_text = session.messages.iter().find_map(|message| {
2344 if !matches!(message.role, MessageRole::User) {
2345 return None;
2346 }
2347 message.parts.iter().find_map(|part| match part {
2348 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.clone()),
2349 _ => None,
2350 })
2351 });
2352
2353 let source = first_user_text.unwrap_or_else(|| fallback_text.to_string());
2354 let Some(title) = derive_session_title_from_prompt(&source, 60) else {
2355 return;
2356 };
2357
2358 session.title = title;
2359 session.time.updated = Utc::now();
2360 let _ = self.storage.save_session(session).await;
2361 }
2362
2363 async fn workspace_sandbox_violation(
2364 &self,
2365 session_id: &str,
2366 tool: &str,
2367 args: &Value,
2368 ) -> Option<String> {
2369 if self.workspace_override_active(session_id).await {
2370 return None;
2371 }
2372 let session = self.storage.get_session(session_id).await?;
2373 let workspace = session
2374 .workspace_root
2375 .or_else(|| crate::normalize_workspace_path(&session.directory))?;
2376 let workspace_path = PathBuf::from(&workspace);
2377 let candidate_paths = extract_tool_candidate_paths(tool, args);
2378 if candidate_paths.is_empty() {
2379 if is_shell_tool_name(tool) {
2380 if let Some(command) = extract_shell_command(args) {
2381 if shell_command_targets_sensitive_path(&command) {
2382 return Some(format!(
2383 "Sandbox blocked `{tool}` command targeting sensitive paths."
2384 ));
2385 }
2386 }
2387 }
2388 return None;
2389 }
2390 if let Some(sensitive) = candidate_paths.iter().find(|path| {
2391 let raw = Path::new(path);
2392 let resolved = if raw.is_absolute() {
2393 raw.to_path_buf()
2394 } else {
2395 workspace_path.join(raw)
2396 };
2397 is_sensitive_path_candidate(&resolved)
2398 }) {
2399 return Some(format!(
2400 "Sandbox blocked `{tool}` path `{sensitive}` (sensitive path policy)."
2401 ));
2402 }
2403
2404 let outside = candidate_paths.iter().find(|path| {
2405 let raw = Path::new(path);
2406 let resolved = if raw.is_absolute() {
2407 raw.to_path_buf()
2408 } else {
2409 workspace_path.join(raw)
2410 };
2411 !crate::is_within_workspace_root(&resolved, &workspace_path)
2412 })?;
2413 Some(format!(
2414 "Sandbox blocked `{tool}` path `{outside}` (workspace root: `{workspace}`)"
2415 ))
2416 }
2417
2418 async fn resolve_tool_execution_context(
2419 &self,
2420 session_id: &str,
2421 ) -> Option<(String, String, Option<String>)> {
2422 let session = self.storage.get_session(session_id).await?;
2423 let workspace_root = session
2424 .workspace_root
2425 .or_else(|| crate::normalize_workspace_path(&session.directory))?;
2426 let effective_cwd = if session.directory.trim().is_empty()
2427 || session.directory.trim() == "."
2428 {
2429 workspace_root.clone()
2430 } else {
2431 crate::normalize_workspace_path(&session.directory).unwrap_or(workspace_root.clone())
2432 };
2433 let project_id = session
2434 .project_id
2435 .clone()
2436 .or_else(|| crate::workspace_project_id(&workspace_root));
2437 Some((workspace_root, effective_cwd, project_id))
2438 }
2439
2440 async fn workspace_override_active(&self, session_id: &str) -> bool {
2441 let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
2442 let mut overrides = self.workspace_overrides.write().await;
2443 overrides.retain(|_, expires_at| *expires_at > now);
2444 overrides
2445 .get(session_id)
2446 .map(|expires_at| *expires_at > now)
2447 .unwrap_or(false)
2448 }
2449
2450 async fn generate_final_narrative_without_tools(
2451 &self,
2452 session_id: &str,
2453 active_agent: &AgentDefinition,
2454 provider_hint: Option<&str>,
2455 model_id: Option<&str>,
2456 cancel: CancellationToken,
2457 tool_outputs: &[String],
2458 ) -> Option<String> {
2459 if cancel.is_cancelled() {
2460 return None;
2461 }
2462 let mut messages = load_chat_history(
2463 self.storage.clone(),
2464 session_id,
2465 ChatHistoryProfile::Standard,
2466 )
2467 .await;
2468 let mut system_parts = vec![tandem_runtime_system_prompt(
2469 &self.host_runtime_context,
2470 &[],
2471 )];
2472 if let Some(system) = active_agent.system_prompt.as_ref() {
2473 system_parts.push(system.clone());
2474 }
2475 messages.insert(
2476 0,
2477 ChatMessage {
2478 role: "system".to_string(),
2479 content: system_parts.join("\n\n"),
2480 attachments: Vec::new(),
2481 },
2482 );
2483 messages.push(ChatMessage {
2484 role: "user".to_string(),
2485 content: format!(
2486 "Tool observations:\n{}\n\nProvide a direct final answer now. Do not call tools.",
2487 summarize_tool_outputs(tool_outputs)
2488 ),
2489 attachments: Vec::new(),
2490 });
2491 let stream = self
2492 .providers
2493 .stream_for_provider(
2494 provider_hint,
2495 model_id,
2496 messages,
2497 ToolMode::None,
2498 None,
2499 cancel.clone(),
2500 )
2501 .await
2502 .ok()?;
2503 tokio::pin!(stream);
2504 let mut completion = String::new();
2505 while let Some(chunk) = stream.next().await {
2506 if cancel.is_cancelled() {
2507 return None;
2508 }
2509 match chunk {
2510 Ok(StreamChunk::TextDelta(delta)) => {
2511 let delta = strip_model_control_markers(&delta);
2512 if !delta.trim().is_empty() {
2513 completion.push_str(&delta);
2514 }
2515 }
2516 Ok(StreamChunk::Done { .. }) => break,
2517 Ok(_) => {}
2518 Err(_) => return None,
2519 }
2520 }
2521 let completion = truncate_text(&strip_model_control_markers(&completion), 16_000);
2522 if completion.trim().is_empty() {
2523 None
2524 } else {
2525 Some(completion)
2526 }
2527 }
2528}
2529
2530fn resolve_model_route(
2531 request_model: Option<&ModelSpec>,
2532 session_model: Option<&ModelSpec>,
2533) -> Option<(String, String)> {
2534 fn normalize(spec: &ModelSpec) -> Option<(String, String)> {
2535 let provider_id = spec.provider_id.trim();
2536 let model_id = spec.model_id.trim();
2537 if provider_id.is_empty() || model_id.is_empty() {
2538 return None;
2539 }
2540 Some((provider_id.to_string(), model_id.to_string()))
2541 }
2542
2543 request_model
2544 .and_then(normalize)
2545 .or_else(|| session_model.and_then(normalize))
2546}
2547
/// Returns `input` with every occurrence of the literal markers `<|eom|>`,
/// `<|eot_id|>`, `<|im_end|>` and `<|end|>` removed.
///
/// Markers are removed one kind at a time, in that order, so a marker that
/// only forms after an earlier kind was removed is caught only when its own
/// pass comes later.
fn strip_model_control_markers(input: &str) -> String {
    const CONTROL_MARKERS: [&str; 4] = ["<|eom|>", "<|eot_id|>", "<|im_end|>", "<|end|>"];
    CONTROL_MARKERS
        .iter()
        .fold(input.to_string(), |text, marker| {
            if text.contains(*marker) {
                text.replace(*marker, "")
            } else {
                text
            }
        })
}
2557
/// Caps `input` at `max_len` bytes, appending `...<truncated>` when text was cut.
///
/// Input no longer than `max_len` bytes is returned unchanged. Otherwise the
/// kept prefix is at most `max_len` bytes long. When `max_len` falls inside a
/// multi-byte UTF-8 character, the cut moves back to the start of that
/// character, because slicing a `str` off a character boundary panics.
fn truncate_text(input: &str, max_len: usize) -> String {
    const SUFFIX: &str = "...<truncated>";
    if input.len() <= max_len {
        return input.to_string();
    }
    // Index 0 is always a boundary, so this loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = String::with_capacity(cut + SUFFIX.len());
    out.push_str(&input[..cut]);
    out.push_str(SUFFIX);
    out
}
2566
/// Maps provider error text to a coarse error code by case-insensitive
/// substring matching.
///
/// Rules are tried in table order and the first matching rule wins, so text
/// matching several rules gets the earliest code. Text matching no rule maps
/// to `PROVIDER_REQUEST_FAILED`.
fn provider_error_code(error_text: &str) -> &'static str {
    // (code, substrings that select it), in priority order.
    const RULES: &[(&str, &[&str])] = &[
        (
            "TOOL_SCHEMA_INVALID",
            &[
                "invalid_function_parameters",
                "array schema missing items",
                "tool schema",
            ],
        ),
        (
            "RATE_LIMIT_EXCEEDED",
            &["rate limit", "too many requests", "429"],
        ),
        (
            "CONTEXT_LENGTH_EXCEEDED",
            &["context length", "max tokens", "token limit"],
        ),
        (
            "AUTHENTICATION_ERROR",
            &["unauthorized", "authentication", "401", "403"],
        ),
        ("TIMEOUT", &["timeout", "timed out"]),
        (
            "PROVIDER_SERVER_ERROR",
            &["server error", "500", "502", "503", "504"],
        ),
    ];

    let lowered = error_text.to_lowercase();
    RULES
        .iter()
        .find(|(_, needles)| needles.iter().any(|needle| lowered.contains(*needle)))
        .map(|(code, _)| *code)
        .unwrap_or("PROVIDER_REQUEST_FAILED")
}
2605
/// Canonicalizes a tool name as emitted by a model.
///
/// Steps, in order:
/// 1. trim, ASCII-lowercase, and turn `-` into `_`;
/// 2. strip at most one wrapper prefix (for example `functions.` or
///    `default_api:`), skipping a prefix when nothing but whitespace would
///    remain after it;
/// 3. map known aliases onto `todo_write` and `bash`.
fn normalize_tool_name(name: &str) -> String {
    const WRAPPER_PREFIXES: [&str; 8] = [
        "default_api:",
        "default_api.",
        "functions.",
        "function.",
        "tools.",
        "tool.",
        "builtin:",
        "builtin.",
    ];

    let lowered = name.trim().to_ascii_lowercase().replace('-', "_");
    let stripped = WRAPPER_PREFIXES
        .iter()
        .find_map(|prefix| {
            lowered
                .strip_prefix(*prefix)
                .map(str::trim)
                .filter(|rest| !rest.is_empty())
        })
        .unwrap_or(lowered.as_str());

    let canonical = match stripped {
        "todowrite" | "update_todo_list" | "update_todos" => "todo_write",
        "run_command" | "shell" | "powershell" | "cmd" => "bash",
        other => other,
    };
    canonical.to_string()
}
2632
/// Extracts the server segment from a dotted tool name of the form
/// `mcp.<server>...`.
///
/// Returns `None` when the name does not start with the `mcp.` segment or
/// when the server segment is empty.
fn mcp_server_from_tool_name(tool_name: &str) -> Option<&str> {
    let after_prefix = tool_name.strip_prefix("mcp.")?;
    let server = after_prefix.split('.').next()?;
    if server.is_empty() {
        None
    } else {
        Some(server)
    }
}
2641
/// Reports whether the prompt contains, ignoring ASCII case, any of a fixed
/// list of research or news phrases.
fn requires_web_research_prompt(input: &str) -> bool {
    const RESEARCH_CUES: [&str; 7] = [
        "research",
        "top news",
        "today's news",
        "todays news",
        "with links",
        "latest headlines",
        "current events",
    ];

    let lowered = input.to_ascii_lowercase();
    for cue in RESEARCH_CUES {
        if lowered.contains(cue) {
            return true;
        }
    }
    false
}
2656
/// Reports whether the prompt, ignoring ASCII case, matches one of these
/// shapes:
/// it contains `email to`; or it contains `send` together with `email`; or it
/// contains `send` together with both `@` and `to`.
fn requires_email_delivery_prompt(input: &str) -> bool {
    let lowered = input.to_ascii_lowercase();
    if lowered.contains("email to") {
        return true;
    }
    // Both remaining shapes need the word "send".
    if !lowered.contains("send") {
        return false;
    }
    let names_email = lowered.contains("email");
    let names_address = lowered.contains('@') && lowered.contains("to");
    names_email || names_address
}
2663
2664fn has_web_research_tools(schemas: &[ToolSchema]) -> bool {
2665 schemas.iter().any(|schema| {
2666 let name = normalize_tool_name(&schema.name);
2667 name == "websearch" || name == "webfetch" || name == "webfetch_html"
2668 })
2669}
2670
2671fn has_email_action_tools(schemas: &[ToolSchema]) -> bool {
2672 schemas
2673 .iter()
2674 .map(|schema| normalize_tool_name(&schema.name))
2675 .any(|name| tool_name_looks_like_email_action(&name))
2676}
2677
2678fn tool_name_looks_like_email_action(name: &str) -> bool {
2679 let normalized = normalize_tool_name(name);
2680 if normalized.starts_with("mcp.") {
2681 return normalized.contains("gmail")
2682 || normalized.contains("mail")
2683 || normalized.contains("email");
2684 }
2685 normalized.contains("mail") || normalized.contains("email")
2686}
2687
/// Reports whether completion text claims an email went out.
///
/// The text must contain, ignoring ASCII case, one email marker
/// (`email status`, `emailed`, `email sent`, `sent to`) and also one delivery
/// word (`sent`, `delivered`, `has been sent`).
fn completion_claims_email_sent(text: &str) -> bool {
    let lowered = text.to_ascii_lowercase();
    let mentions_any = |needles: &[&str]| needles.iter().any(|needle| lowered.contains(*needle));

    let has_email_marker = mentions_any(&["email status", "emailed", "email sent", "sent to"]);
    let has_delivery_word = mentions_any(&["sent", "delivered", "has been sent"]);
    has_email_marker && has_delivery_word
}
2699
2700fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
2701 let Some(obj) = args.as_object() else {
2702 return Vec::new();
2703 };
2704 let keys: &[&str] = match tool {
2705 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
2706 "glob" => &["pattern"],
2707 "lsp" => &["filePath", "path"],
2708 "bash" => &["cwd"],
2709 "apply_patch" => &[],
2710 _ => &["path", "cwd"],
2711 };
2712 keys.iter()
2713 .filter_map(|key| obj.get(*key))
2714 .filter_map(|value| value.as_str())
2715 .filter(|s| !s.trim().is_empty())
2716 .map(ToString::to_string)
2717 .collect()
2718}
2719
2720fn agent_can_use_tool(agent: &AgentDefinition, tool_name: &str) -> bool {
2721 let target = normalize_tool_name(tool_name);
2722 match agent.tools.as_ref() {
2723 None => true,
2724 Some(list) => {
2725 let normalized = list
2726 .iter()
2727 .map(|t| normalize_tool_name(t))
2728 .collect::<Vec<_>>();
2729 any_policy_matches(&normalized, &target)
2730 }
2731 }
2732}
2733
2734fn enforce_skill_scope(
2735 tool_name: &str,
2736 args: Value,
2737 equipped_skills: Option<&[String]>,
2738) -> Result<Value, String> {
2739 if normalize_tool_name(tool_name) != "skill" {
2740 return Ok(args);
2741 }
2742 let Some(configured) = equipped_skills else {
2743 return Ok(args);
2744 };
2745
2746 let mut allowed = configured
2747 .iter()
2748 .map(|s| s.trim().to_string())
2749 .filter(|s| !s.is_empty())
2750 .collect::<Vec<_>>();
2751 if allowed
2752 .iter()
2753 .any(|s| s == "*" || s.eq_ignore_ascii_case("all"))
2754 {
2755 return Ok(args);
2756 }
2757 allowed.sort();
2758 allowed.dedup();
2759 if allowed.is_empty() {
2760 return Err("No skills are equipped for this agent.".to_string());
2761 }
2762
2763 let requested = args
2764 .get("name")
2765 .and_then(|v| v.as_str())
2766 .map(|v| v.trim().to_string())
2767 .unwrap_or_default();
2768 if !requested.is_empty() && !allowed.iter().any(|s| s == &requested) {
2769 return Err(format!(
2770 "Skill '{}' is not equipped for this agent. Equipped skills: {}",
2771 requested,
2772 allowed.join(", ")
2773 ));
2774 }
2775
2776 let mut out = if let Some(obj) = args.as_object() {
2777 Value::Object(obj.clone())
2778 } else {
2779 json!({})
2780 };
2781 if let Some(obj) = out.as_object_mut() {
2782 obj.insert("allowed_skills".to_string(), json!(allowed));
2783 }
2784 Ok(out)
2785}
2786
2787fn is_read_only_tool(tool_name: &str) -> bool {
2788 matches!(
2789 normalize_tool_name(tool_name).as_str(),
2790 "glob"
2791 | "read"
2792 | "grep"
2793 | "search"
2794 | "codesearch"
2795 | "list"
2796 | "ls"
2797 | "lsp"
2798 | "websearch"
2799 | "webfetch"
2800 | "webfetch_html"
2801 )
2802}
2803
2804fn is_workspace_write_tool(tool_name: &str) -> bool {
2805 matches!(
2806 normalize_tool_name(tool_name).as_str(),
2807 "write" | "edit" | "apply_patch"
2808 )
2809}
2810
2811fn is_batch_wrapper_tool_name(name: &str) -> bool {
2812 matches!(
2813 normalize_tool_name(name).as_str(),
2814 "default_api" | "default" | "api" | "function" | "functions" | "tool" | "tools"
2815 )
2816}
2817
2818fn non_empty_string_at<'a>(obj: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
2819 obj.get(key)
2820 .and_then(|v| v.as_str())
2821 .map(str::trim)
2822 .filter(|s| !s.is_empty())
2823}
2824
2825fn nested_non_empty_string_at<'a>(
2826 obj: &'a Map<String, Value>,
2827 parent: &str,
2828 key: &str,
2829) -> Option<&'a str> {
2830 obj.get(parent)
2831 .and_then(|v| v.as_object())
2832 .and_then(|nested| nested.get(key))
2833 .and_then(|v| v.as_str())
2834 .map(str::trim)
2835 .filter(|s| !s.is_empty())
2836}
2837
2838fn extract_batch_calls(args: &Value) -> Vec<(String, Value)> {
2839 let calls = args
2840 .get("tool_calls")
2841 .and_then(|v| v.as_array())
2842 .cloned()
2843 .unwrap_or_default();
2844 calls
2845 .into_iter()
2846 .filter_map(|call| {
2847 let obj = call.as_object()?;
2848 let tool_raw = non_empty_string_at(obj, "tool")
2849 .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
2850 .or_else(|| nested_non_empty_string_at(obj, "function", "tool"))
2851 .or_else(|| nested_non_empty_string_at(obj, "function_call", "tool"))
2852 .or_else(|| nested_non_empty_string_at(obj, "call", "tool"));
2853 let name_raw = non_empty_string_at(obj, "name")
2854 .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
2855 .or_else(|| nested_non_empty_string_at(obj, "function_call", "name"))
2856 .or_else(|| nested_non_empty_string_at(obj, "call", "name"))
2857 .or_else(|| nested_non_empty_string_at(obj, "tool", "name"));
2858 let effective = match (tool_raw, name_raw) {
2859 (Some(t), Some(n)) if is_batch_wrapper_tool_name(t) => n,
2860 (Some(t), _) => t,
2861 (None, Some(n)) => n,
2862 (None, None) => return None,
2863 };
2864 let normalized = normalize_tool_name(effective);
2865 let call_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
2866 Some((normalized, call_args))
2867 })
2868 .collect()
2869}
2870
2871fn is_read_only_batch_call(args: &Value) -> bool {
2872 let calls = extract_batch_calls(args);
2873 !calls.is_empty() && calls.iter().all(|(tool, _)| is_read_only_tool(tool))
2874}
2875
2876fn batch_tool_signature(args: &Value) -> Option<String> {
2877 let calls = extract_batch_calls(args);
2878 if calls.is_empty() {
2879 return None;
2880 }
2881 let parts = calls
2882 .into_iter()
2883 .map(|(tool, call_args)| tool_signature(&tool, &call_args))
2884 .collect::<Vec<_>>();
2885 Some(format!("batch:{}", parts.join("|")))
2886}
2887
2888fn is_productive_tool_output(tool_name: &str, output: &str) -> bool {
2889 let normalized_tool = normalize_tool_name(tool_name);
2890 if normalized_tool == "batch" && is_non_productive_batch_output(output) {
2891 return false;
2892 }
2893 if is_auth_required_tool_output(output) {
2894 return false;
2895 }
2896 let Some(result_body) = extract_tool_result_body(output) else {
2897 return false;
2898 };
2899 !is_non_productive_tool_result_body(result_body)
2900}
2901
/// Extracts the body of output framed as ``Tool `<name>` result:<body>``.
///
/// The output is trimmed first. The body starts after the first
/// `` ` result:`` marker and is returned trimmed. Returns `None` when the
/// framing is missing.
fn extract_tool_result_body(output: &str) -> Option<&str> {
    const RESULT_MARKER: &str = "` result:";
    let after_open = output.trim().strip_prefix("Tool `")?;
    let marker_at = after_open.find(RESULT_MARKER)?;
    let body = &after_open[marker_at + RESULT_MARKER.len()..];
    Some(body.trim())
}
2908
2909fn is_non_productive_tool_result_body(output: &str) -> bool {
2910 let trimmed = output.trim();
2911 if trimmed.is_empty() {
2912 return true;
2913 }
2914 let lower = trimmed.to_ascii_lowercase();
2915 lower.starts_with("unknown tool:")
2916 || lower.contains("call skipped")
2917 || lower.contains("guard budget exceeded")
2918 || lower.contains("invalid_function_parameters")
2919 || is_terminal_tool_error_reason(trimmed)
2920}
2921
/// Reports whether the first line of `output` is a terminal error reason code.
///
/// The first line is trimmed and compared ignoring ASCII case. It qualifies
/// when it ends in `_MISSING` or `_ERROR`, or when it equals one of the
/// explicitly listed codes. A blank first line never qualifies.
fn is_terminal_tool_error_reason(output: &str) -> bool {
    const TERMINAL_REASONS: [&str; 14] = [
        "TOOL_ARGUMENTS_MISSING",
        "WEBSEARCH_QUERY_MISSING",
        "BASH_COMMAND_MISSING",
        "FILE_PATH_MISSING",
        "WRITE_CONTENT_MISSING",
        "WRITE_ARGS_EMPTY_FROM_PROVIDER",
        "WRITE_ARGS_UNPARSEABLE_FROM_PROVIDER",
        "WEBFETCH_URL_MISSING",
        "PACK_BUILDER_PLAN_ID_MISSING",
        "PACK_BUILDER_GOAL_MISSING",
        "PROVIDER_REQUEST_FAILED",
        "AUTHENTICATION_ERROR",
        "CONTEXT_LENGTH_EXCEEDED",
        "RATE_LIMIT_EXCEEDED",
    ];

    let first_line = output.lines().next().unwrap_or_default().trim();
    if first_line.is_empty() {
        return false;
    }
    let code = first_line.to_ascii_uppercase();
    code.ends_with("_MISSING")
        || code.ends_with("_ERROR")
        || TERMINAL_REASONS.contains(&code.as_str())
}
2947
2948fn is_non_productive_batch_output(output: &str) -> bool {
2949 let Ok(value) = serde_json::from_str::<Value>(output.trim()) else {
2950 return false;
2951 };
2952 let Some(items) = value.as_array() else {
2953 return false;
2954 };
2955 if items.is_empty() {
2956 return true;
2957 }
2958 items.iter().all(|item| {
2959 let text = item
2960 .get("output")
2961 .and_then(|v| v.as_str())
2962 .map(str::trim)
2963 .unwrap_or_default()
2964 .to_ascii_lowercase();
2965 text.is_empty()
2966 || text.starts_with("unknown tool:")
2967 || text.contains("call skipped")
2968 || text.contains("guard budget exceeded")
2969 })
2970}
2971
/// Reports whether tool output reads like an authorization prompt.
///
/// Ignoring ASCII case, the output must mention `authorization required`,
/// `requires authorization` or `authorization pending`, and must also contain
/// `authorize here` or `http`.
fn is_auth_required_tool_output(output: &str) -> bool {
    const AUTH_PHRASES: [&str; 3] = [
        "authorization required",
        "requires authorization",
        "authorization pending",
    ];

    let lowered = output.to_ascii_lowercase();
    let mentions_auth = AUTH_PHRASES.iter().any(|phrase| lowered.contains(*phrase));
    let has_link_hint = lowered.contains("authorize here") || lowered.contains("http");
    mentions_auth && has_link_hint
}
2979
/// Details of an authorization challenge raised by a tool call, extracted
/// either from tool result metadata or from an error message.
#[derive(Debug, Clone)]
struct McpAuthRequiredMetadata {
    /// Identifier of the challenge; extractors fall back to `"unknown"` or to
    /// a hash of tool name and URL when none is supplied.
    challenge_id: String,
    /// URL the user is told to open in order to authorize.
    authorization_url: String,
    /// Human-readable explanation included in the authorization prompt.
    message: String,
    /// Name of the server the challenge belongs to, when known.
    server: Option<String>,
    /// `pending` flag as reported in the metadata (`false` when absent).
    pending: bool,
    /// `blocked` flag as reported in the metadata (`false` when absent).
    blocked: bool,
    /// Suggested wait before retrying, in milliseconds, when reported.
    retry_after_ms: Option<u64>,
}
2990
2991fn extract_mcp_auth_required_metadata(metadata: &Value) -> Option<McpAuthRequiredMetadata> {
2992 let auth = metadata.get("mcpAuth")?;
2993 if !auth
2994 .get("required")
2995 .and_then(|v| v.as_bool())
2996 .unwrap_or(false)
2997 {
2998 return None;
2999 }
3000 let authorization_url = auth
3001 .get("authorizationUrl")
3002 .and_then(|v| v.as_str())
3003 .map(str::trim)
3004 .filter(|v| !v.is_empty())?
3005 .to_string();
3006 let message = auth
3007 .get("message")
3008 .and_then(|v| v.as_str())
3009 .map(str::trim)
3010 .filter(|v| !v.is_empty())
3011 .unwrap_or("This tool requires authorization before it can run.")
3012 .to_string();
3013 let challenge_id = auth
3014 .get("challengeId")
3015 .and_then(|v| v.as_str())
3016 .map(str::trim)
3017 .filter(|v| !v.is_empty())
3018 .unwrap_or("unknown")
3019 .to_string();
3020 let server = metadata
3021 .get("server")
3022 .and_then(|v| v.as_str())
3023 .map(str::trim)
3024 .filter(|v| !v.is_empty())
3025 .map(ToString::to_string);
3026 let pending = auth
3027 .get("pending")
3028 .and_then(|v| v.as_bool())
3029 .unwrap_or(false);
3030 let blocked = auth
3031 .get("blocked")
3032 .and_then(|v| v.as_bool())
3033 .unwrap_or(false);
3034 let retry_after_ms = auth.get("retryAfterMs").and_then(|v| v.as_u64());
3035 Some(McpAuthRequiredMetadata {
3036 challenge_id,
3037 authorization_url,
3038 message,
3039 server,
3040 pending,
3041 blocked,
3042 retry_after_ms,
3043 })
3044}
3045
3046fn extract_mcp_auth_required_from_error_text(
3047 tool_name: &str,
3048 error_text: &str,
3049) -> Option<McpAuthRequiredMetadata> {
3050 let lower = error_text.to_ascii_lowercase();
3051 let auth_hint = lower.contains("authorization")
3052 || lower.contains("oauth")
3053 || lower.contains("invalid oauth token")
3054 || lower.contains("requires authorization");
3055 if !auth_hint {
3056 return None;
3057 }
3058 let authorization_url = find_first_url(error_text)?;
3059 let challenge_id = stable_hash(&format!("{tool_name}:{authorization_url}"));
3060 let server = tool_name
3061 .strip_prefix("mcp.")
3062 .and_then(|rest| rest.split('.').next())
3063 .filter(|s| !s.is_empty())
3064 .map(ToString::to_string);
3065 Some(McpAuthRequiredMetadata {
3066 challenge_id,
3067 authorization_url,
3068 message: "This integration requires authorization before this action can run.".to_string(),
3069 server,
3070 pending: false,
3071 blocked: false,
3072 retry_after_ms: None,
3073 })
3074}
3075
3076fn summarize_auth_pending_outputs(outputs: &[String]) -> Option<String> {
3077 if outputs.is_empty()
3078 || !outputs
3079 .iter()
3080 .all(|output| is_auth_required_tool_output(output))
3081 {
3082 return None;
3083 }
3084 let mut auth_lines = outputs
3085 .iter()
3086 .filter_map(|output| {
3087 let trimmed = output.trim();
3088 if trimmed.is_empty() {
3089 None
3090 } else {
3091 Some(trimmed.to_string())
3092 }
3093 })
3094 .collect::<Vec<_>>();
3095 auth_lines.sort();
3096 auth_lines.dedup();
3097 if auth_lines.is_empty() {
3098 return None;
3099 }
3100 Some(format!(
3101 "Authorization is required before I can continue with this action.\n\n{}",
3102 auth_lines.join("\n\n")
3103 ))
3104}
3105
3106fn summarize_guard_budget_outputs(outputs: &[String]) -> Option<String> {
3107 if outputs.is_empty()
3108 || !outputs
3109 .iter()
3110 .all(|output| is_guard_budget_tool_output(output))
3111 {
3112 return None;
3113 }
3114 let mut lines = outputs
3115 .iter()
3116 .filter_map(|output| {
3117 let trimmed = output.trim();
3118 if trimmed.is_empty() {
3119 None
3120 } else {
3121 Some(trimmed.to_string())
3122 }
3123 })
3124 .collect::<Vec<_>>();
3125 lines.sort();
3126 lines.dedup();
3127 if lines.is_empty() {
3128 return None;
3129 }
3130 Some(format!(
3131 "This run hit the per-run tool guard budget, so I paused tool execution to avoid runaway retries.\n\n{}\n\nSend a new message to start a fresh run.",
3132 lines.join("\n")
3133 ))
3134}
3135
3136fn summarize_duplicate_signature_outputs(outputs: &[String]) -> Option<String> {
3137 if outputs.is_empty()
3138 || !outputs
3139 .iter()
3140 .all(|output| is_duplicate_signature_limit_output(output))
3141 {
3142 return None;
3143 }
3144 let mut lines = outputs
3145 .iter()
3146 .filter_map(|output| {
3147 let trimmed = output.trim();
3148 if trimmed.is_empty() {
3149 None
3150 } else {
3151 Some(trimmed.to_string())
3152 }
3153 })
3154 .collect::<Vec<_>>();
3155 lines.sort();
3156 lines.dedup();
3157 if lines.is_empty() {
3158 return None;
3159 }
3160 Some(format!(
3161 "This run paused because the same tool call kept repeating.\n\n{}\n\nRephrase the request or start a new message with a clearer command target.",
3162 lines.join("\n")
3163 ))
3164}
3165
/// Leading reason token of the completion text built by
/// `required_tool_mode_unsatisfied_completion`.
const REQUIRED_TOOL_MODE_UNSATISFIED_REASON: &str = "TOOL_MODE_REQUIRED_NOT_SATISFIED";
3167
/// Why a turn running with `tool_mode=required` did not count as satisfied.
///
/// Each variant maps to a stable upper-case code via [`RequiredToolFailureKind::code`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequiredToolFailureKind {
    /// No tool call candidate was seen at all.
    NoToolCallEmitted,
    /// A tool call was attempted but could not be parsed.
    ToolCallParseFailed,
    /// A tool call ran into a terminal argument error.
    ToolCallInvalidArgs,
    /// The provider supplied empty arguments for a write call.
    WriteArgsEmptyFromProvider,
    /// The provider supplied write arguments that could not be parsed.
    WriteArgsUnparseableFromProvider,
    /// The tool call was not accepted or was rejected by policy.
    ToolCallRejectedByPolicy,
    /// Tool calls executed but none of the other categories applied.
    ToolCallExecutedNonProductive,
    /// A workspace write was expected but did not happen.
    WriteRequiredNotSatisfied,
}

impl RequiredToolFailureKind {
    /// Returns the stable, machine-readable code for this failure kind.
    fn code(self) -> &'static str {
        let code = match self {
            Self::NoToolCallEmitted => "NO_TOOL_CALL_EMITTED",
            Self::ToolCallParseFailed => "TOOL_CALL_PARSE_FAILED",
            Self::ToolCallInvalidArgs => "TOOL_CALL_INVALID_ARGS",
            Self::WriteArgsEmptyFromProvider => "WRITE_ARGS_EMPTY_FROM_PROVIDER",
            Self::WriteArgsUnparseableFromProvider => "WRITE_ARGS_UNPARSEABLE_FROM_PROVIDER",
            Self::ToolCallRejectedByPolicy => "TOOL_CALL_REJECTED_BY_POLICY",
            Self::ToolCallExecutedNonProductive => "TOOL_CALL_EXECUTED_NON_PRODUCTIVE",
            Self::WriteRequiredNotSatisfied => "WRITE_REQUIRED_NOT_SATISFIED",
        };
        code
    }
}
3194
3195fn required_tool_mode_unsatisfied_completion(reason: RequiredToolFailureKind) -> String {
3196 format!(
3197 "{REQUIRED_TOOL_MODE_UNSATISFIED_REASON}: {}: tool_mode=required but the model ended without executing a productive tool call.",
3198 reason.code()
3199 )
3200}
3201
3202fn build_required_tool_retry_context(
3203 offered_tool_preview: &str,
3204 previous_reason: RequiredToolFailureKind,
3205) -> String {
3206 let offered = offered_tool_preview.trim();
3207 let available_tools = if offered.is_empty() {
3208 "Use one of the tools offered in this turn before you produce final text.".to_string()
3209 } else {
3210 format!("Use one of these offered tools before you produce final text: {offered}.")
3211 };
3212 let execution_instruction = if previous_reason
3213 == RequiredToolFailureKind::WriteRequiredNotSatisfied
3214 {
3215 "Inspection is complete; now create or modify workspace files with write, edit, or apply_patch.".to_string()
3216 } else if is_write_invalid_args_failure_kind(previous_reason) {
3217 "Previous tool call arguments were invalid. If you use write, include both `path` and the full `content`. If inspection is already complete, use write, edit, or apply_patch now.".to_string()
3218 } else {
3219 available_tools
3220 };
3221 format!(
3222 "Tool access is mandatory for this request. Previous attempt failed with {}. Execute at least one valid offered tool call before any final text. {}",
3223 previous_reason.code(),
3224 execution_instruction
3225 )
3226}
3227
3228fn is_write_invalid_args_failure_kind(reason: RequiredToolFailureKind) -> bool {
3229 matches!(
3230 reason,
3231 RequiredToolFailureKind::ToolCallInvalidArgs
3232 | RequiredToolFailureKind::WriteArgsEmptyFromProvider
3233 | RequiredToolFailureKind::WriteArgsUnparseableFromProvider
3234 )
3235}
3236
3237fn build_write_required_retry_context(
3238 offered_tool_preview: &str,
3239 previous_reason: RequiredToolFailureKind,
3240 latest_user_text: &str,
3241) -> String {
3242 let mut prompt = build_required_tool_retry_context(offered_tool_preview, previous_reason);
3243 if let Some(path) = infer_required_output_target_path_from_text(latest_user_text) {
3244 prompt.push(' ');
3245 prompt.push_str(&format!(
3246 "The required output target for this task is `{path}`. Write or update that file now."
3247 ));
3248 }
3249 prompt
3250}
3251
/// Heuristically detects assistant text that is really a raw, unparsed tool-call
/// payload (for example a JSON blob containing `"tool_calls"`).
///
/// Matching is ASCII case-insensitive and ignores all whitespace, so both compact
/// (`"type":"tool_use"`) and pretty-printed (`"type": "tool_use"`) JSON are
/// recognized. Blank input returns `false`.
fn looks_like_unparsed_tool_payload(output: &str) -> bool {
    const MARKERS: [&str; 6] = [
        "\"tool_calls\"",
        "\"function_call\"",
        "\"function\":{",
        "\"type\":\"tool_call\"",
        "\"type\":\"function_call\"",
        "\"type\":\"tool_use\"",
    ];

    // Drop whitespace before matching: the markers encode `key:value` adjacency
    // that pretty-printers break up with spaces and newlines.
    let compact: String = output
        .chars()
        .filter(|ch| !ch.is_whitespace())
        .map(|ch| ch.to_ascii_lowercase())
        .collect();
    if compact.is_empty() {
        return false;
    }
    MARKERS.iter().any(|marker| compact.contains(*marker))
}
3265
/// Reports whether a tool output contains one of the known policy-rejection
/// phrases (ASCII case-insensitive substring match).
fn is_policy_rejection_output(output: &str) -> bool {
    const REJECTION_PHRASES: [&str; 4] = [
        "call skipped",
        "authorization required",
        "not allowed",
        "permission denied",
    ];
    let lowered = output.trim().to_ascii_lowercase();
    REJECTION_PHRASES
        .iter()
        .any(|phrase| lowered.contains(*phrase))
}
3273
3274fn classify_required_tool_failure(
3275 outputs: &[String],
3276 saw_tool_call_candidate: bool,
3277 accepted_tool_calls: usize,
3278 parse_failed: bool,
3279 rejected_by_policy: bool,
3280) -> RequiredToolFailureKind {
3281 if parse_failed {
3282 return RequiredToolFailureKind::ToolCallParseFailed;
3283 }
3284 if !saw_tool_call_candidate {
3285 return RequiredToolFailureKind::NoToolCallEmitted;
3286 }
3287 if accepted_tool_calls == 0 || rejected_by_policy {
3288 return RequiredToolFailureKind::ToolCallRejectedByPolicy;
3289 }
3290 if outputs
3291 .iter()
3292 .any(|output| output.contains("WRITE_ARGS_EMPTY_FROM_PROVIDER"))
3293 {
3294 return RequiredToolFailureKind::WriteArgsEmptyFromProvider;
3295 }
3296 if outputs
3297 .iter()
3298 .any(|output| output.contains("WRITE_ARGS_UNPARSEABLE_FROM_PROVIDER"))
3299 {
3300 return RequiredToolFailureKind::WriteArgsUnparseableFromProvider;
3301 }
3302 if outputs
3303 .iter()
3304 .any(|output| is_terminal_tool_error_reason(output))
3305 {
3306 return RequiredToolFailureKind::ToolCallInvalidArgs;
3307 }
3308 if outputs
3309 .iter()
3310 .any(|output| is_policy_rejection_output(output))
3311 {
3312 return RequiredToolFailureKind::ToolCallRejectedByPolicy;
3313 }
3314 RequiredToolFailureKind::ToolCallExecutedNonProductive
3315}
3316
/// Returns the first `http://` or `https://` URL found in `text`.
///
/// The text is scanned one whitespace-separated token at a time. Inside a token
/// the URL starts at the earliest scheme occurrence, so URLs wrapped in quotes,
/// parentheses, angle brackets or markdown link syntax are still found. Trailing
/// closing punctuation is stripped. A bare scheme with nothing after it is
/// ignored.
fn find_first_url(text: &str) -> Option<String> {
    const SCHEMES: [&str; 2] = ["https://", "http://"];
    const TRAILING_PUNCTUATION: &[char] = &[')', ']', '}', '>', '"', '\'', ',', '.'];

    text.split_whitespace().find_map(|token| {
        // Earliest scheme occurrence wins so leading wrappers are skipped.
        let (start, scheme) = SCHEMES
            .iter()
            .filter_map(|scheme| token.find(*scheme).map(|pos| (pos, *scheme)))
            .min_by_key(|(pos, _)| *pos)?;
        let candidate = token[start..].trim_end_matches(TRAILING_PUNCTUATION);
        // Compare against the scheme that actually matched, so short `http://`
        // URLs are not rejected by the longer `https://` prefix length.
        if candidate.len() > scheme.len() {
            Some(candidate.to_string())
        } else {
            None
        }
    })
}
3328
/// Reads `TANDEM_MAX_TOOL_ITERATIONS` from the environment.
///
/// Falls back to 25 when the variable is unset, not valid Unicode, not a
/// `usize`, or zero. Surrounding whitespace in the value is ignored.
fn max_tool_iterations() -> usize {
    const DEFAULT_ITERATIONS: usize = 25;
    match std::env::var("TANDEM_MAX_TOOL_ITERATIONS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(parsed) if parsed > 0 => parsed,
            _ => DEFAULT_ITERATIONS,
        },
        Err(_) => DEFAULT_ITERATIONS,
    }
}
3337
/// Reads `TANDEM_STRICT_WRITE_RETRY_MAX_ATTEMPTS` from the environment.
///
/// Falls back to 3 when the variable is unset, not valid Unicode, not a
/// `usize`, or zero. Surrounding whitespace in the value is ignored.
fn strict_write_retry_max_attempts() -> usize {
    const DEFAULT_ATTEMPTS: usize = 3;
    match std::env::var("TANDEM_STRICT_WRITE_RETRY_MAX_ATTEMPTS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(parsed) if parsed > 0 => parsed,
            _ => DEFAULT_ATTEMPTS,
        },
        Err(_) => DEFAULT_ATTEMPTS,
    }
}
3345
/// Reads `TANDEM_PROVIDER_STREAM_CONNECT_TIMEOUT_MS` from the environment.
///
/// Falls back to 90 000 when the variable is unset, not valid Unicode, not a
/// `usize`, or zero. Surrounding whitespace in the value is ignored.
fn provider_stream_connect_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 90_000;
    match std::env::var("TANDEM_PROVIDER_STREAM_CONNECT_TIMEOUT_MS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(parsed) if parsed > 0 => parsed,
            _ => DEFAULT_TIMEOUT_MS,
        },
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
3353
/// Reads `TANDEM_PROVIDER_STREAM_IDLE_TIMEOUT_MS` from the environment.
///
/// Falls back to 90 000 when the variable is unset, not valid Unicode, not a
/// `usize`, or zero. Surrounding whitespace in the value is ignored.
fn provider_stream_idle_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 90_000;
    match std::env::var("TANDEM_PROVIDER_STREAM_IDLE_TIMEOUT_MS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(parsed) if parsed > 0 => parsed,
            _ => DEFAULT_TIMEOUT_MS,
        },
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
3361
/// Reads `TANDEM_PROMPT_CONTEXT_HOOK_TIMEOUT_MS` from the environment.
///
/// Falls back to 5 000 when the variable is unset, not valid Unicode, not a
/// `usize`, or zero. Surrounding whitespace in the value is ignored.
fn prompt_context_hook_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 5_000;
    match std::env::var("TANDEM_PROMPT_CONTEXT_HOOK_TIMEOUT_MS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(parsed) if parsed > 0 => parsed,
            _ => DEFAULT_TIMEOUT_MS,
        },
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
3369
/// Reads `TANDEM_PERMISSION_WAIT_TIMEOUT_MS` from the environment.
///
/// Falls back to 15 000 when the variable is unset, not valid Unicode, not a
/// `usize`, or zero. Surrounding whitespace in the value is ignored.
fn permission_wait_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 15_000;
    match std::env::var("TANDEM_PERMISSION_WAIT_TIMEOUT_MS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(parsed) if parsed > 0 => parsed,
            _ => DEFAULT_TIMEOUT_MS,
        },
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
3377
/// Reads `TANDEM_TOOL_EXEC_TIMEOUT_MS` from the environment.
///
/// Falls back to 45 000 when the variable is unset, not valid Unicode, not a
/// `usize`, or zero. Surrounding whitespace in the value is ignored.
fn tool_exec_timeout_ms() -> usize {
    const DEFAULT_TIMEOUT_MS: usize = 45_000;
    match std::env::var("TANDEM_TOOL_EXEC_TIMEOUT_MS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(parsed) if parsed > 0 => parsed,
            _ => DEFAULT_TIMEOUT_MS,
        },
        Err(_) => DEFAULT_TIMEOUT_MS,
    }
}
3385
/// Reports whether `output` contains the guard-budget marker phrase, compared
/// ASCII case-insensitively.
fn is_guard_budget_tool_output(output: &str) -> bool {
    const MARKER: &[u8] = b"per-run guard budget exceeded";
    output
        .as_bytes()
        .windows(MARKER.len())
        .any(|window| window.eq_ignore_ascii_case(MARKER))
}
3391
/// Reports whether `output` contains the duplicate-signature marker phrase,
/// compared ASCII case-insensitively.
fn is_duplicate_signature_limit_output(output: &str) -> bool {
    const MARKER: &[u8] = b"duplicate call signature retry limit reached";
    output
        .as_bytes()
        .windows(MARKER.len())
        .any(|window| window.eq_ignore_ascii_case(MARKER))
}
3397
/// Reports whether `path` looks like it points at credentials or key material.
///
/// Matching is ASCII case-insensitive and separator-agnostic: backslashes are
/// treated like `/`, and relative paths are anchored so that `.ssh/config` is
/// handled the same as `/home/u/.ssh/config`. A path is sensitive when it:
/// * lies in (or is) a `.ssh` or `.gnupg` directory;
/// * contains `/.aws/credentials` or ends in `.npmrc`, `.netrc` or `.pypirc`;
/// * contains a key-material marker (`id_rsa`, `id_ed25519`, `id_ecdsa`, `.pem`,
///   `.p12`, `.pfx`, `.key`);
/// * has a final component named `.env` or starting with `.env.`.
fn is_sensitive_path_candidate(path: &Path) -> bool {
    const KEY_MATERIAL_MARKERS: [&str; 7] = [
        "id_rsa",
        "id_ed25519",
        "id_ecdsa",
        ".pem",
        ".p12",
        ".pfx",
        ".key",
    ];

    // Normalize separators so Windows-style paths hit the same `/`-based checks,
    // and prefix a `/` so relative paths match the directory-anchored patterns.
    let normalized = path
        .to_string_lossy()
        .to_ascii_lowercase()
        .replace('\\', "/");
    let anchored = format!("/{normalized}");

    if anchored.contains("/.ssh/")
        || anchored.ends_with("/.ssh")
        || anchored.contains("/.gnupg/")
        || anchored.ends_with("/.gnupg")
    {
        return true;
    }
    if anchored.contains("/.aws/credentials")
        || anchored.ends_with("/.npmrc")
        || anchored.ends_with("/.netrc")
        || anchored.ends_with("/.pypirc")
    {
        return true;
    }
    if KEY_MATERIAL_MARKERS
        .iter()
        .any(|marker| anchored.contains(*marker))
    {
        return true;
    }

    let is_env_file_name = |name: &str| name == ".env" || name.starts_with(".env.");
    // Last segment of the normalized string covers backslash-separated paths on
    // hosts where `Path::file_name` does not split on `\`.
    let last_segment = anchored
        .trim_end_matches('/')
        .rsplit('/')
        .next()
        .unwrap_or("");
    if is_env_file_name(last_segment) {
        return true;
    }
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| is_env_file_name(&name.to_ascii_lowercase()))
}
3432
/// Reports whether a shell command line mentions a credential or key-material
/// location.
///
/// This is a plain substring scan (ASCII case-insensitive, backslashes treated
/// as `/`) over the same set of markers that `is_sensitive_path_candidate`
/// guards, so a file blocked by the path check is not reachable through a shell
/// command instead.
fn shell_command_targets_sensitive_path(command: &str) -> bool {
    const SENSITIVE_MARKERS: [&str; 14] = [
        ".env",
        ".ssh",
        ".gnupg",
        ".aws/credentials",
        ".npmrc",
        ".netrc",
        ".pypirc",
        "id_rsa",
        "id_ed25519",
        "id_ecdsa",
        ".pem",
        ".p12",
        ".pfx",
        ".key",
    ];
    // Normalize separators so `.aws\credentials` matches the `/` form.
    let lowered = command.to_ascii_lowercase().replace('\\', "/");
    SENSITIVE_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker))
}
3449
/// Result of normalizing the raw arguments of a tool call.
///
/// Produced by `normalize_tool_args_with_mode`.
#[derive(Debug, Clone)]
struct NormalizedToolArgs {
    /// Argument payload after normalization / recovery.
    args: Value,
    /// Label describing where the effective arguments came from, e.g.
    /// `"provider_json"`, `"provider_string"`, `"inferred_from_user"`,
    /// `"recovered_from_context"` or `"missing"`.
    args_source: String,
    /// Integrity label: `"ok"`, `"recovered"` or `"empty"`.
    args_integrity: String,
    /// Classification of the provider's raw arguments, taken before any recovery.
    raw_args_state: RawToolArgsState,
    /// Resolved search query; only populated for the `websearch` tool.
    query: Option<String>,
    /// `true` when a required argument could be neither found nor recovered.
    missing_terminal: bool,
    /// Reason code accompanying `missing_terminal`, e.g. `"FILE_PATH_MISSING"`.
    missing_terminal_reason: Option<String>,
}
3460
/// Test-only convenience wrapper around `normalize_tool_args_with_mode` that always
/// uses `WritePathRecoveryMode::Heuristic`.
#[cfg(test)]
fn normalize_tool_args(
    tool_name: &str,
    raw_args: Value,
    latest_user_text: &str,
    latest_assistant_context: &str,
) -> NormalizedToolArgs {
    normalize_tool_args_with_mode(
        tool_name,
        raw_args,
        latest_user_text,
        latest_assistant_context,
        WritePathRecoveryMode::Heuristic,
    )
}
3476
/// Normalizes the raw arguments of a tool call and, where a required argument is
/// missing, tries to recover it from the conversation.
///
/// Per tool family (selected by the normalized tool name):
/// * `websearch`: needs a query; recovered from the user text, then the assistant
///   context.
/// * shell tools: need a command; recovered from the assistant context, then the
///   user text.
/// * `read` / `write` / `edit`: need a path; `write` additionally needs content.
/// * `webfetch` / `webfetch_html`: need a URL; recovered from the assistant
///   context, then the user text.
/// * `pack_builder`: resolves apply-mode plan ids or a goal, and ensures a mode is set.
/// * email delivery tools: attachment arguments are sanitized.
///
/// Other tools pass through unchanged. When nothing can be recovered the result
/// has `missing_terminal == true` and a reason code in `missing_terminal_reason`.
///
/// `write_path_recovery_mode` controls whether the heuristic user-text path
/// inference is attempted for `write` / `edit` after explicit output-target
/// inference fails.
fn normalize_tool_args_with_mode(
    tool_name: &str,
    raw_args: Value,
    latest_user_text: &str,
    latest_assistant_context: &str,
    write_path_recovery_mode: WritePathRecoveryMode,
) -> NormalizedToolArgs {
    let normalized_tool = normalize_tool_name(tool_name);
    // Kept so the email branch can tell whether sanitizing changed anything.
    let original_args = raw_args.clone();
    let mut args = raw_args;
    let mut args_source = if args.is_string() {
        "provider_string".to_string()
    } else {
        "provider_json".to_string()
    };
    let mut args_integrity = "ok".to_string();
    // Classified before any recovery mutates `args`.
    let raw_args_state = classify_raw_tool_args_state(&args);
    let mut query = None;
    let mut missing_terminal = false;
    let mut missing_terminal_reason = None;

    if normalized_tool == "websearch" {
        // Query priority: explicit args, then user text, then assistant context.
        if let Some(found) = extract_websearch_query(&args) {
            query = Some(found);
            args = set_websearch_query_and_source(args, query.clone(), "tool_args");
        } else if let Some(inferred) = infer_websearch_query_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            query = Some(inferred);
            args = set_websearch_query_and_source(args, query.clone(), "inferred_from_user");
        } else if let Some(recovered) = infer_websearch_query_from_text(latest_assistant_context) {
            args_source = "recovered_from_context".to_string();
            args_integrity = "recovered".to_string();
            query = Some(recovered);
            args = set_websearch_query_and_source(args, query.clone(), "recovered_from_context");
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("WEBSEARCH_QUERY_MISSING".to_string());
        }
    } else if is_shell_tool_name(&normalized_tool) {
        // Command priority: explicit args, then assistant context, then user text.
        if let Some(command) = extract_shell_command(&args) {
            args = set_shell_command(args, command);
        } else if let Some(inferred) = infer_shell_command_from_text(latest_assistant_context) {
            args_source = "inferred_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_shell_command(args, inferred);
        } else if let Some(inferred) = infer_shell_command_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_shell_command(args, inferred);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("BASH_COMMAND_MISSING".to_string());
        }
    } else if matches!(normalized_tool.as_str(), "read" | "write" | "edit") {
        if let Some(path) = extract_file_path_arg(&args) {
            args = set_file_path_arg(args, path);
        } else if normalized_tool == "write" || normalized_tool == "edit" {
            // Mutating tools: prefer an explicitly required output target, and
            // only fall back to the heuristic inference when the mode allows it.
            if let Some(inferred) = infer_required_output_target_path_from_text(latest_user_text)
                .or_else(|| infer_required_output_target_path_from_text(latest_assistant_context))
            {
                args_source = "recovered_from_context".to_string();
                args_integrity = "recovered".to_string();
                args = set_file_path_arg(args, inferred);
            } else if write_path_recovery_mode == WritePathRecoveryMode::Heuristic {
                if let Some(inferred) = infer_write_file_path_from_text(latest_user_text) {
                    args_source = "inferred_from_user".to_string();
                    args_integrity = "recovered".to_string();
                    args = set_file_path_arg(args, inferred);
                } else {
                    args_source = "missing".to_string();
                    args_integrity = "empty".to_string();
                    missing_terminal = true;
                    missing_terminal_reason = Some("FILE_PATH_MISSING".to_string());
                }
            } else {
                args_source = "missing".to_string();
                args_integrity = "empty".to_string();
                missing_terminal = true;
                missing_terminal_reason = Some("FILE_PATH_MISSING".to_string());
            }
        } else if let Some(inferred) = infer_file_path_from_text(latest_user_text) {
            // `read` without a path: infer from the user text only.
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_file_path_arg(args, inferred);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("FILE_PATH_MISSING".to_string());
        }

        // `write` also needs content; only checked once a path has been resolved.
        if !missing_terminal && normalized_tool == "write" {
            if let Some(content) = extract_write_content_arg(&args) {
                args = set_write_content_arg(args, content);
            } else if let Some(recovered) =
                infer_write_content_from_assistant_context(latest_assistant_context)
            {
                args_source = "recovered_from_context".to_string();
                args_integrity = "recovered".to_string();
                args = set_write_content_arg(args, recovered);
            } else {
                args_source = "missing".to_string();
                args_integrity = "empty".to_string();
                missing_terminal = true;
                missing_terminal_reason = Some("WRITE_CONTENT_MISSING".to_string());
            }
        }
    } else if matches!(normalized_tool.as_str(), "webfetch" | "webfetch_html") {
        // URL priority: explicit args, then assistant context, then user text.
        if let Some(url) = extract_webfetch_url_arg(&args) {
            args = set_webfetch_url_arg(args, url);
        } else if let Some(inferred) = infer_url_from_text(latest_assistant_context) {
            args_source = "inferred_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_webfetch_url_arg(args, inferred);
        } else if let Some(inferred) = infer_url_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_webfetch_url_arg(args, inferred);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("WEBFETCH_URL_MISSING".to_string());
        }
    } else if normalized_tool == "pack_builder" {
        let mode = extract_pack_builder_mode_arg(&args);
        let plan_id = extract_pack_builder_plan_id_arg(&args);
        if mode.as_deref() == Some("apply") && plan_id.is_none() {
            // Explicit apply without a plan id: the id must be recoverable.
            if let Some(inferred_plan) =
                infer_pack_builder_apply_plan_id(latest_user_text, latest_assistant_context)
            {
                args_source = "recovered_from_context".to_string();
                args_integrity = "recovered".to_string();
                args = set_pack_builder_apply_args(args, inferred_plan);
            } else {
                args_source = "missing".to_string();
                args_integrity = "empty".to_string();
                missing_terminal = true;
                missing_terminal_reason = Some("PACK_BUILDER_PLAN_ID_MISSING".to_string());
            }
        } else if mode.as_deref() == Some("apply") {
            args = ensure_pack_builder_default_mode(args);
        } else if let Some(inferred_plan) =
            infer_pack_builder_apply_plan_id(latest_user_text, latest_assistant_context)
        {
            // Not in apply mode, but the conversation yields a plan id: switch
            // the call to apply mode for that plan.
            args_source = "recovered_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_pack_builder_apply_args(args, inferred_plan);
        } else if let Some(goal) = extract_pack_builder_goal_arg(&args) {
            args = set_pack_builder_goal_arg(args, goal);
        } else if let Some(inferred) = infer_pack_builder_goal_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_pack_builder_goal_arg(args, inferred);
        } else if let Some(recovered) = infer_pack_builder_goal_from_text(latest_assistant_context)
        {
            args_source = "recovered_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_pack_builder_goal_arg(args, recovered);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("PACK_BUILDER_GOAL_MISSING".to_string());
        }
        // Every pack_builder path ends with a mode set ("preview" when absent).
        args = ensure_pack_builder_default_mode(args);
    } else if is_email_delivery_tool_name(&normalized_tool) {
        let sanitized = sanitize_email_attachment_args(args);
        if sanitized != original_args {
            args_source = "sanitized_attachment".to_string();
            args_integrity = "recovered".to_string();
        }
        args = sanitized;
    }

    NormalizedToolArgs {
        args,
        args_source,
        args_integrity,
        raw_args_state,
        query,
        missing_terminal,
        missing_terminal_reason,
    }
}
3667
3668fn classify_raw_tool_args_state(raw_args: &Value) -> RawToolArgsState {
3669 match raw_args {
3670 Value::Null => RawToolArgsState::Empty,
3671 Value::Object(obj) => {
3672 if obj.is_empty() {
3673 RawToolArgsState::Empty
3674 } else {
3675 RawToolArgsState::Present
3676 }
3677 }
3678 Value::Array(items) => {
3679 if items.is_empty() {
3680 RawToolArgsState::Empty
3681 } else {
3682 RawToolArgsState::Present
3683 }
3684 }
3685 Value::String(raw) => {
3686 let trimmed = raw.trim();
3687 if trimmed.is_empty() {
3688 return RawToolArgsState::Empty;
3689 }
3690 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
3691 return classify_raw_tool_args_state(&parsed);
3692 }
3693 if parse_function_style_args(trimmed).is_empty() {
3694 return RawToolArgsState::Unparseable;
3695 }
3696 RawToolArgsState::Present
3697 }
3698 _ => RawToolArgsState::Present,
3699 }
3700}
3701
3702fn provider_specific_write_reason(
3703 tool: &str,
3704 missing_reason: &str,
3705 raw_args_state: RawToolArgsState,
3706) -> Option<String> {
3707 if tool != "write"
3708 || !matches!(
3709 missing_reason,
3710 "FILE_PATH_MISSING" | "WRITE_CONTENT_MISSING"
3711 )
3712 {
3713 return None;
3714 }
3715 match raw_args_state {
3716 RawToolArgsState::Empty => Some("WRITE_ARGS_EMPTY_FROM_PROVIDER".to_string()),
3717 RawToolArgsState::Unparseable => Some("WRITE_ARGS_UNPARSEABLE_FROM_PROVIDER".to_string()),
3718 RawToolArgsState::Present => None,
3719 }
3720}
3721
/// Reports whether `tool_name` names a shell tool (`bash`, `shell`,
/// `powershell` or `cmd`), ignoring surrounding whitespace and ASCII case.
fn is_shell_tool_name(tool_name: &str) -> bool {
    const SHELL_TOOLS: [&str; 4] = ["bash", "shell", "powershell", "cmd"];
    let candidate = tool_name.trim();
    SHELL_TOOLS
        .iter()
        .any(|known| candidate.eq_ignore_ascii_case(known))
}
3728
3729fn is_email_delivery_tool_name(tool_name: &str) -> bool {
3730 matches!(
3731 normalize_tool_name(tool_name).as_str(),
3732 "mcp.composio_1.gmail_send_email"
3733 | "mcp.composio_1.gmail_send_draft"
3734 | "mcp.composio.gmail_send_email"
3735 | "mcp.composio.gmail_send_draft"
3736 ) || tool_name.ends_with(".gmail_send_email")
3737 || tool_name.ends_with(".gmail_send_draft")
3738}
3739
3740fn sanitize_email_attachment_args(args: Value) -> Value {
3741 let mut obj = match args {
3742 Value::Object(map) => map,
3743 other => return other,
3744 };
3745 if let Some(Value::Object(attachment)) = obj.get("attachment") {
3746 let s3key = attachment
3747 .get("s3key")
3748 .and_then(Value::as_str)
3749 .map(str::trim)
3750 .unwrap_or("");
3751 if s3key.is_empty() {
3752 obj.remove("attachment");
3753 }
3754 } else if obj.get("attachment").is_some() && obj.get("attachment").is_some_and(Value::is_null) {
3755 obj.remove("attachment");
3756 }
3757 if let Some(Value::Array(attachments)) = obj.get_mut("attachments") {
3758 attachments.retain(|entry| {
3759 entry
3760 .get("s3key")
3761 .and_then(Value::as_str)
3762 .map(str::trim)
3763 .map(|value| !value.is_empty())
3764 .unwrap_or(false)
3765 });
3766 if attachments.is_empty() {
3767 obj.remove("attachments");
3768 }
3769 }
3770 Value::Object(obj)
3771}
3772
3773fn set_file_path_arg(args: Value, path: String) -> Value {
3774 let mut obj = args.as_object().cloned().unwrap_or_default();
3775 obj.insert("path".to_string(), Value::String(path));
3776 Value::Object(obj)
3777}
3778
3779fn set_write_content_arg(args: Value, content: String) -> Value {
3780 let mut obj = args.as_object().cloned().unwrap_or_default();
3781 obj.insert("content".to_string(), Value::String(content));
3782 Value::Object(obj)
3783}
3784
/// Extracts a file path from tool arguments, starting the recursive search in
/// `extract_file_path_arg_internal` at depth 0.
fn extract_file_path_arg(args: &Value) -> Option<String> {
    extract_file_path_arg_internal(args, 0)
}
3788
/// Extracts write content from tool arguments, starting the recursive search in
/// `extract_write_content_arg_internal` at depth 0.
fn extract_write_content_arg(args: &Value) -> Option<String> {
    extract_write_content_arg_internal(args, 0)
}
3792
3793fn extract_file_path_arg_internal(args: &Value, depth: usize) -> Option<String> {
3794 if depth > 5 {
3795 return None;
3796 }
3797
3798 match args {
3799 Value::String(raw) => {
3800 let trimmed = raw.trim();
3801 if trimmed.is_empty() {
3802 return None;
3803 }
3804 if !(trimmed.starts_with('{') || trimmed.starts_with('[') || trimmed.starts_with('"')) {
3806 return sanitize_path_candidate(trimmed);
3807 }
3808 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
3809 return extract_file_path_arg_internal(&parsed, depth + 1);
3810 }
3811 sanitize_path_candidate(trimmed)
3812 }
3813 Value::Array(items) => items
3814 .iter()
3815 .find_map(|item| extract_file_path_arg_internal(item, depth + 1)),
3816 Value::Object(obj) => {
3817 for key in FILE_PATH_KEYS {
3818 if let Some(raw) = obj.get(key).and_then(|v| v.as_str()) {
3819 if let Some(path) = sanitize_path_candidate(raw) {
3820 return Some(path);
3821 }
3822 }
3823 }
3824 for container in NESTED_ARGS_KEYS {
3825 if let Some(nested) = obj.get(container) {
3826 if let Some(path) = extract_file_path_arg_internal(nested, depth + 1) {
3827 return Some(path);
3828 }
3829 }
3830 }
3831 None
3832 }
3833 _ => None,
3834 }
3835}
3836
3837fn extract_write_content_arg_internal(args: &Value, depth: usize) -> Option<String> {
3838 if depth > 5 {
3839 return None;
3840 }
3841
3842 match args {
3843 Value::String(raw) => {
3844 let trimmed = raw.trim();
3845 if trimmed.is_empty() {
3846 return None;
3847 }
3848 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
3849 return extract_write_content_arg_internal(&parsed, depth + 1);
3850 }
3851 if sanitize_path_candidate(trimmed).is_some()
3854 && !trimmed.contains('\n')
3855 && trimmed.split_whitespace().count() <= 3
3856 {
3857 return None;
3858 }
3859 Some(trimmed.to_string())
3860 }
3861 Value::Array(items) => items
3862 .iter()
3863 .find_map(|item| extract_write_content_arg_internal(item, depth + 1)),
3864 Value::Object(obj) => {
3865 for key in WRITE_CONTENT_KEYS {
3866 if let Some(value) = obj.get(key) {
3867 if let Some(raw) = value.as_str() {
3868 if !raw.is_empty() {
3869 return Some(raw.to_string());
3870 }
3871 } else if let Some(recovered) =
3872 extract_write_content_arg_internal(value, depth + 1)
3873 {
3874 return Some(recovered);
3875 }
3876 }
3877 }
3878 for container in NESTED_ARGS_KEYS {
3879 if let Some(nested) = obj.get(container) {
3880 if let Some(content) = extract_write_content_arg_internal(nested, depth + 1) {
3881 return Some(content);
3882 }
3883 }
3884 }
3885 None
3886 }
3887 _ => None,
3888 }
3889}
3890
/// Uses the trimmed assistant context as write content, provided it is at least
/// 32 bytes long; shorter context yields `None`.
fn infer_write_content_from_assistant_context(latest_assistant_context: &str) -> Option<String> {
    const MIN_CONTENT_BYTES: usize = 32;
    let candidate = latest_assistant_context.trim();
    if candidate.len() >= MIN_CONTENT_BYTES {
        Some(candidate.to_string())
    } else {
        None
    }
}
3898
3899fn set_shell_command(args: Value, command: String) -> Value {
3900 let mut obj = args.as_object().cloned().unwrap_or_default();
3901 obj.insert("command".to_string(), Value::String(command));
3902 Value::Object(obj)
3903}
3904
/// Extracts a shell command from tool arguments, starting the recursive search in
/// `extract_shell_command_internal` at depth 0.
fn extract_shell_command(args: &Value) -> Option<String> {
    extract_shell_command_internal(args, 0)
}
3908
3909fn extract_shell_command_internal(args: &Value, depth: usize) -> Option<String> {
3910 if depth > 5 {
3911 return None;
3912 }
3913
3914 match args {
3915 Value::String(raw) => {
3916 let trimmed = raw.trim();
3917 if trimmed.is_empty() {
3918 return None;
3919 }
3920 if !(trimmed.starts_with('{') || trimmed.starts_with('[') || trimmed.starts_with('"')) {
3921 return sanitize_shell_command_candidate(trimmed);
3922 }
3923 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
3924 return extract_shell_command_internal(&parsed, depth + 1);
3925 }
3926 sanitize_shell_command_candidate(trimmed)
3927 }
3928 Value::Array(items) => items
3929 .iter()
3930 .find_map(|item| extract_shell_command_internal(item, depth + 1)),
3931 Value::Object(obj) => {
3932 for key in SHELL_COMMAND_KEYS {
3933 if let Some(raw) = obj.get(key).and_then(|v| v.as_str()) {
3934 if let Some(command) = sanitize_shell_command_candidate(raw) {
3935 return Some(command);
3936 }
3937 }
3938 }
3939 for container in NESTED_ARGS_KEYS {
3940 if let Some(nested) = obj.get(container) {
3941 if let Some(command) = extract_shell_command_internal(nested, depth + 1) {
3942 return Some(command);
3943 }
3944 }
3945 }
3946 None
3947 }
3948 _ => None,
3949 }
3950}
3951
3952fn infer_shell_command_from_text(text: &str) -> Option<String> {
3953 let trimmed = text.trim();
3954 if trimmed.is_empty() {
3955 return None;
3956 }
3957
3958 let mut in_tick = false;
3960 let mut tick_buf = String::new();
3961 for ch in trimmed.chars() {
3962 if ch == '`' {
3963 if in_tick {
3964 if let Some(candidate) = sanitize_shell_command_candidate(&tick_buf) {
3965 if looks_like_shell_command(&candidate) {
3966 return Some(candidate);
3967 }
3968 }
3969 tick_buf.clear();
3970 }
3971 in_tick = !in_tick;
3972 continue;
3973 }
3974 if in_tick {
3975 tick_buf.push(ch);
3976 }
3977 }
3978
3979 for line in trimmed.lines() {
3980 let line = line.trim();
3981 if line.is_empty() {
3982 continue;
3983 }
3984 let lower = line.to_ascii_lowercase();
3985 for prefix in [
3986 "run ",
3987 "execute ",
3988 "call ",
3989 "use bash ",
3990 "use shell ",
3991 "bash ",
3992 "shell ",
3993 "powershell ",
3994 "pwsh ",
3995 ] {
3996 if lower.starts_with(prefix) {
3997 let candidate = line[prefix.len()..].trim();
3998 if let Some(command) = sanitize_shell_command_candidate(candidate) {
3999 if looks_like_shell_command(&command) {
4000 return Some(command);
4001 }
4002 }
4003 }
4004 }
4005 }
4006
4007 None
4008}
4009
4010fn set_websearch_query_and_source(args: Value, query: Option<String>, query_source: &str) -> Value {
4011 let mut obj = args.as_object().cloned().unwrap_or_default();
4012 if let Some(q) = query {
4013 obj.insert("query".to_string(), Value::String(q));
4014 }
4015 obj.insert(
4016 "__query_source".to_string(),
4017 Value::String(query_source.to_string()),
4018 );
4019 Value::Object(obj)
4020}
4021
4022fn set_webfetch_url_arg(args: Value, url: String) -> Value {
4023 let mut obj = args.as_object().cloned().unwrap_or_default();
4024 obj.insert("url".to_string(), Value::String(url));
4025 Value::Object(obj)
4026}
4027
4028fn set_pack_builder_goal_arg(args: Value, goal: String) -> Value {
4029 let mut obj = args.as_object().cloned().unwrap_or_default();
4030 obj.insert("goal".to_string(), Value::String(goal));
4031 Value::Object(obj)
4032}
4033
4034fn set_pack_builder_apply_args(args: Value, plan_id: String) -> Value {
4035 let mut obj = args.as_object().cloned().unwrap_or_default();
4036 obj.insert("mode".to_string(), Value::String("apply".to_string()));
4037 obj.insert("plan_id".to_string(), Value::String(plan_id));
4038 obj.insert(
4039 "approve_connector_registration".to_string(),
4040 Value::Bool(true),
4041 );
4042 obj.insert("approve_pack_install".to_string(), Value::Bool(true));
4043 obj.insert("approve_enable_routines".to_string(), Value::Bool(false));
4044 Value::Object(obj)
4045}
4046
4047fn extract_pack_builder_mode_arg(args: &Value) -> Option<String> {
4048 for key in ["mode"] {
4049 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
4050 let mode = value.trim().to_ascii_lowercase();
4051 if !mode.is_empty() {
4052 return Some(mode);
4053 }
4054 }
4055 }
4056 for container in ["arguments", "args", "input", "params"] {
4057 if let Some(obj) = args.get(container) {
4058 if let Some(value) = obj.get("mode").and_then(|v| v.as_str()) {
4059 let mode = value.trim().to_ascii_lowercase();
4060 if !mode.is_empty() {
4061 return Some(mode);
4062 }
4063 }
4064 }
4065 }
4066 None
4067}
4068
4069fn extract_pack_builder_plan_id_arg(args: &Value) -> Option<String> {
4070 for key in ["plan_id", "planId"] {
4071 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
4072 let plan_id = value.trim();
4073 if !plan_id.is_empty() {
4074 return Some(plan_id.to_string());
4075 }
4076 }
4077 }
4078 for container in ["arguments", "args", "input", "params"] {
4079 if let Some(obj) = args.get(container) {
4080 for key in ["plan_id", "planId"] {
4081 if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
4082 let plan_id = value.trim();
4083 if !plan_id.is_empty() {
4084 return Some(plan_id.to_string());
4085 }
4086 }
4087 }
4088 }
4089 }
4090 None
4091}
4092
/// Finds the first plan id of the form `plan-<ascii alphanumerics and dashes>` in
/// `text`.
///
/// A match must be at least 10 bytes long in total and must start at a word
/// boundary: the `plan-` marker may not be preceded by an ASCII letter, digit,
/// `-` or `_`. Without that boundary, ordinary words such as
/// `floorplan-generator` would be mistaken for a plan id.
fn extract_pack_builder_plan_id_from_text(text: &str) -> Option<String> {
    const MARKER: &[u8] = b"plan-";
    const MIN_PLAN_ID_LEN: usize = 10;

    let bytes = text.as_bytes();
    let is_id_byte = |byte: u8| byte.is_ascii_alphanumeric() || byte == b'-';

    let mut idx = 0usize;
    while idx + MARKER.len() <= bytes.len() {
        if &bytes[idx..idx + MARKER.len()] != MARKER {
            idx += 1;
            continue;
        }
        let body_start = idx + MARKER.len();
        let body_len = bytes[body_start..]
            .iter()
            .take_while(|byte| is_id_byte(**byte))
            .count();
        let end = body_start + body_len;

        let at_word_start =
            idx == 0 || !(is_id_byte(bytes[idx - 1]) || bytes[idx - 1] == b'_');
        if at_word_start && end - idx >= MIN_PLAN_ID_LEN {
            // `idx` and `end` both sit next to ASCII bytes, so they are valid
            // char boundaries for slicing.
            return Some(text[idx..end].to_string());
        }
        // Skip the whole run: any `plan-` inside it is preceded by an id byte
        // and could never start at a word boundary.
        idx = end + 1;
    }
    None
}
4123
/// Reports whether `text`, once trimmed and ASCII-lower-cased, is exactly one of
/// the short confirmation phrases (e.g. `yes`, `go ahead`, `ship it`, `👍`).
fn is_pack_builder_confirmation_text(text: &str) -> bool {
    const CONFIRMATIONS: [&str; 16] = [
        "confirm",
        "confirmed",
        "approve",
        "approved",
        "yes",
        "y",
        "ok",
        "okay",
        "go",
        "go ahead",
        "ship it",
        "do it",
        "apply",
        "run it",
        "✅",
        "👍",
    ];
    let normalized = text.trim().to_ascii_lowercase();
    CONFIRMATIONS.contains(&normalized.as_str())
}
4150
4151fn infer_pack_builder_apply_plan_id(
4152 latest_user_text: &str,
4153 latest_assistant_context: &str,
4154) -> Option<String> {
4155 if let Some(plan_id) = extract_pack_builder_plan_id_from_text(latest_user_text) {
4156 return Some(plan_id);
4157 }
4158 if !is_pack_builder_confirmation_text(latest_user_text) {
4159 return None;
4160 }
4161 extract_pack_builder_plan_id_from_text(latest_assistant_context)
4162}
4163
4164fn ensure_pack_builder_default_mode(args: Value) -> Value {
4165 let mut obj = args.as_object().cloned().unwrap_or_default();
4166 let has_mode = obj
4167 .get("mode")
4168 .and_then(Value::as_str)
4169 .map(str::trim)
4170 .is_some_and(|v| !v.is_empty());
4171 if !has_mode {
4172 obj.insert("mode".to_string(), Value::String("preview".to_string()));
4173 }
4174 Value::Object(obj)
4175}
4176
4177fn extract_webfetch_url_arg(args: &Value) -> Option<String> {
4178 const URL_KEYS: [&str; 5] = ["url", "uri", "link", "href", "target_url"];
4179 for key in URL_KEYS {
4180 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
4181 if let Some(url) = sanitize_url_candidate(value) {
4182 return Some(url);
4183 }
4184 }
4185 }
4186 for container in ["arguments", "args", "input", "params"] {
4187 if let Some(obj) = args.get(container) {
4188 for key in URL_KEYS {
4189 if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
4190 if let Some(url) = sanitize_url_candidate(value) {
4191 return Some(url);
4192 }
4193 }
4194 }
4195 }
4196 }
4197 args.as_str().and_then(sanitize_url_candidate)
4198}
4199
4200fn extract_pack_builder_goal_arg(args: &Value) -> Option<String> {
4201 const GOAL_KEYS: [&str; 1] = ["goal"];
4202 for key in GOAL_KEYS {
4203 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
4204 let trimmed = value.trim();
4205 if !trimmed.is_empty() {
4206 return Some(trimmed.to_string());
4207 }
4208 }
4209 }
4210 for container in ["arguments", "args", "input", "params"] {
4211 if let Some(obj) = args.get(container) {
4212 for key in GOAL_KEYS {
4213 if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
4214 let trimmed = value.trim();
4215 if !trimmed.is_empty() {
4216 return Some(trimmed.to_string());
4217 }
4218 }
4219 }
4220 }
4221 }
4222 args.as_str()
4223 .map(str::trim)
4224 .filter(|v| !v.is_empty())
4225 .map(ToString::to_string)
4226}
4227
4228fn extract_websearch_query(args: &Value) -> Option<String> {
4229 const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
4230 for key in QUERY_KEYS {
4231 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
4232 if let Some(query) = sanitize_websearch_query_candidate(value) {
4233 return Some(query);
4234 }
4235 }
4236 }
4237 for container in ["arguments", "args", "input", "params"] {
4238 if let Some(obj) = args.get(container) {
4239 for key in QUERY_KEYS {
4240 if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
4241 if let Some(query) = sanitize_websearch_query_candidate(value) {
4242 return Some(query);
4243 }
4244 }
4245 }
4246 }
4247 }
4248 args.as_str().and_then(sanitize_websearch_query_candidate)
4249}
4250
/// Cleans a raw websearch query that may still carry tool-call markup.
///
/// Steps, in order:
/// 1. Blank input yields `None`.
/// 2. If an `<arg_value>` tag is present (any ASCII case), the text between it
///    and the matching `</arg_value>` (or the end of input) is returned when
///    non-blank.
/// 3. Otherwise the exact-case `<arg_key>` / `<arg_value>` tags are replaced by
///    spaces and whitespace is collapsed.
/// 4. A leading `websearch query ` or `query ` label (ASCII case-insensitive)
///    is dropped.
fn sanitize_websearch_query_candidate(raw: &str) -> Option<String> {
    const OPEN_VALUE: &str = "<arg_value>";
    const CLOSE_VALUE: &str = "</arg_value>";

    let candidate = raw.trim();
    if candidate.is_empty() {
        return None;
    }

    // ASCII lowercasing keeps byte offsets identical to `candidate`.
    let folded = candidate.to_ascii_lowercase();
    if let Some(open_at) = folded.find(OPEN_VALUE) {
        let body_start = open_at + OPEN_VALUE.len();
        let body_end = folded[body_start..]
            .find(CLOSE_VALUE)
            .map_or(candidate.len(), |rel| body_start + rel);
        let inner = candidate[body_start..body_end].trim();
        if !inner.is_empty() {
            return Some(inner.to_string());
        }
    }

    let mut stripped = candidate.to_string();
    for tag in ["<arg_key>", "</arg_key>", OPEN_VALUE, CLOSE_VALUE] {
        stripped = stripped.replace(tag, " ");
    }
    let collapsed = stripped.split_whitespace().collect::<Vec<_>>().join(" ");
    if collapsed.is_empty() {
        return None;
    }

    let collapsed_folded = collapsed.to_ascii_lowercase();
    for label in ["websearch query ", "query "] {
        if collapsed_folded.starts_with(label) {
            let query = collapsed[label.len()..].trim();
            if !query.is_empty() {
                return Some(query.to_string());
            }
        }
    }

    Some(collapsed)
}
4303
/// Derives a websearch query from a free-form user message.
///
/// A leading command phrase such as `search for`, `look up` or `websearch` is
/// removed (ASCII case-insensitive), together with any separator characters
/// (whitespace, `:` or `-`) that follow it. Surrounding quotes and sentence
/// punctuation are then trimmed.
///
/// A phrase is only treated as a command when it ends at a word boundary, so
/// "Findings from the report" is not mangled into "ings from the report" and
/// "search webinar topics" falls through to the shorter `search` phrase.
///
/// Returns `None` for blank input or when fewer than two words remain.
fn infer_websearch_query_from_text(text: &str) -> Option<String> {
    // Order matters: longer phrases are listed before the phrases they contain.
    const PREFIXES: [&str; 11] = [
        "web search",
        "websearch",
        "search web for",
        "search web",
        "search for",
        "search",
        "look up",
        "lookup",
        "find",
        "web lookup",
        "query",
    ];

    let trimmed = text.trim();
    if trimmed.is_empty() {
        return None;
    }

    // ASCII-only lowercasing keeps byte offsets aligned with `trimmed`, which
    // makes slicing by `prefix.len()` below sound.
    let lower = trimmed.to_ascii_lowercase();
    let candidate = PREFIXES
        .iter()
        .find_map(|prefix| {
            let rest = lower.strip_prefix(*prefix)?;
            // Reject matches that stop in the middle of a word.
            if rest.chars().next().is_some_and(char::is_alphanumeric) {
                return None;
            }
            Some(
                trimmed[prefix.len()..]
                    .trim_start_matches(|c: char| c.is_whitespace() || c == ':' || c == '-'),
            )
        })
        .unwrap_or(trimmed);

    let normalized = candidate
        .trim()
        .trim_matches(|c: char| c == '"' || c == '\'' || c.is_whitespace())
        .trim_matches(|c: char| matches!(c, '.' | ',' | '!' | '?'))
        .trim()
        .to_string();

    // A single word is too vague to be worth searching for.
    if normalized.split_whitespace().count() < 2 {
        return None;
    }
    Some(normalized)
}
4347
4348fn infer_file_path_from_text(text: &str) -> Option<String> {
4349 let trimmed = text.trim();
4350 if trimmed.is_empty() {
4351 return None;
4352 }
4353
4354 let mut candidates: Vec<String> = Vec::new();
4355
4356 let mut in_tick = false;
4358 let mut tick_buf = String::new();
4359 for ch in trimmed.chars() {
4360 if ch == '`' {
4361 if in_tick {
4362 let cand = sanitize_path_candidate(&tick_buf);
4363 if let Some(path) = cand {
4364 candidates.push(path);
4365 }
4366 tick_buf.clear();
4367 }
4368 in_tick = !in_tick;
4369 continue;
4370 }
4371 if in_tick {
4372 tick_buf.push(ch);
4373 }
4374 }
4375
4376 for raw in trimmed.split_whitespace() {
4378 if let Some(path) = sanitize_path_candidate(raw) {
4379 candidates.push(path);
4380 }
4381 }
4382
4383 let mut deduped = Vec::new();
4384 let mut seen = HashSet::new();
4385 for candidate in candidates {
4386 if seen.insert(candidate.clone()) {
4387 deduped.push(candidate);
4388 }
4389 }
4390
4391 deduped.into_iter().next()
4392}
4393
4394fn infer_workspace_root_from_text(text: &str) -> Option<String> {
4395 text.lines().find_map(|line| {
4396 let trimmed = line.trim();
4397 let value = trimmed.strip_prefix("Workspace:")?.trim();
4398 sanitize_path_candidate(value)
4399 })
4400}
4401
4402fn infer_required_output_target_path_from_text(text: &str) -> Option<String> {
4403 let marker = "Required output target:";
4404 let idx = text.find(marker)?;
4405 let tail = text[idx + marker.len()..].trim_start();
4406 if let Some(start) = tail.find('{') {
4407 let json_candidate = tail[start..]
4408 .lines()
4409 .take_while(|line| {
4410 let trimmed = line.trim();
4411 !(trimmed.is_empty() && !trimmed.starts_with('{'))
4412 })
4413 .collect::<Vec<_>>()
4414 .join("\n");
4415 if let Ok(parsed) = serde_json::from_str::<Value>(&json_candidate) {
4416 if let Some(path) = parsed.get("path").and_then(|v| v.as_str()) {
4417 if let Some(clean) = sanitize_explicit_output_target_path(path) {
4418 return Some(clean);
4419 }
4420 }
4421 }
4422 }
4423 None
4424}
4425
4426fn infer_write_file_path_from_text(text: &str) -> Option<String> {
4427 let inferred = infer_file_path_from_text(text)?;
4428 let workspace_root = infer_workspace_root_from_text(text);
4429 if workspace_root
4430 .as_deref()
4431 .is_some_and(|root| root == inferred)
4432 {
4433 return None;
4434 }
4435 Some(inferred)
4436}
4437
4438fn infer_url_from_text(text: &str) -> Option<String> {
4439 let trimmed = text.trim();
4440 if trimmed.is_empty() {
4441 return None;
4442 }
4443
4444 let mut candidates: Vec<String> = Vec::new();
4445
4446 let mut in_tick = false;
4448 let mut tick_buf = String::new();
4449 for ch in trimmed.chars() {
4450 if ch == '`' {
4451 if in_tick {
4452 if let Some(url) = sanitize_url_candidate(&tick_buf) {
4453 candidates.push(url);
4454 }
4455 tick_buf.clear();
4456 }
4457 in_tick = !in_tick;
4458 continue;
4459 }
4460 if in_tick {
4461 tick_buf.push(ch);
4462 }
4463 }
4464
4465 for raw in trimmed.split_whitespace() {
4467 if let Some(url) = sanitize_url_candidate(raw) {
4468 candidates.push(url);
4469 }
4470 }
4471
4472 let mut seen = HashSet::new();
4473 candidates
4474 .into_iter()
4475 .find(|candidate| seen.insert(candidate.clone()))
4476}
4477
/// Uses the whole trimmed message as the pack-builder goal; blank text yields
/// `None`.
fn infer_pack_builder_goal_from_text(text: &str) -> Option<String> {
    Some(text.trim())
        .filter(|goal| !goal.is_empty())
        .map(str::to_string)
}
4486
/// Strips wrapper characters from a token and returns it when it is an
/// `http://` or `https://` URL (scheme matched ASCII case-insensitively).
///
/// Leading quotes/emphasis marks, whitespace and opening brackets are removed
/// from the front; trailing quotes/emphasis marks, whitespace, closing
/// brackets and sentence punctuation (`,` `;` `:` `.`) are removed from the
/// back. Each side is trimmed in a single pass, so mixed wrappers such as
/// `(https://example.com).` or `` `https://example.com`, `` are fully
/// unwrapped regardless of the order in which the wrappers appear.
fn sanitize_url_candidate(raw: &str) -> Option<String> {
    let is_quote_like =
        |c: char| matches!(c, '`' | '"' | '\'' | '*' | '|') || c.is_whitespace();
    let token = raw
        .trim_start_matches(|c: char| is_quote_like(c) || matches!(c, '(' | '[' | '{' | '<'))
        .trim_end_matches(|c: char| {
            is_quote_like(c) || matches!(c, ',' | ';' | ':' | ')' | ']' | '}' | '>' | '.')
        });

    if token.is_empty() {
        return None;
    }
    let lower = token.to_ascii_lowercase();
    if !(lower.starts_with("http://") || lower.starts_with("https://")) {
        return None;
    }
    Some(token.to_string())
}
4505
/// Strips wrapper characters from a token that may be a file path.
///
/// Leading quotes/emphasis marks, whitespace and opening brackets are removed
/// from the front; trailing quotes/emphasis marks, whitespace, closing
/// brackets and sentence punctuation (`,` `;` `:` `.`) are removed from the
/// back. Each side is trimmed in a single pass, so mixed wrappers such as
/// `(src/main.rs).` or `"notes.md":` are fully unwrapped regardless of the
/// order in which the wrappers appear. A leading `.` is kept so dot-files
/// survive.
///
/// Returns `None` when nothing is left.
fn clean_path_candidate_token(raw: &str) -> Option<String> {
    let is_quote_like =
        |c: char| matches!(c, '`' | '"' | '\'' | '*' | '|') || c.is_whitespace();
    let token = raw
        .trim_start_matches(|c: char| is_quote_like(c) || matches!(c, '(' | '[' | '{' | '<'))
        .trim_end_matches(|c: char| {
            is_quote_like(c) || matches!(c, ',' | ';' | ':' | ')' | ']' | '}' | '>' | '.')
        });

    if token.is_empty() {
        return None;
    }
    Some(token.to_string())
}
4520
4521fn sanitize_explicit_output_target_path(raw: &str) -> Option<String> {
4522 let token = clean_path_candidate_token(raw)?;
4523 let lower = token.to_ascii_lowercase();
4524 if lower.starts_with("http://") || lower.starts_with("https://") {
4525 return None;
4526 }
4527 if is_malformed_tool_path_token(&token) {
4528 return None;
4529 }
4530 if is_root_only_path_token(&token) {
4531 return None;
4532 }
4533 if is_placeholder_path_token(&token) {
4534 return None;
4535 }
4536 if token.ends_with('/') || token.ends_with('\\') {
4537 return None;
4538 }
4539 Some(token.to_string())
4540}
4541
4542fn sanitize_path_candidate(raw: &str) -> Option<String> {
4543 let token = clean_path_candidate_token(raw)?;
4544 let lower = token.to_ascii_lowercase();
4545 if lower.starts_with("http://") || lower.starts_with("https://") {
4546 return None;
4547 }
4548 if is_malformed_tool_path_token(token.as_str()) {
4549 return None;
4550 }
4551 if is_root_only_path_token(token.as_str()) {
4552 return None;
4553 }
4554 if is_placeholder_path_token(token.as_str()) {
4555 return None;
4556 }
4557
4558 let looks_like_path = token.contains('/') || token.contains('\\');
4559 let has_file_ext = [
4560 ".md", ".txt", ".json", ".yaml", ".yml", ".toml", ".rs", ".ts", ".tsx", ".js", ".jsx",
4561 ".py", ".go", ".java", ".cpp", ".c", ".h", ".pdf", ".docx", ".pptx", ".xlsx", ".rtf",
4562 ".html", ".htm", ".css", ".scss", ".sass", ".less", ".svg", ".xml", ".sql", ".sh",
4563 ]
4564 .iter()
4565 .any(|ext| lower.ends_with(ext));
4566
4567 if !looks_like_path && !has_file_ext {
4568 return None;
4569 }
4570
4571 Some(token)
4572}
4573
/// Returns `true` for blank tokens and for a fixed list of documentation-style
/// placeholder paths (compared trimmed and ASCII case-insensitively).
fn is_placeholder_path_token(token: &str) -> bool {
    const PLACEHOLDERS: &[&str] = &[
        "files/directories",
        "file/directory",
        "relative/or/absolute/path",
        "path/to/file",
        "path/to/your/file",
        "tool/policy",
        "tools/policy",
        "the expected artifact file",
        "workspace/file",
    ];

    let normalized = token.trim().to_ascii_lowercase();
    normalized.is_empty() || PLACEHOLDERS.contains(&normalized.as_str())
}
4592
/// Returns `true` when a path token is clearly not a real path: it contains
/// leaked tool-call markup (ASCII case-insensitive), a line break, or a
/// wildcard character (`*` or `?`).
fn is_malformed_tool_path_token(token: &str) -> bool {
    const MARKUP_FRAGMENTS: &[&str] = &[
        "<tool_call",
        "</tool_call",
        "<function=",
        "<parameter=",
        "</function>",
        "</parameter>",
    ];

    let folded = token.to_ascii_lowercase();
    let has_markup = MARKUP_FRAGMENTS
        .iter()
        .any(|fragment| folded.contains(*fragment));

    has_markup || token.contains(|c: char| matches!(c, '\n' | '\r' | '*' | '?'))
}
4615
/// Returns `true` when the trimmed token names no specific file: it is blank,
/// a bare root/relative marker (`/`, `\`, `.`, `..`, `~`), or a Windows drive
/// such as `C:`, `C:\` or `C:/`.
fn is_root_only_path_token(token: &str) -> bool {
    let candidate = token.trim();
    match candidate {
        "" | "/" | "\\" | "." | ".." | "~" => true,
        _ => match candidate.as_bytes() {
            [drive, b':'] | [drive, b':', b'\\'] | [drive, b':', b'/'] => {
                drive.is_ascii_alphabetic()
            }
            _ => false,
        },
    }
}
4638
/// Trims whitespace plus surrounding quotes, commas and semicolons from a
/// shell command candidate; returns `None` when nothing is left.
fn sanitize_shell_command_candidate(raw: &str) -> Option<String> {
    const EDGE_NOISE: &[char] = &['`', '"', '\'', ',', ';'];

    let command = raw.trim().trim_matches(EDGE_NOISE).trim();
    if command.is_empty() {
        None
    } else {
        Some(command.to_string())
    }
}
4649
/// Heuristically decides whether text looks like a shell command line.
///
/// The check is ASCII case-insensitive and succeeds when the first word is a
/// well-known command, starts with `get-`, `./` or `.\`, or when the text
/// contains a space-delimited `|`, `&&` or `;` operator.
fn looks_like_shell_command(candidate: &str) -> bool {
    const KNOWN_COMMANDS: &[&str] = &[
        "rg",
        "git",
        "cargo",
        "pnpm",
        "npm",
        "node",
        "python",
        "pytest",
        "pwsh",
        "powershell",
        "cmd",
        "dir",
        "ls",
        "cat",
        "type",
        "echo",
        "cd",
        "mkdir",
        "cp",
        "copy",
        "move",
        "del",
        "rm",
    ];
    const HEAD_PREFIXES: &[&str] = &["get-", "./", ".\\"];
    const OPERATORS: &[&str] = &[" | ", " && ", " ; "];

    let lowered = candidate.to_ascii_lowercase();
    if lowered.is_empty() {
        return false;
    }
    let head = lowered.split_whitespace().next().unwrap_or("");

    KNOWN_COMMANDS.contains(&head)
        || HEAD_PREFIXES.iter().any(|prefix| head.starts_with(*prefix))
        || OPERATORS.iter().any(|op| lowered.contains(*op))
}
4689
/// Argument key spellings that name a file path.
// NOTE(review): consumers are outside this chunk; presumably these are probed
// as aliases when normalizing file-tool arguments — confirm against callers.
const FILE_PATH_KEYS: [&str; 10] = [
    "path",
    "file_path",
    "filePath",
    "filepath",
    "filename",
    "file",
    "target",
    "targetFile",
    "absolutePath",
    "uri",
];

/// Argument key spellings that name a shell command.
// NOTE(review): usage not visible here — confirm lookup order matters.
const SHELL_COMMAND_KEYS: [&str; 4] = ["command", "cmd", "script", "line"];

/// Argument key spellings that name the content of a write.
// NOTE(review): usage not visible here — confirm lookup order matters.
const WRITE_CONTENT_KEYS: [&str; 8] = [
    "content",
    "text",
    "body",
    "value",
    "markdown",
    "document",
    "output",
    "file_content",
];

/// Key spellings for a nested argument object.
// NOTE(review): this is a superset of the four wrapper names
// (`arguments`, `args`, `input`, `params`) that the extractors earlier in this
// file hard-code; whether they should share this table needs confirming.
const NESTED_ARGS_KEYS: [&str; 10] = [
    "arguments",
    "args",
    "input",
    "params",
    "payload",
    "data",
    "tool_input",
    "toolInput",
    "tool_args",
    "toolArgs",
];
4728
4729fn tool_signature(tool_name: &str, args: &Value) -> String {
4730 let normalized = normalize_tool_name(tool_name);
4731 if normalized == "websearch" {
4732 let query = extract_websearch_query(args)
4733 .unwrap_or_default()
4734 .to_lowercase();
4735 let limit = args
4736 .get("limit")
4737 .or_else(|| args.get("numResults"))
4738 .or_else(|| args.get("num_results"))
4739 .and_then(|v| v.as_u64())
4740 .unwrap_or(8);
4741 let domains = args
4742 .get("domains")
4743 .or_else(|| args.get("domain"))
4744 .map(|v| v.to_string())
4745 .unwrap_or_default();
4746 let recency = args.get("recency").and_then(|v| v.as_u64()).unwrap_or(0);
4747 return format!("websearch:q={query}|limit={limit}|domains={domains}|recency={recency}");
4748 }
4749 format!("{}:{}", normalized, args)
4750}
4751
/// Hashes `input` with `DefaultHasher` and renders the 64-bit digest as
/// sixteen lowercase hex digits.
fn stable_hash(input: &str) -> String {
    let digest = {
        let mut state = DefaultHasher::new();
        input.hash(&mut state);
        state.finish()
    };
    format!("{digest:016x}")
}
4757
4758fn summarize_tool_outputs(outputs: &[String]) -> String {
4759 outputs
4760 .iter()
4761 .take(6)
4762 .map(|output| truncate_text(output, 600))
4763 .collect::<Vec<_>>()
4764 .join("\n\n")
4765}
4766
/// Returns `true` when tool output contains (ASCII case-insensitively) one of
/// the error phrases this module associates with running a command meant for
/// a different OS or shell.
fn is_os_mismatch_tool_output(output: &str) -> bool {
    const MISMATCH_MARKERS: &[&str] = &[
        "os error 3",
        "system cannot find the path specified",
        "command not found",
        "is not recognized as an internal or external command",
        "shell command blocked on windows",
    ];

    let folded = output.to_ascii_lowercase();
    MISMATCH_MARKERS
        .iter()
        .any(|marker| folded.contains(*marker))
}
4775
4776fn format_context_mode(requested: &ContextMode, auto_compact: bool) -> &'static str {
4777 match requested {
4778 ContextMode::Full => "full",
4779 ContextMode::Compact => "compact",
4780 ContextMode::Auto => {
4781 if auto_compact {
4782 "auto_compact"
4783 } else {
4784 "auto_standard"
4785 }
4786 }
4787 }
4788}
4789
/// Assembles the runtime system prompt as a list of sections joined by blank
/// lines.
///
/// Sections, in order:
/// 1. `[Execution Environment]` — host OS, shell, path style and architecture;
///    only when `os_aware_prompts_enabled()` returns `true`.
/// 2. The fixed Tandem operating instructions.
/// 3. The fixed "no tools for greetings" instruction.
/// 4. Windows guidance when `host.os` is `HostOs::Windows`, POSIX guidance
///    otherwise.
/// 5. `[Connected Integrations]` — only when `mcp_server_names` is non-empty.
fn tandem_runtime_system_prompt(host: &HostRuntimeContext, mcp_server_names: &[String]) -> String {
    let mut sections = Vec::new();
    if os_aware_prompts_enabled() {
        sections.push(format!(
            "[Execution Environment]\nHost OS: {}\nShell: {}\nPath style: {}\nArchitecture: {}",
            host_os_label(host.os),
            shell_family_label(host.shell_family),
            path_style_label(host.path_style),
            host.arch
        ));
    }
    // The literals below span several source lines; their embedded newlines
    // are part of the prompt text.
    sections.push(
        "You are operating inside Tandem (Desktop/TUI) as an engine-backed coding assistant.
Use tool calls to inspect and modify the workspace when needed instead of asking the user
to manually run basic discovery steps. Permission prompts may occur for some tools; if
a tool is denied or blocked, explain what was blocked and suggest a concrete next step."
            .to_string(),
    );
    sections.push(
        "For greetings or simple conversational messages (for example: hi, hello, thanks),
respond directly without calling tools."
            .to_string(),
    );
    if host.os == HostOs::Windows {
        sections.push(
            "Windows guidance: prefer cross-platform tools (`glob`, `grep`, `read`, `write`, `edit`) and PowerShell-native commands.
Avoid Unix-only shell syntax (`ls -la`, `find ... -type f`, `cat` pipelines) unless translated.
If a shell command fails with a path/shell mismatch, immediately switch to cross-platform tools (`read`, `glob`, `grep`)."
                .to_string(),
        );
    } else {
        sections.push(
            "POSIX guidance: standard shell commands are available.
Use cross-platform tools (`glob`, `grep`, `read`) when they are simpler and safer for codebase exploration."
                .to_string(),
        );
    }
    if !mcp_server_names.is_empty() {
        let cap = mcp_catalog_max_servers();
        // The first `cap` names are kept in caller order and only then sorted,
        // so which names survive the cap depends on the input order.
        let mut listed = mcp_server_names
            .iter()
            .take(cap)
            .cloned()
            .collect::<Vec<_>>();
        listed.sort();
        let mut catalog = listed
            .iter()
            .map(|name| format!("- {name}"))
            .collect::<Vec<_>>();
        // Summarize whatever did not fit under the cap.
        if mcp_server_names.len() > cap {
            catalog.push(format!("- (+{} more)", mcp_server_names.len() - cap));
        }
        sections.push(format!(
            "[Connected Integrations]\nThe following external integrations are currently connected and available:\n{}",
            catalog.join("\n")
        ));
    }
    sections.join("\n\n")
}
4849
/// Reports whether OS-aware prompt sections are enabled.
///
/// Controlled by the `TANDEM_OS_AWARE_PROMPTS` environment variable. The
/// feature is on by default (variable unset or unreadable) and is switched off
/// by `0`, `false`, `off` or `no` (trimmed, ASCII case-insensitive) — the same
/// set of "off" spellings the other feature flags in this file accept.
fn os_aware_prompts_enabled() -> bool {
    std::env::var("TANDEM_OS_AWARE_PROMPTS")
        .ok()
        .map(|raw| {
            !matches!(
                raw.trim().to_ascii_lowercase().as_str(),
                "0" | "false" | "off" | "no"
            )
        })
        .unwrap_or(true)
}
4859
/// Reports whether semantic tool retrieval is enabled.
///
/// Reads `TANDEM_SEMANTIC_TOOL_RETRIEVAL`; enabled unless the variable is set
/// to `0`, `false`, `off` or `no` (trimmed, ASCII case-insensitive).
fn semantic_tool_retrieval_enabled() -> bool {
    match std::env::var("TANDEM_SEMANTIC_TOOL_RETRIEVAL") {
        Ok(raw) => {
            let flag = raw.trim().to_ascii_lowercase();
            !["0", "false", "off", "no"].contains(&flag.as_str())
        }
        Err(_) => true,
    }
}
4871
4872fn semantic_tool_retrieval_k() -> usize {
4873 std::env::var("TANDEM_SEMANTIC_TOOL_RETRIEVAL_K")
4874 .ok()
4875 .and_then(|raw| raw.trim().parse::<usize>().ok())
4876 .filter(|value| *value > 0)
4877 .unwrap_or_else(max_tools_per_call_expanded)
4878}
4879
/// Reports whether the MCP catalog should be included in the system prompt.
///
/// Reads `TANDEM_MCP_CATALOG_IN_SYSTEM_PROMPT`; enabled unless the variable is
/// set to `0`, `false`, `off` or `no` (trimmed, ASCII case-insensitive).
fn mcp_catalog_in_system_prompt_enabled() -> bool {
    match std::env::var("TANDEM_MCP_CATALOG_IN_SYSTEM_PROMPT") {
        Ok(raw) => {
            let flag = raw.trim().to_ascii_lowercase();
            !["0", "false", "off", "no"].contains(&flag.as_str())
        }
        Err(_) => true,
    }
}
4891
/// Maximum number of MCP servers listed in the system prompt.
///
/// Uses `TANDEM_MCP_CATALOG_MAX_SERVERS` when it parses as a positive
/// integer; otherwise defaults to 20.
fn mcp_catalog_max_servers() -> usize {
    const DEFAULT_MAX_SERVERS: usize = 20;

    match std::env::var("TANDEM_MCP_CATALOG_MAX_SERVERS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(limit) if limit > 0 => limit,
            _ => DEFAULT_MAX_SERVERS,
        },
        Err(_) => DEFAULT_MAX_SERVERS,
    }
}
4899
4900fn host_os_label(os: HostOs) -> &'static str {
4901 match os {
4902 HostOs::Windows => "windows",
4903 HostOs::Linux => "linux",
4904 HostOs::Macos => "macos",
4905 }
4906}
4907
4908fn shell_family_label(shell: ShellFamily) -> &'static str {
4909 match shell {
4910 ShellFamily::Powershell => "powershell",
4911 ShellFamily::Posix => "posix",
4912 }
4913}
4914
4915fn path_style_label(path_style: PathStyle) -> &'static str {
4916 match path_style {
4917 PathStyle::Windows => "windows",
4918 PathStyle::Posix => "posix",
4919 }
4920}
4921
/// Decides whether a workspace probe should be forced.
///
/// Returns `true` only when both hold (case-insensitively):
/// - the user message contains one of the project-inspection phrases, and
/// - the completion contains one of the "no access" phrases.
fn should_force_workspace_probe(user_text: &str, completion: &str) -> bool {
    const PROJECT_CONTEXT_REQUESTS: &[&str] = &[
        "what is this project",
        "what's this project",
        "what project is this",
        "explain this project",
        "analyze this project",
        "inspect this project",
        "look at the project",
        "summarize this project",
        "show me this project",
        "what files are in",
        "show files",
        "list files",
        "read files",
        "browse files",
        "use glob",
        "run glob",
    ];
    const NO_ACCESS_CLAIMS: &[&str] = &[
        "can't inspect",
        "cannot inspect",
        "unable to inspect",
        "unable to directly inspect",
        "can't access",
        "cannot access",
        "unable to access",
        "can't read files",
        "cannot read files",
        "unable to read files",
        "tool restriction",
        "tool restrictions",
        "don't have visibility",
        "no visibility",
        "haven't been able to inspect",
        "i don't know what this project is",
        "need your help to",
        "sandbox",
        "restriction",
        "system restriction",
        "permissions restrictions",
    ];

    /// `true` when `haystack` contains at least one of `needles`.
    fn mentions_any(haystack: &str, needles: &[&str]) -> bool {
        needles.iter().any(|needle| haystack.contains(*needle))
    }

    mentions_any(&user_text.to_lowercase(), PROJECT_CONTEXT_REQUESTS)
        && mentions_any(&completion.to_lowercase(), NO_ACCESS_CLAIMS)
}
4981
4982fn parse_tool_invocation(input: &str) -> Option<(String, serde_json::Value)> {
4983 let raw = input.trim();
4984 if !raw.starts_with("/tool ") {
4985 return None;
4986 }
4987 let rest = raw.trim_start_matches("/tool ").trim();
4988 let mut split = rest.splitn(2, ' ');
4989 let tool = normalize_tool_name(split.next()?.trim());
4990 let args = split
4991 .next()
4992 .and_then(|v| serde_json::from_str::<serde_json::Value>(v).ok())
4993 .unwrap_or_else(|| json!({}));
4994 Some((tool, args))
4995}
4996
4997fn parse_tool_invocations_from_response(input: &str) -> Vec<(String, serde_json::Value)> {
4998 let trimmed = input.trim();
4999 if trimmed.is_empty() {
5000 return Vec::new();
5001 }
5002
5003 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
5004 if let Some(found) = extract_tool_call_from_value(&parsed) {
5005 return vec![found];
5006 }
5007 }
5008
5009 if let Some(block) = extract_first_json_object(trimmed) {
5010 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&block) {
5011 if let Some(found) = extract_tool_call_from_value(&parsed) {
5012 return vec![found];
5013 }
5014 }
5015 }
5016
5017 parse_function_style_tool_calls(trimmed)
5018}
5019
/// Test-only convenience: the first tool call recovered from a response.
#[cfg(test)]
fn parse_tool_invocation_from_response(input: &str) -> Option<(String, serde_json::Value)> {
    let mut recovered = parse_tool_invocations_from_response(input).into_iter();
    recovered.next()
}
5026
5027fn parse_function_style_tool_calls(input: &str) -> Vec<(String, Value)> {
5028 let mut calls = Vec::new();
5029 let lower = input.to_lowercase();
5030 let names = [
5031 "todo_write",
5032 "todowrite",
5033 "update_todo_list",
5034 "update_todos",
5035 ];
5036 let mut cursor = 0usize;
5037
5038 while cursor < lower.len() {
5039 let mut best: Option<(usize, &str)> = None;
5040 for name in names {
5041 let needle = format!("{name}(");
5042 if let Some(rel_idx) = lower[cursor..].find(&needle) {
5043 let idx = cursor + rel_idx;
5044 if best.as_ref().is_none_or(|(best_idx, _)| idx < *best_idx) {
5045 best = Some((idx, name));
5046 }
5047 }
5048 }
5049
5050 let Some((tool_start, tool_name)) = best else {
5051 break;
5052 };
5053
5054 let open_paren = tool_start + tool_name.len();
5055 if let Some(close_paren) = find_matching_paren(input, open_paren) {
5056 if let Some(args_text) = input.get(open_paren + 1..close_paren) {
5057 let args = parse_function_style_args(args_text.trim());
5058 calls.push((normalize_tool_name(tool_name), Value::Object(args)));
5059 }
5060 cursor = close_paren.saturating_add(1);
5061 } else {
5062 cursor = tool_start.saturating_add(tool_name.len());
5063 }
5064 }
5065
5066 calls
5067}
5068
/// Returns the byte index of the `)` that closes the `(` at `open_paren`.
///
/// Parentheses inside single- or double-quoted strings are ignored, and a
/// backslash inside a string escapes the next character. Returns `None` when
/// `open_paren` is out of range, does not point at `(`, or the parenthesis is
/// never closed.
fn find_matching_paren(input: &str, open_paren: usize) -> Option<usize> {
    if *input.as_bytes().get(open_paren)? != b'(' {
        return None;
    }
    let tail = input.get(open_paren..)?;

    let mut nesting = 0usize;
    // The quote character of the string currently being scanned, if any.
    let mut open_quote: Option<char> = None;
    let mut skip_next = false;

    for (rel, ch) in tail.char_indices() {
        if skip_next {
            skip_next = false;
            continue;
        }
        match (open_quote, ch) {
            (Some(_), '\\') => skip_next = true,
            (Some(quote), c) if c == quote => open_quote = None,
            (Some(_), _) => {}
            (None, '\'') | (None, '"') => open_quote = Some(ch),
            (None, '(') => nesting += 1,
            (None, ')') => {
                nesting = nesting.saturating_sub(1);
                if nesting == 0 {
                    return Some(open_paren + rel);
                }
            }
            (None, _) => {}
        }
    }

    None
}
5114
/// Parses the text between the parentheses of a function-style tool call
/// (for example `a=1, b="x, y", c=[1, 2]`) into a JSON object map.
///
/// The text is first split on top-level commas — commas inside quotes,
/// parentheses, brackets or braces do not split. Each piece is then split into
/// key and value at its first `=`, or at its first `:` when it has no `=`.
/// Pieces without a separator or with an empty key are dropped; values are
/// converted with `parse_scalar_like_value`. A repeated key keeps the last
/// value.
fn parse_function_style_args(input: &str) -> Map<String, Value> {
    let mut args = Map::new();
    if input.trim().is_empty() {
        return args;
    }

    // Pass 1: split into top-level comma-separated pieces.
    let mut parts = Vec::<String>::new();
    let mut current = String::new();
    let mut in_single = false;
    let mut in_double = false;
    let mut escaped = false;
    let mut depth_paren = 0usize;
    let mut depth_bracket = 0usize;
    let mut depth_brace = 0usize;

    for ch in input.chars() {
        // The character after a backslash inside a string is copied verbatim.
        if escaped {
            current.push(ch);
            escaped = false;
            continue;
        }
        if ch == '\\' && (in_single || in_double) {
            current.push(ch);
            escaped = true;
            continue;
        }
        // A quote only toggles its own kind when the other kind is not open.
        if ch == '\'' && !in_double {
            in_single = !in_single;
            current.push(ch);
            continue;
        }
        if ch == '"' && !in_single {
            in_double = !in_double;
            current.push(ch);
            continue;
        }
        if in_single || in_double {
            current.push(ch);
            continue;
        }

        match ch {
            '(' => depth_paren += 1,
            ')' => depth_paren = depth_paren.saturating_sub(1),
            '[' => depth_bracket += 1,
            ']' => depth_bracket = depth_bracket.saturating_sub(1),
            '{' => depth_brace += 1,
            '}' => depth_brace = depth_brace.saturating_sub(1),
            // A comma outside every nesting level ends the current piece; the
            // comma itself is not kept.
            ',' if depth_paren == 0 && depth_bracket == 0 && depth_brace == 0 => {
                let part = current.trim();
                if !part.is_empty() {
                    parts.push(part.to_string());
                }
                current.clear();
                continue;
            }
            _ => {}
        }
        current.push(ch);
    }
    // Whatever follows the last top-level comma is the final piece.
    let tail = current.trim();
    if !tail.is_empty() {
        parts.push(tail.to_string());
    }

    // Pass 2: turn each `key=value` / `key: value` piece into a map entry.
    for part in parts {
        // `=` is preferred over `:`; the split happens at the first separator
        // found, even when that character sits inside a quoted value.
        let Some((raw_key, raw_value)) = part
            .split_once('=')
            .or_else(|| part.split_once(':'))
            .map(|(k, v)| (k.trim(), v.trim()))
        else {
            continue;
        };
        let key = raw_key.trim_matches(|c| c == '"' || c == '\'' || c == '`');
        if key.is_empty() {
            continue;
        }
        let value = parse_scalar_like_value(raw_value);
        args.insert(key.to_string(), value);
    }

    args
}
5198
5199fn parse_scalar_like_value(raw: &str) -> Value {
5200 let trimmed = raw.trim();
5201 if trimmed.is_empty() {
5202 return Value::Null;
5203 }
5204
5205 if (trimmed.starts_with('"') && trimmed.ends_with('"'))
5206 || (trimmed.starts_with('\'') && trimmed.ends_with('\''))
5207 {
5208 return Value::String(trimmed[1..trimmed.len().saturating_sub(1)].to_string());
5209 }
5210
5211 if trimmed.eq_ignore_ascii_case("true") {
5212 return Value::Bool(true);
5213 }
5214 if trimmed.eq_ignore_ascii_case("false") {
5215 return Value::Bool(false);
5216 }
5217 if trimmed.eq_ignore_ascii_case("null") {
5218 return Value::Null;
5219 }
5220
5221 if let Ok(v) = serde_json::from_str::<Value>(trimmed) {
5222 return v;
5223 }
5224 if let Ok(v) = trimmed.parse::<i64>() {
5225 return Value::Number(Number::from(v));
5226 }
5227 if let Ok(v) = trimmed.parse::<f64>() {
5228 if let Some(n) = Number::from_f64(v) {
5229 return Value::Number(n);
5230 }
5231 }
5232
5233 Value::String(trimmed.to_string())
5234}
5235
5236fn recover_write_args_from_malformed_json(raw: &str) -> Option<Value> {
5237 let content = extract_loose_json_string_field(raw, "content")?;
5238 let mut obj = Map::new();
5239 if let Some(path) = extract_loose_json_string_field(raw, "path") {
5240 obj.insert("path".to_string(), Value::String(path));
5241 }
5242 obj.insert("content".to_string(), Value::String(content));
5243 Some(Value::Object(obj))
5244}
5245
5246fn extract_loose_json_string_field(input: &str, key: &str) -> Option<String> {
5247 let pattern = format!("\"{key}\"");
5248 let start = input.find(&pattern)?;
5249 let remainder = input.get(start + pattern.len()..)?;
5250 let colon = remainder.find(':')?;
5251 let value = remainder.get(colon + 1..)?.trim_start();
5252 let value = value.strip_prefix('"')?;
5253 Some(parse_loose_json_string_value(value))
5254}
5255
/// Decodes the body of a JSON string literal, tolerating malformed input.
///
/// `input` starts right after the opening quote. Decoding stops at the first
/// unescaped `"` or at the end of input, so an unterminated literal yields
/// everything that was read.
///
/// Escapes:
/// - `\"`, `\\`, `\/`, `\b`, `\f`, `\n`, `\r`, `\t` decode as in JSON;
/// - `\uXXXX` decodes to the code point, and a high surrogate followed by a
///   `\uXXXX` low surrogate decodes to the combined character (this is how
///   characters outside the BMP, such as emoji, are escaped);
/// - an incomplete `\u` escape, a lone surrogate, or any unknown escape is
///   kept literally. Only hex digits are consumed for `\u`, so a short escape
///   can no longer swallow the closing quote.
fn parse_loose_json_string_value(input: &str) -> String {
    type CharStream<'a> = std::iter::Peekable<std::str::Chars<'a>>;

    /// Consumes up to four hex digits and nothing else.
    fn take_hex4(chars: &mut CharStream<'_>) -> String {
        let mut hex = String::with_capacity(4);
        while hex.len() < 4 {
            match chars.peek().copied() {
                Some(digit) if digit.is_ascii_hexdigit() => {
                    hex.push(digit);
                    chars.next();
                }
                _ => break,
            }
        }
        hex
    }

    /// Interprets exactly four hex digits as one UTF-16 code unit.
    fn parse_unit(hex: &str) -> Option<u16> {
        if hex.len() == 4 {
            u16::from_str_radix(hex, 16).ok()
        } else {
            None
        }
    }

    let mut out = String::new();
    let mut chars = input.chars().peekable();

    while let Some(ch) = chars.next() {
        if ch == '"' {
            break;
        }
        if ch != '\\' {
            out.push(ch);
            continue;
        }

        // A trailing backslash with nothing after it is kept as-is.
        let Some(escaped) = chars.next() else {
            out.push('\\');
            break;
        };
        match escaped {
            '"' => out.push('"'),
            '\\' => out.push('\\'),
            '/' => out.push('/'),
            'b' => out.push('\u{0008}'),
            'f' => out.push('\u{000C}'),
            'n' => out.push('\n'),
            'r' => out.push('\r'),
            't' => out.push('\t'),
            'u' => {
                let hex = take_hex4(&mut chars);
                let decoded = match parse_unit(&hex) {
                    Some(high @ 0xD800..=0xDBFF) => {
                        // Look ahead on a copy so nothing is consumed unless a
                        // valid low surrogate really follows.
                        let mut ahead = chars.clone();
                        let has_escape =
                            ahead.next() == Some('\\') && ahead.next() == Some('u');
                        let low = if has_escape {
                            parse_unit(&take_hex4(&mut ahead))
                        } else {
                            None
                        };
                        match low {
                            Some(low @ 0xDC00..=0xDFFF) => {
                                chars = ahead;
                                let scalar = 0x1_0000
                                    + ((u32::from(high) - 0xD800) << 10)
                                    + (u32::from(low) - 0xDC00);
                                char::from_u32(scalar)
                            }
                            _ => None,
                        }
                    }
                    // `from_u32` rejects a lone low surrogate.
                    Some(unit) => char::from_u32(u32::from(unit)),
                    None => None,
                };
                match decoded {
                    Some(c) => out.push(c),
                    None => {
                        out.push('\\');
                        out.push('u');
                        out.push_str(&hex);
                    }
                }
            }
            other => {
                out.push('\\');
                out.push(other);
            }
        }
    }

    out
}
5316
5317fn normalize_todo_write_args(args: Value, completion: &str) -> Value {
5318 if is_todo_status_update_args(&args) {
5319 return args;
5320 }
5321
5322 let mut obj = match args {
5323 Value::Object(map) => map,
5324 Value::Array(items) => {
5325 return json!({ "todos": normalize_todo_arg_items(items) });
5326 }
5327 Value::String(text) => {
5328 let derived = extract_todo_candidates_from_text(&text);
5329 if !derived.is_empty() {
5330 return json!({ "todos": derived });
5331 }
5332 return json!({});
5333 }
5334 _ => return json!({}),
5335 };
5336
5337 if obj
5338 .get("todos")
5339 .and_then(|v| v.as_array())
5340 .map(|arr| !arr.is_empty())
5341 .unwrap_or(false)
5342 {
5343 return Value::Object(obj);
5344 }
5345
5346 for alias in ["tasks", "items", "list", "checklist"] {
5347 if let Some(items) = obj.get(alias).and_then(|v| v.as_array()) {
5348 let normalized = normalize_todo_arg_items(items.clone());
5349 if !normalized.is_empty() {
5350 obj.insert("todos".to_string(), Value::Array(normalized));
5351 return Value::Object(obj);
5352 }
5353 }
5354 }
5355
5356 let derived = extract_todo_candidates_from_text(completion);
5357 if !derived.is_empty() {
5358 obj.insert("todos".to_string(), Value::Array(derived));
5359 }
5360 Value::Object(obj)
5361}
5362
5363fn normalize_todo_arg_items(items: Vec<Value>) -> Vec<Value> {
5364 items
5365 .into_iter()
5366 .filter_map(|item| match item {
5367 Value::String(text) => {
5368 let content = text.trim();
5369 if content.is_empty() {
5370 None
5371 } else {
5372 Some(json!({"content": content}))
5373 }
5374 }
5375 Value::Object(mut obj) => {
5376 if !obj.contains_key("content") {
5377 if let Some(text) = obj.get("text").cloned() {
5378 obj.insert("content".to_string(), text);
5379 } else if let Some(title) = obj.get("title").cloned() {
5380 obj.insert("content".to_string(), title);
5381 } else if let Some(name) = obj.get("name").cloned() {
5382 obj.insert("content".to_string(), name);
5383 }
5384 }
5385 let content = obj
5386 .get("content")
5387 .and_then(|v| v.as_str())
5388 .map(str::trim)
5389 .unwrap_or("");
5390 if content.is_empty() {
5391 None
5392 } else {
5393 Some(Value::Object(obj))
5394 }
5395 }
5396 _ => None,
5397 })
5398 .collect()
5399}
5400
5401fn is_todo_status_update_args(args: &Value) -> bool {
5402 let Some(obj) = args.as_object() else {
5403 return false;
5404 };
5405 let has_status = obj
5406 .get("status")
5407 .and_then(|v| v.as_str())
5408 .map(|s| !s.trim().is_empty())
5409 .unwrap_or(false);
5410 let has_target =
5411 obj.get("task_id").is_some() || obj.get("todo_id").is_some() || obj.get("id").is_some();
5412 has_status && has_target
5413}
5414
5415fn is_empty_todo_write_args(args: &Value) -> bool {
5416 if is_todo_status_update_args(args) {
5417 return false;
5418 }
5419 let Some(obj) = args.as_object() else {
5420 return true;
5421 };
5422 !obj.get("todos")
5423 .and_then(|v| v.as_array())
5424 .map(|arr| !arr.is_empty())
5425 .unwrap_or(false)
5426}
5427
5428fn parse_streamed_tool_args(tool_name: &str, raw_args: &str) -> Value {
5429 let trimmed = raw_args.trim();
5430 if trimmed.is_empty() {
5431 return json!({});
5432 }
5433
5434 let normalized_tool = normalize_tool_name(tool_name);
5435 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
5436 return normalize_streamed_tool_args(&normalized_tool, parsed, trimmed);
5437 }
5438
5439 if normalized_tool == "write" {
5440 if let Some(recovered) = recover_write_args_from_malformed_json(trimmed) {
5441 return recovered;
5442 }
5443 }
5444
5445 let kv_args = parse_function_style_args(trimmed);
5448 if !kv_args.is_empty() {
5449 return normalize_streamed_tool_args(&normalized_tool, Value::Object(kv_args), trimmed);
5450 }
5451
5452 if normalized_tool == "websearch" {
5453 if let Some(query) = sanitize_websearch_query_candidate(trimmed) {
5454 return json!({ "query": query });
5455 }
5456 return json!({});
5457 }
5458
5459 Value::String(trimmed.to_string())
5460}
5461
5462fn normalize_streamed_tool_args(tool_name: &str, parsed: Value, raw: &str) -> Value {
5463 let normalized_tool = normalize_tool_name(tool_name);
5464 if normalized_tool != "websearch" {
5465 return parsed;
5466 }
5467
5468 match parsed {
5469 Value::Object(mut obj) => {
5470 if !has_websearch_query(&obj) && !raw.trim().is_empty() {
5471 if let Some(query) = sanitize_websearch_query_candidate(raw) {
5472 obj.insert("query".to_string(), Value::String(query));
5473 }
5474 }
5475 Value::Object(obj)
5476 }
5477 Value::String(s) => match sanitize_websearch_query_candidate(&s) {
5478 Some(query) => json!({ "query": query }),
5479 None => json!({}),
5480 },
5481 other => other,
5482 }
5483}
5484
5485fn has_websearch_query(obj: &Map<String, Value>) -> bool {
5486 const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
5487 QUERY_KEYS.iter().any(|key| {
5488 obj.get(*key)
5489 .and_then(|v| v.as_str())
5490 .map(|s| !s.trim().is_empty())
5491 .unwrap_or(false)
5492 })
5493}
5494
5495fn extract_tool_call_from_value(value: &Value) -> Option<(String, Value)> {
5496 if let Some(obj) = value.as_object() {
5497 if let Some(tool) = obj.get("tool").and_then(|v| v.as_str()) {
5498 return Some((
5499 normalize_tool_name(tool),
5500 obj.get("args").cloned().unwrap_or_else(|| json!({})),
5501 ));
5502 }
5503
5504 if let Some(tool) = obj.get("name").and_then(|v| v.as_str()) {
5505 let args = obj
5506 .get("args")
5507 .cloned()
5508 .or_else(|| obj.get("arguments").cloned())
5509 .unwrap_or_else(|| json!({}));
5510 let normalized_tool = normalize_tool_name(tool);
5511 let args = if let Some(raw) = args.as_str() {
5512 parse_streamed_tool_args(&normalized_tool, raw)
5513 } else {
5514 args
5515 };
5516 return Some((normalized_tool, args));
5517 }
5518
5519 for key in [
5520 "tool_call",
5521 "toolCall",
5522 "call",
5523 "function_call",
5524 "functionCall",
5525 ] {
5526 if let Some(nested) = obj.get(key) {
5527 if let Some(found) = extract_tool_call_from_value(nested) {
5528 return Some(found);
5529 }
5530 }
5531 }
5532
5533 if let Some(calls) = obj.get("tool_calls").and_then(|v| v.as_array()) {
5534 for call in calls {
5535 if let Some(found) = extract_tool_call_from_value(call) {
5536 return Some(found);
5537 }
5538 }
5539 }
5540 }
5541
5542 if let Some(items) = value.as_array() {
5543 for item in items {
5544 if let Some(found) = extract_tool_call_from_value(item) {
5545 return Some(found);
5546 }
5547 }
5548 }
5549
5550 None
5551}
5552
/// Returns the first balanced `{ ... }` block found in `input`, if any.
///
/// Braces that appear inside JSON string literals (including after escaped
/// quotes) do not affect nesting depth, so values such as `{"code": "}"}` are
/// returned whole. Stray `}` characters before the first `{` are ignored.
///
/// If the string-aware scan finds nothing (for example because a string literal
/// is never closed), a plain brace-counting scan is used as a fallback so that
/// loosely formed text is still recoverable.
fn extract_first_json_object(input: &str) -> Option<String> {
    /// Scans for the first balanced block; when `string_aware` is set, quoted
    /// sections inside the block are skipped.
    fn scan(input: &str, string_aware: bool) -> Option<&str> {
        let mut start: Option<usize> = None;
        let mut depth = 0usize;
        let mut in_string = false;
        let mut escaped = false;

        for (idx, ch) in input.char_indices() {
            if in_string {
                if escaped {
                    escaped = false;
                } else if ch == '\\' {
                    escaped = true;
                } else if ch == '"' {
                    in_string = false;
                }
                continue;
            }
            match ch {
                // Quotes in prose before the object must not start a string.
                '"' if string_aware && depth > 0 => in_string = true,
                '{' => {
                    if start.is_none() {
                        start = Some(idx);
                    }
                    depth += 1;
                }
                '}' if depth > 0 => {
                    depth -= 1;
                    if depth == 0 {
                        return input.get(start?..=idx);
                    }
                }
                _ => {}
            }
        }
        None
    }

    scan(input, true)
        .or_else(|| scan(input, false))
        .map(str::to_string)
}
5576
5577fn extract_todo_candidates_from_text(input: &str) -> Vec<Value> {
5578 let mut seen = HashSet::<String>::new();
5579 let mut todos = Vec::new();
5580
5581 for raw_line in input.lines() {
5582 let mut line = raw_line.trim();
5583 let mut structured_line = false;
5584 if line.is_empty() {
5585 continue;
5586 }
5587 if line.starts_with("```") {
5588 continue;
5589 }
5590 if line.ends_with(':') {
5591 continue;
5592 }
5593 if let Some(rest) = line
5594 .strip_prefix("- [ ]")
5595 .or_else(|| line.strip_prefix("* [ ]"))
5596 .or_else(|| line.strip_prefix("- [x]"))
5597 .or_else(|| line.strip_prefix("* [x]"))
5598 {
5599 line = rest.trim();
5600 structured_line = true;
5601 } else if let Some(rest) = line.strip_prefix("- ").or_else(|| line.strip_prefix("* ")) {
5602 line = rest.trim();
5603 structured_line = true;
5604 } else {
5605 let bytes = line.as_bytes();
5606 let mut i = 0usize;
5607 while i < bytes.len() && bytes[i].is_ascii_digit() {
5608 i += 1;
5609 }
5610 if i > 0 && i + 1 < bytes.len() && (bytes[i] == b'.' || bytes[i] == b')') {
5611 line = line[i + 1..].trim();
5612 structured_line = true;
5613 }
5614 }
5615 if !structured_line {
5616 continue;
5617 }
5618
5619 let content = line.trim_matches(|c: char| c.is_whitespace() || c == '-' || c == '*');
5620 if content.len() < 5 || content.len() > 180 {
5621 continue;
5622 }
5623 let key = content.to_lowercase();
5624 if seen.contains(&key) {
5625 continue;
5626 }
5627 seen.insert(key);
5628 todos.push(json!({ "content": content }));
5629 if todos.len() >= 25 {
5630 break;
5631 }
5632 }
5633
5634 todos
5635}
5636
5637async fn emit_plan_todo_fallback(
5638 storage: std::sync::Arc<Storage>,
5639 bus: &EventBus,
5640 session_id: &str,
5641 message_id: &str,
5642 completion: &str,
5643) {
5644 let todos = extract_todo_candidates_from_text(completion);
5645 if todos.is_empty() {
5646 return;
5647 }
5648
5649 let invoke_part = WireMessagePart::tool_invocation(
5650 session_id,
5651 message_id,
5652 "todo_write",
5653 json!({"todos": todos.clone()}),
5654 );
5655 let call_id = invoke_part.id.clone();
5656 bus.publish(EngineEvent::new(
5657 "message.part.updated",
5658 json!({"part": invoke_part}),
5659 ));
5660
5661 if storage.set_todos(session_id, todos.clone()).await.is_err() {
5662 let mut failed_part = WireMessagePart::tool_result(
5663 session_id,
5664 message_id,
5665 "todo_write",
5666 Some(json!({"todos": todos.clone()})),
5667 json!(null),
5668 );
5669 failed_part.id = call_id;
5670 failed_part.state = Some("failed".to_string());
5671 failed_part.error = Some("failed to persist plan todos".to_string());
5672 bus.publish(EngineEvent::new(
5673 "message.part.updated",
5674 json!({"part": failed_part}),
5675 ));
5676 return;
5677 }
5678
5679 let normalized = storage.get_todos(session_id).await;
5680 let mut result_part = WireMessagePart::tool_result(
5681 session_id,
5682 message_id,
5683 "todo_write",
5684 Some(json!({"todos": todos.clone()})),
5685 json!({ "todos": normalized }),
5686 );
5687 result_part.id = call_id;
5688 bus.publish(EngineEvent::new(
5689 "message.part.updated",
5690 json!({"part": result_part}),
5691 ));
5692 bus.publish(EngineEvent::new(
5693 "todo.updated",
5694 json!({
5695 "sessionID": session_id,
5696 "todos": normalized
5697 }),
5698 ));
5699}
5700
5701async fn emit_plan_question_fallback(
5702 storage: std::sync::Arc<Storage>,
5703 bus: &EventBus,
5704 session_id: &str,
5705 message_id: &str,
5706 completion: &str,
5707) {
5708 let trimmed = completion.trim();
5709 if trimmed.is_empty() {
5710 return;
5711 }
5712
5713 let hints = extract_todo_candidates_from_text(trimmed)
5714 .into_iter()
5715 .take(6)
5716 .filter_map(|v| {
5717 v.get("content")
5718 .and_then(|c| c.as_str())
5719 .map(ToString::to_string)
5720 })
5721 .collect::<Vec<_>>();
5722
5723 let mut options = hints
5724 .iter()
5725 .map(|label| json!({"label": label, "description": "Use this as a starting task"}))
5726 .collect::<Vec<_>>();
5727 if options.is_empty() {
5728 options = vec![
5729 json!({"label":"Define scope", "description":"Clarify the intended outcome"}),
5730 json!({"label":"Provide constraints", "description":"Budget, timeline, and constraints"}),
5731 json!({"label":"Draft a starter list", "description":"Generate a first-pass task list"}),
5732 ];
5733 }
5734
5735 let question_payload = vec![json!({
5736 "header":"Planning Input",
5737 "question":"I couldn't produce a concrete task list yet. Which tasks should I include first?",
5738 "options": options,
5739 "multiple": true,
5740 "custom": true
5741 })];
5742
5743 let request = storage
5744 .add_question_request(session_id, message_id, question_payload.clone())
5745 .await
5746 .ok();
5747 bus.publish(EngineEvent::new(
5748 "question.asked",
5749 json!({
5750 "id": request
5751 .as_ref()
5752 .map(|req| req.id.clone())
5753 .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
5754 "sessionID": session_id,
5755 "messageID": message_id,
5756 "questions": question_payload,
5757 "tool": request.and_then(|req| {
5758 req.tool.map(|tool| {
5759 json!({
5760 "callID": tool.call_id,
5761 "messageID": tool.message_id
5762 })
5763 })
5764 })
5765 }),
5766 ));
5767}
5768
/// Selects how much stored history `compact_chat_history` keeps.
///
/// The size budget is measured with `String::len` on each message's content,
/// i.e. in bytes.
#[derive(Debug, Clone, Copy)]
enum ChatHistoryProfile {
    /// No message-count or size limit is applied.
    Full,
    /// At most 40 recent messages and at most 80,000 bytes of content.
    Standard,
    /// At most 12 recent messages and at most 12,000 bytes of content.
    Compact,
}
5775
5776async fn load_chat_history(
5777 storage: std::sync::Arc<Storage>,
5778 session_id: &str,
5779 profile: ChatHistoryProfile,
5780) -> Vec<ChatMessage> {
5781 let Some(session) = storage.get_session(session_id).await else {
5782 return Vec::new();
5783 };
5784 let messages = session
5785 .messages
5786 .into_iter()
5787 .map(|m| {
5788 let role = format!("{:?}", m.role).to_lowercase();
5789 let content = m
5790 .parts
5791 .into_iter()
5792 .map(|part| match part {
5793 MessagePart::Text { text } => text,
5794 MessagePart::Reasoning { text } => text,
5795 MessagePart::ToolInvocation { tool, result, .. } => {
5796 format!("Tool {tool} => {}", result.unwrap_or_else(|| json!({})))
5797 }
5798 })
5799 .collect::<Vec<_>>()
5800 .join("\n");
5801 ChatMessage {
5802 role,
5803 content,
5804 attachments: Vec::new(),
5805 }
5806 })
5807 .collect::<Vec<_>>();
5808 compact_chat_history(messages, profile)
5809}
5810
5811fn attach_to_last_user_message(messages: &mut [ChatMessage], attachments: &[ChatAttachment]) {
5812 if attachments.is_empty() {
5813 return;
5814 }
5815 if let Some(message) = messages.iter_mut().rev().find(|m| m.role == "user") {
5816 message.attachments = attachments.to_vec();
5817 }
5818}
5819
5820async fn build_runtime_attachments(
5821 provider_id: &str,
5822 parts: &[MessagePartInput],
5823) -> Vec<ChatAttachment> {
5824 if !supports_image_attachments(provider_id) {
5825 return Vec::new();
5826 }
5827
5828 let mut attachments = Vec::new();
5829 for part in parts {
5830 let MessagePartInput::File { mime, url, .. } = part else {
5831 continue;
5832 };
5833 if !mime.to_ascii_lowercase().starts_with("image/") {
5834 continue;
5835 }
5836 if let Some(source_url) = normalize_attachment_source_url(url, mime).await {
5837 attachments.push(ChatAttachment::ImageUrl { url: source_url });
5838 }
5839 }
5840
5841 attachments
5842}
5843
/// Returns `true` when `provider_id` is one of the providers that image
/// attachments are forwarded to. The comparison is exact and case-sensitive.
fn supports_image_attachments(provider_id: &str) -> bool {
    const IMAGE_CAPABLE_PROVIDERS: [&str; 10] = [
        "openai",
        "openrouter",
        "ollama",
        "groq",
        "mistral",
        "together",
        "azure",
        "bedrock",
        "vertex",
        "copilot",
    ];
    IMAGE_CAPABLE_PROVIDERS
        .iter()
        .any(|known| *known == provider_id)
}
5859
5860async fn normalize_attachment_source_url(url: &str, mime: &str) -> Option<String> {
5861 let trimmed = url.trim();
5862 if trimmed.is_empty() {
5863 return None;
5864 }
5865 if trimmed.starts_with("http://")
5866 || trimmed.starts_with("https://")
5867 || trimmed.starts_with("data:")
5868 {
5869 return Some(trimmed.to_string());
5870 }
5871
5872 let file_path = trimmed
5873 .strip_prefix("file://")
5874 .map(PathBuf::from)
5875 .unwrap_or_else(|| PathBuf::from(trimmed));
5876 if !file_path.exists() {
5877 return None;
5878 }
5879
5880 let max_bytes = std::env::var("TANDEM_CHANNEL_MAX_ATTACHMENT_BYTES")
5881 .ok()
5882 .and_then(|v| v.parse::<usize>().ok())
5883 .unwrap_or(20 * 1024 * 1024);
5884
5885 let bytes = match tokio::fs::read(&file_path).await {
5886 Ok(bytes) => bytes,
5887 Err(err) => {
5888 tracing::warn!(
5889 "failed reading local attachment '{}': {}",
5890 file_path.to_string_lossy(),
5891 err
5892 );
5893 return None;
5894 }
5895 };
5896 if bytes.len() > max_bytes {
5897 tracing::warn!(
5898 "local attachment '{}' exceeds max bytes ({} > {})",
5899 file_path.to_string_lossy(),
5900 bytes.len(),
5901 max_bytes
5902 );
5903 return None;
5904 }
5905
5906 use base64::Engine as _;
5907 let b64 = base64::engine::general_purpose::STANDARD.encode(bytes);
5908 Some(format!("data:{mime};base64,{b64}"))
5909}
5910
/// Borrowed inputs for `emit_tool_side_events`, describing one finished tool call.
struct ToolSideEventContext<'a> {
    /// Session the tool ran in; used as storage key and as `sessionID` in events.
    session_id: &'a str,
    /// Message the tool call belongs to; emitted as `messageID`.
    message_id: &'a str,
    /// Tool name; `todo_write` and `question` trigger dedicated side events.
    tool: &'a str,
    /// Arguments the tool was called with; consulted for todo status updates
    /// when the metadata carries no todo list.
    args: &'a serde_json::Value,
    /// Tool result metadata; its `todos`, `questions` and `events` keys are read.
    metadata: &'a serde_json::Value,
    /// Emitted as `workspaceRoot` on published events.
    workspace_root: Option<&'a str>,
    /// Emitted as `effectiveCwd` on published events.
    effective_cwd: Option<&'a str>,
}
5920
5921async fn emit_tool_side_events(
5922 storage: std::sync::Arc<Storage>,
5923 bus: &EventBus,
5924 ctx: ToolSideEventContext<'_>,
5925) {
5926 let ToolSideEventContext {
5927 session_id,
5928 message_id,
5929 tool,
5930 args,
5931 metadata,
5932 workspace_root,
5933 effective_cwd,
5934 } = ctx;
5935 if tool == "todo_write" {
5936 let todos_from_metadata = metadata
5937 .get("todos")
5938 .and_then(|v| v.as_array())
5939 .cloned()
5940 .unwrap_or_default();
5941
5942 if !todos_from_metadata.is_empty() {
5943 let _ = storage.set_todos(session_id, todos_from_metadata).await;
5944 } else {
5945 let current = storage.get_todos(session_id).await;
5946 if let Some(updated) = apply_todo_updates_from_args(current, args) {
5947 let _ = storage.set_todos(session_id, updated).await;
5948 }
5949 }
5950
5951 let normalized = storage.get_todos(session_id).await;
5952 bus.publish(EngineEvent::new(
5953 "todo.updated",
5954 json!({
5955 "sessionID": session_id,
5956 "todos": normalized,
5957 "workspaceRoot": workspace_root,
5958 "effectiveCwd": effective_cwd
5959 }),
5960 ));
5961 }
5962 if tool == "question" {
5963 let questions = metadata
5964 .get("questions")
5965 .and_then(|v| v.as_array())
5966 .cloned()
5967 .unwrap_or_default();
5968 if questions.is_empty() {
5969 tracing::warn!(
5970 "question tool produced empty questions payload; skipping question.asked event session_id={} message_id={}",
5971 session_id,
5972 message_id
5973 );
5974 } else {
5975 let request = storage
5976 .add_question_request(session_id, message_id, questions.clone())
5977 .await
5978 .ok();
5979 bus.publish(EngineEvent::new(
5980 "question.asked",
5981 json!({
5982 "id": request
5983 .as_ref()
5984 .map(|req| req.id.clone())
5985 .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
5986 "sessionID": session_id,
5987 "messageID": message_id,
5988 "questions": questions,
5989 "tool": request.and_then(|req| {
5990 req.tool.map(|tool| {
5991 json!({
5992 "callID": tool.call_id,
5993 "messageID": tool.message_id
5994 })
5995 })
5996 }),
5997 "workspaceRoot": workspace_root,
5998 "effectiveCwd": effective_cwd
5999 }),
6000 ));
6001 }
6002 }
6003 if let Some(events) = metadata.get("events").and_then(|v| v.as_array()) {
6004 for event in events {
6005 let Some(event_type) = event.get("type").and_then(|v| v.as_str()) else {
6006 continue;
6007 };
6008 if !event_type.starts_with("agent_team.") {
6009 continue;
6010 }
6011 let mut properties = event
6012 .get("properties")
6013 .and_then(|v| v.as_object())
6014 .cloned()
6015 .unwrap_or_default();
6016 properties
6017 .entry("sessionID".to_string())
6018 .or_insert(json!(session_id));
6019 properties
6020 .entry("messageID".to_string())
6021 .or_insert(json!(message_id));
6022 properties
6023 .entry("workspaceRoot".to_string())
6024 .or_insert(json!(workspace_root));
6025 properties
6026 .entry("effectiveCwd".to_string())
6027 .or_insert(json!(effective_cwd));
6028 bus.publish(EngineEvent::new(event_type, Value::Object(properties)));
6029 }
6030 }
6031}
6032
6033fn apply_todo_updates_from_args(current: Vec<Value>, args: &Value) -> Option<Vec<Value>> {
6034 let obj = args.as_object()?;
6035 let mut todos = current;
6036 let mut changed = false;
6037
6038 if let Some(items) = obj.get("todos").and_then(|v| v.as_array()) {
6039 for item in items {
6040 let Some(item_obj) = item.as_object() else {
6041 continue;
6042 };
6043 let status = item_obj
6044 .get("status")
6045 .and_then(|v| v.as_str())
6046 .map(normalize_todo_status);
6047 let target = item_obj
6048 .get("task_id")
6049 .or_else(|| item_obj.get("todo_id"))
6050 .or_else(|| item_obj.get("id"));
6051
6052 if let (Some(status), Some(target)) = (status, target) {
6053 changed |= apply_single_todo_status_update(&mut todos, target, &status);
6054 }
6055 }
6056 }
6057
6058 let status = obj
6059 .get("status")
6060 .and_then(|v| v.as_str())
6061 .map(normalize_todo_status);
6062 let target = obj
6063 .get("task_id")
6064 .or_else(|| obj.get("todo_id"))
6065 .or_else(|| obj.get("id"));
6066 if let (Some(status), Some(target)) = (status, target) {
6067 changed |= apply_single_todo_status_update(&mut todos, target, &status);
6068 }
6069
6070 if changed {
6071 Some(todos)
6072 } else {
6073 None
6074 }
6075}
6076
6077fn apply_single_todo_status_update(todos: &mut [Value], target: &Value, status: &str) -> bool {
6078 let idx_from_value = match target {
6079 Value::Number(n) => n.as_u64().map(|v| v.saturating_sub(1) as usize),
6080 Value::String(s) => {
6081 let trimmed = s.trim();
6082 trimmed
6083 .parse::<usize>()
6084 .ok()
6085 .map(|v| v.saturating_sub(1))
6086 .or_else(|| {
6087 let digits = trimmed
6088 .chars()
6089 .rev()
6090 .take_while(|c| c.is_ascii_digit())
6091 .collect::<String>()
6092 .chars()
6093 .rev()
6094 .collect::<String>();
6095 digits.parse::<usize>().ok().map(|v| v.saturating_sub(1))
6096 })
6097 }
6098 _ => None,
6099 };
6100
6101 if let Some(idx) = idx_from_value {
6102 if idx < todos.len() {
6103 if let Some(obj) = todos[idx].as_object_mut() {
6104 obj.insert("status".to_string(), Value::String(status.to_string()));
6105 return true;
6106 }
6107 }
6108 }
6109
6110 let id_target = target.as_str().map(|s| s.trim()).filter(|s| !s.is_empty());
6111 if let Some(id_target) = id_target {
6112 for todo in todos.iter_mut() {
6113 if let Some(obj) = todo.as_object_mut() {
6114 if obj.get("id").and_then(|v| v.as_str()) == Some(id_target) {
6115 obj.insert("status".to_string(), Value::String(status.to_string()));
6116 return true;
6117 }
6118 }
6119 }
6120 }
6121
6122 false
6123}
6124
/// Maps common status spellings onto `in_progress`, `completed`, `cancelled`
/// or `pending`. Input is trimmed and lower-cased first; unrecognised values
/// are returned in that trimmed, lower-cased form.
fn normalize_todo_status(raw: &str) -> String {
    let lowered = raw.trim().to_lowercase();
    let canonical = match lowered.as_str() {
        "in_progress" | "inprogress" | "running" | "working" => "in_progress",
        "done" | "complete" | "completed" => "completed",
        "cancelled" | "canceled" | "aborted" | "skipped" => "cancelled",
        "open" | "todo" | "pending" => "pending",
        _ => return lowered,
    };
    canonical.to_string()
}
6134
6135fn compact_chat_history(
6136 messages: Vec<ChatMessage>,
6137 profile: ChatHistoryProfile,
6138) -> Vec<ChatMessage> {
6139 let (max_context_chars, keep_recent_messages) = match profile {
6140 ChatHistoryProfile::Full => (usize::MAX, usize::MAX),
6141 ChatHistoryProfile::Standard => (80_000usize, 40usize),
6142 ChatHistoryProfile::Compact => (12_000usize, 12usize),
6143 };
6144
6145 if messages.len() <= keep_recent_messages {
6146 let total_chars = messages.iter().map(|m| m.content.len()).sum::<usize>();
6147 if total_chars <= max_context_chars {
6148 return messages;
6149 }
6150 }
6151
6152 let mut kept = messages;
6153 let mut dropped_count = 0usize;
6154 let mut total_chars = kept.iter().map(|m| m.content.len()).sum::<usize>();
6155
6156 while kept.len() > keep_recent_messages || total_chars > max_context_chars {
6157 if kept.is_empty() {
6158 break;
6159 }
6160 let removed = kept.remove(0);
6161 total_chars = total_chars.saturating_sub(removed.content.len());
6162 dropped_count += 1;
6163 }
6164
6165 if dropped_count > 0 {
6166 kept.insert(
6167 0,
6168 ChatMessage {
6169 role: "system".to_string(),
6170 content: format!(
6171 "[history compacted: omitted {} older messages to fit context window]",
6172 dropped_count
6173 ),
6174 attachments: Vec::new(),
6175 },
6176 );
6177 }
6178 kept
6179}
6180
6181#[cfg(test)]
6182mod tests {
6183 use super::*;
6184 use crate::{EventBus, Storage};
6185 use std::sync::{Mutex, OnceLock};
6186 use uuid::Uuid;
6187
6188 fn env_test_lock() -> std::sync::MutexGuard<'static, ()> {
6189 static ENV_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
6190 ENV_TEST_LOCK
6191 .get_or_init(|| Mutex::new(()))
6192 .lock()
6193 .expect("env test lock")
6194 }
6195
6196 #[tokio::test]
6197 async fn todo_updated_event_is_normalized() {
6198 let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
6199 let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
6200 let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
6201 let session_id = session.id.clone();
6202 storage.save_session(session).await.expect("save session");
6203
6204 let bus = EventBus::new();
6205 let mut rx = bus.subscribe();
6206 emit_tool_side_events(
6207 storage.clone(),
6208 &bus,
6209 ToolSideEventContext {
6210 session_id: &session_id,
6211 message_id: "m1",
6212 tool: "todo_write",
6213 args: &json!({"todos":[{"content":"ship parity"}]}),
6214 metadata: &json!({"todos":[{"content":"ship parity"}]}),
6215 workspace_root: Some("."),
6216 effective_cwd: Some("."),
6217 },
6218 )
6219 .await;
6220
6221 let event = rx.recv().await.expect("event");
6222 assert_eq!(event.event_type, "todo.updated");
6223 let todos = event
6224 .properties
6225 .get("todos")
6226 .and_then(|v| v.as_array())
6227 .cloned()
6228 .unwrap_or_default();
6229 assert_eq!(todos.len(), 1);
6230 assert!(todos[0].get("id").and_then(|v| v.as_str()).is_some());
6231 assert_eq!(
6232 todos[0].get("content").and_then(|v| v.as_str()),
6233 Some("ship parity")
6234 );
6235 assert!(todos[0].get("status").and_then(|v| v.as_str()).is_some());
6236 }
6237
6238 #[tokio::test]
6239 async fn question_asked_event_contains_tool_reference() {
6240 let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
6241 let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
6242 let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
6243 let session_id = session.id.clone();
6244 storage.save_session(session).await.expect("save session");
6245
6246 let bus = EventBus::new();
6247 let mut rx = bus.subscribe();
6248 emit_tool_side_events(
6249 storage,
6250 &bus,
6251 ToolSideEventContext {
6252 session_id: &session_id,
6253 message_id: "msg-1",
6254 tool: "question",
6255 args: &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
6256 metadata: &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
6257 workspace_root: Some("."),
6258 effective_cwd: Some("."),
6259 },
6260 )
6261 .await;
6262
6263 let event = rx.recv().await.expect("event");
6264 assert_eq!(event.event_type, "question.asked");
6265 assert_eq!(
6266 event
6267 .properties
6268 .get("sessionID")
6269 .and_then(|v| v.as_str())
6270 .unwrap_or(""),
6271 session_id
6272 );
6273 let tool = event
6274 .properties
6275 .get("tool")
6276 .cloned()
6277 .unwrap_or_else(|| json!({}));
6278 assert!(tool.get("callID").and_then(|v| v.as_str()).is_some());
6279 assert_eq!(
6280 tool.get("messageID").and_then(|v| v.as_str()),
6281 Some("msg-1")
6282 );
6283 }
6284
6285 #[test]
6286 fn compact_chat_history_keeps_recent_and_inserts_summary() {
6287 let mut messages = Vec::new();
6288 for i in 0..60 {
6289 messages.push(ChatMessage {
6290 role: "user".to_string(),
6291 content: format!("message-{i}"),
6292 attachments: Vec::new(),
6293 });
6294 }
6295 let compacted = compact_chat_history(messages, ChatHistoryProfile::Standard);
6296 assert!(compacted.len() <= 41);
6297 assert_eq!(compacted[0].role, "system");
6298 assert!(compacted[0].content.contains("history compacted"));
6299 assert!(compacted.iter().any(|m| m.content.contains("message-59")));
6300 }
6301
6302 #[test]
6303 fn extracts_todos_from_checklist_and_numbered_lines() {
6304 let input = r#"
6305Plan:
6306- [ ] Audit current implementation
6307- [ ] Add planner fallback
63081. Add regression test coverage
6309"#;
6310 let todos = extract_todo_candidates_from_text(input);
6311 assert_eq!(todos.len(), 3);
6312 assert_eq!(
6313 todos[0].get("content").and_then(|v| v.as_str()),
6314 Some("Audit current implementation")
6315 );
6316 }
6317
6318 #[test]
6319 fn does_not_extract_todos_from_plain_prose_lines() {
6320 let input = r#"
6321I need more information to proceed.
6322Can you tell me the event size and budget?
6323Once I have that, I can provide a detailed plan.
6324"#;
6325 let todos = extract_todo_candidates_from_text(input);
6326 assert!(todos.is_empty());
6327 }
6328
6329 #[test]
6330 fn parses_wrapped_tool_call_from_markdown_response() {
6331 let input = r#"
6332Here is the tool call:
6333```json
6334{"tool_call":{"name":"todo_write","arguments":{"todos":[{"content":"a"}]}}}
6335```
6336"#;
6337 let parsed = parse_tool_invocation_from_response(input).expect("tool call");
6338 assert_eq!(parsed.0, "todo_write");
6339 assert!(parsed.1.get("todos").is_some());
6340 }
6341
6342 #[test]
6343 fn parses_top_level_name_args_tool_call() {
6344 let input = r#"{"name":"bash","args":{"command":"echo hi"}}"#;
6345 let parsed = parse_tool_invocation_from_response(input).expect("top-level tool call");
6346 assert_eq!(parsed.0, "bash");
6347 assert_eq!(
6348 parsed.1.get("command").and_then(|v| v.as_str()),
6349 Some("echo hi")
6350 );
6351 }
6352
6353 #[test]
6354 fn parses_function_style_todowrite_call() {
6355 let input = r#"Status: Completed
6356Call: todowrite(task_id=2, status="completed")"#;
6357 let parsed = parse_tool_invocation_from_response(input).expect("function-style tool call");
6358 assert_eq!(parsed.0, "todo_write");
6359 assert_eq!(parsed.1.get("task_id").and_then(|v| v.as_i64()), Some(2));
6360 assert_eq!(
6361 parsed.1.get("status").and_then(|v| v.as_str()),
6362 Some("completed")
6363 );
6364 }
6365
6366 #[test]
6367 fn parses_multiple_function_style_todowrite_calls() {
6368 let input = r#"
6369Call: todowrite(task_id=2, status="completed")
6370Call: todowrite(task_id=3, status="in_progress")
6371"#;
6372 let parsed = parse_tool_invocations_from_response(input);
6373 assert_eq!(parsed.len(), 2);
6374 assert_eq!(parsed[0].0, "todo_write");
6375 assert_eq!(parsed[0].1.get("task_id").and_then(|v| v.as_i64()), Some(2));
6376 assert_eq!(
6377 parsed[0].1.get("status").and_then(|v| v.as_str()),
6378 Some("completed")
6379 );
6380 assert_eq!(parsed[1].1.get("task_id").and_then(|v| v.as_i64()), Some(3));
6381 assert_eq!(
6382 parsed[1].1.get("status").and_then(|v| v.as_str()),
6383 Some("in_progress")
6384 );
6385 }
6386
6387 #[test]
6388 fn applies_todo_status_update_from_task_id_args() {
6389 let current = vec![
6390 json!({"id":"todo-1","content":"a","status":"pending"}),
6391 json!({"id":"todo-2","content":"b","status":"pending"}),
6392 json!({"id":"todo-3","content":"c","status":"pending"}),
6393 ];
6394 let updated =
6395 apply_todo_updates_from_args(current, &json!({"task_id":2, "status":"completed"}))
6396 .expect("status update");
6397 assert_eq!(
6398 updated[1].get("status").and_then(|v| v.as_str()),
6399 Some("completed")
6400 );
6401 }
6402
6403 #[test]
6404 fn normalizes_todo_write_tasks_alias() {
6405 let normalized = normalize_todo_write_args(
6406 json!({"tasks":[{"title":"Book venue"},{"name":"Send invites"}]}),
6407 "",
6408 );
6409 let todos = normalized
6410 .get("todos")
6411 .and_then(|v| v.as_array())
6412 .cloned()
6413 .unwrap_or_default();
6414 assert_eq!(todos.len(), 2);
6415 assert_eq!(
6416 todos[0].get("content").and_then(|v| v.as_str()),
6417 Some("Book venue")
6418 );
6419 assert_eq!(
6420 todos[1].get("content").and_then(|v| v.as_str()),
6421 Some("Send invites")
6422 );
6423 }
6424
6425 #[test]
6426 fn normalizes_todo_write_from_completion_when_args_empty() {
6427 let completion = "Plan:\n1. Secure venue\n2. Create playlist\n3. Send invites";
6428 let normalized = normalize_todo_write_args(json!({}), completion);
6429 let todos = normalized
6430 .get("todos")
6431 .and_then(|v| v.as_array())
6432 .cloned()
6433 .unwrap_or_default();
6434 assert_eq!(todos.len(), 3);
6435 assert!(!is_empty_todo_write_args(&normalized));
6436 }
6437
6438 #[test]
6439 fn empty_todo_write_args_allows_status_updates() {
6440 let args = json!({"task_id": 2, "status":"completed"});
6441 assert!(!is_empty_todo_write_args(&args));
6442 }
6443
/// Plain (non-JSON) streamed text for `websearch` ends up under `query`.
#[test]
fn streamed_websearch_args_fallback_to_query_string() {
    let args = parse_streamed_tool_args("websearch", "meaning of life");
    let query = args.get("query").and_then(|v| v.as_str());
    assert_eq!(query, Some("meaning of life"));
}
6452
/// A JSON string literal streamed as `websearch` args is unwrapped, so the
/// surrounding quotes do not appear in `query`.
#[test]
fn streamed_websearch_stringified_json_args_are_unwrapped() {
    let raw = r#""donkey gestation period""#;
    let args = parse_streamed_tool_args("websearch", raw);
    let query = args.get("query").and_then(|v| v.as_str());
    assert_eq!(query, Some("donkey gestation period"));
}
6461
/// `</arg_key><arg_value>…</arg_value>` wrapper markup around a streamed
/// `websearch` query is removed, leaving only the query text.
#[test]
fn streamed_websearch_args_strip_arg_key_value_wrappers() {
    let raw = "query</arg_key><arg_value>taj card what is it benefits how to apply</arg_value>";
    let args = parse_streamed_tool_args("websearch", raw);
    let query = args.get("query").and_then(|v| v.as_str());
    assert_eq!(query, Some("taj card what is it benefits how to apply"));
}
6473
/// Empty `websearch` args are filled from the user text: the query becomes
/// "meaning of life" and the result is tagged `inferred_from_user` / `recovered`.
#[test]
fn normalize_tool_args_websearch_infers_from_user_text() {
    let user_text = "web search meaning of life";
    let outcome = normalize_tool_args("websearch", json!({}), user_text, "");

    let query = outcome.args.get("query").and_then(|v| v.as_str());
    assert_eq!(query, Some("meaning of life"));
    assert_eq!(outcome.args_source, "inferred_from_user");
    assert_eq!(outcome.args_integrity, "recovered");
}
6485
/// A provider-supplied `query` is kept as-is even when the user text also
/// looks like a search request; source/integrity stay `provider_json` / `ok`.
#[test]
fn normalize_tool_args_websearch_keeps_existing_query() {
    let provider_args = json!({"query":"already set"});
    let outcome = normalize_tool_args(
        "websearch",
        provider_args,
        "web search should not override",
        "",
    );

    let query = outcome.args.get("query").and_then(|v| v.as_str());
    assert_eq!(query, Some("already set"));
    assert_eq!(outcome.args_source, "provider_json");
    assert_eq!(outcome.args_integrity, "ok");
}
6501
/// With empty args and the bare user text "search", no query is produced and
/// the result is flagged as missing-terminal with source `missing` / integrity `empty`.
#[test]
fn normalize_tool_args_websearch_fails_when_unrecoverable() {
    let outcome = normalize_tool_args("websearch", json!({}), "search", "");

    assert!(outcome.query.is_none());
    assert!(outcome.missing_terminal);
    assert_eq!(outcome.args_source, "missing");
    assert_eq!(outcome.args_integrity, "empty");
}
6510
/// Empty `webfetch` args pick up the backtick-quoted URL from the user prompt.
#[test]
fn normalize_tool_args_webfetch_infers_url_from_user_prompt() {
    let user_text = "Please fetch `https://tandem.frumu.ai/docs/` in markdown mode";
    let outcome = normalize_tool_args("webfetch", json!({}), user_text, "");

    assert!(!outcome.missing_terminal);
    let url = outcome.args.get("url").and_then(|v| v.as_str());
    assert_eq!(url, Some("https://tandem.frumu.ai/docs/"));
    assert_eq!(outcome.args_source, "inferred_from_user");
    assert_eq!(outcome.args_integrity, "recovered");
}
6527
/// A URL supplied as `args.uri` (nested, aliased key) is surfaced as top-level
/// `url`, and the source is still reported as `provider_json`.
#[test]
fn normalize_tool_args_webfetch_recovers_nested_url_alias() {
    let provider_args = json!({"args":{"uri":"https://example.com/page"}});
    let outcome = normalize_tool_args("webfetch", provider_args, "", "");

    assert!(!outcome.missing_terminal);
    let url = outcome.args.get("url").and_then(|v| v.as_str());
    assert_eq!(url, Some("https://example.com/page"));
    assert_eq!(outcome.args_source, "provider_json");
}
6543
/// When neither the args nor the user text contain a URL, `webfetch`
/// normalization reports `WEBFETCH_URL_MISSING`.
#[test]
fn normalize_tool_args_webfetch_fails_when_url_unrecoverable() {
    let outcome = normalize_tool_args("webfetch", json!({}), "fetch the site", "");

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("WEBFETCH_URL_MISSING"));
}
6553
/// Empty `pack_builder` args take the whole user prompt as `goal` and default
/// `mode` to `preview`.
#[test]
fn normalize_tool_args_pack_builder_infers_goal_from_user_prompt() {
    let user_text =
        "Create a pack that checks latest headline news every day at 8 AM and emails me.";
    let outcome = normalize_tool_args("pack_builder", json!({}), user_text, "");

    assert!(!outcome.missing_terminal);
    let goal = outcome.args.get("goal").and_then(|v| v.as_str());
    let mode = outcome.args.get("mode").and_then(|v| v.as_str());
    assert_eq!(goal, Some(user_text));
    assert_eq!(mode, Some("preview"));
    assert_eq!(outcome.args_source, "inferred_from_user");
    assert_eq!(outcome.args_integrity, "recovered");
}
6571
/// Provider-supplied `goal` and `mode` for `pack_builder` win over the user
/// text; the result stays `provider_json` / `ok`.
#[test]
fn normalize_tool_args_pack_builder_keeps_existing_goal_and_mode() {
    let provider_args = json!({"mode":"apply","goal":"existing goal","plan_id":"plan-1"});
    let outcome = normalize_tool_args(
        "pack_builder",
        provider_args,
        "new goal should not override",
        "",
    );

    assert!(!outcome.missing_terminal);
    let goal = outcome.args.get("goal").and_then(|v| v.as_str());
    let mode = outcome.args.get("mode").and_then(|v| v.as_str());
    assert_eq!(goal, Some("existing goal"));
    assert_eq!(mode, Some("apply"));
    assert_eq!(outcome.args_source, "provider_json");
    assert_eq!(outcome.args_integrity, "ok");
}
6592
/// A bare "confirm" after a preview message switches `pack_builder` to
/// `apply`, reuses the plan id found in the assistant context, and sets
/// `approve_pack_install` to true.
#[test]
fn normalize_tool_args_pack_builder_confirm_reuses_plan_from_context() {
    let assistant_context =
        "Pack Builder Preview\n- Plan ID: plan-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
    let outcome = normalize_tool_args("pack_builder", json!({}), "confirm", assistant_context);

    assert!(!outcome.missing_terminal);
    let mode = outcome.args.get("mode").and_then(|v| v.as_str());
    let plan_id = outcome.args.get("plan_id").and_then(|v| v.as_str());
    let approved = outcome
        .args
        .get("approve_pack_install")
        .and_then(|v| v.as_bool());
    assert_eq!(mode, Some("apply"));
    assert_eq!(plan_id, Some("plan-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"));
    assert_eq!(approved, Some(true));
    assert_eq!(outcome.args_source, "recovered_from_context");
}
6617
/// `mode: "apply"` without a `plan_id` recovers the id from a JSON snippet in
/// the assistant context.
#[test]
fn normalize_tool_args_pack_builder_apply_recovers_missing_plan_id() {
    let assistant_context =
        "{\"mode\":\"preview\",\"plan_id\":\"plan-11111111-2222-3333-4444-555555555555\"}";
    let provider_args = json!({"mode":"apply"});
    let outcome = normalize_tool_args("pack_builder", provider_args, "yes", assistant_context);

    assert!(!outcome.missing_terminal);
    let mode = outcome.args.get("mode").and_then(|v| v.as_str());
    let plan_id = outcome.args.get("plan_id").and_then(|v| v.as_str());
    assert_eq!(mode, Some("apply"));
    assert_eq!(plan_id, Some("plan-11111111-2222-3333-4444-555555555555"));
}
6638
/// A short user message that is a new goal (not a confirmation) stays in
/// `preview` mode even though a previous plan id is present in the context.
#[test]
fn normalize_tool_args_pack_builder_short_new_goal_does_not_force_apply() {
    let assistant_context =
        "Pack Builder Preview\n- Plan ID: plan-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
    let user_text = "create jira sync";
    let outcome = normalize_tool_args("pack_builder", json!({}), user_text, assistant_context);

    assert!(!outcome.missing_terminal);
    let mode = outcome.args.get("mode").and_then(|v| v.as_str());
    let goal = outcome.args.get("goal").and_then(|v| v.as_str());
    assert_eq!(mode, Some("preview"));
    assert_eq!(goal, Some("create jira sync"));
}
6659
/// `write` with no args and no surrounding text fails with `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_write_requires_path() {
    let outcome = normalize_tool_args("write", json!({}), "", "");

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
6669
/// The camelCase alias `filePath` is mapped to `path`; `content` is preserved.
#[test]
fn normalize_tool_args_write_recovers_alias_path_key() {
    let provider_args = json!({"filePath":"docs/CONCEPT.md","content":"hello"});
    let outcome = normalize_tool_args("write", provider_args, "", "");

    assert!(!outcome.missing_terminal);
    let path = outcome.args.get("path").and_then(|v| v.as_str());
    let content = outcome.args.get("content").and_then(|v| v.as_str());
    assert_eq!(path, Some("docs/CONCEPT.md"));
    assert_eq!(content, Some("hello"));
}
6688
/// In `OutputTargetOnly` mode, a `write` call lacking `path` takes it from the
/// "Required output target" JSON block embedded in the user text.
#[test]
fn normalize_tool_args_write_recovers_html_output_target_path() {
    let user_text = "Execute task.\n\nRequired output target:\n{\n \"path\": \"game.html\",\n \"kind\": \"source\",\n \"operation\": \"create_or_update\"\n}\n";
    let provider_args = json!({"content":"<html></html>"});
    let outcome = normalize_tool_args_with_mode(
        "write",
        provider_args,
        user_text,
        "",
        WritePathRecoveryMode::OutputTargetOnly,
    );

    assert!(!outcome.missing_terminal);
    let path = outcome.args.get("path").and_then(|v| v.as_str());
    assert_eq!(path, Some("game.html"));
}
6704
/// Empty `read` args pick up the backtick-quoted file name from the user prompt.
#[test]
fn normalize_tool_args_read_infers_path_from_user_prompt() {
    let user_text = "Please inspect `FEATURE_LIST.md` and summarize key sections.";
    let outcome = normalize_tool_args("read", json!({}), user_text, "");

    assert!(!outcome.missing_terminal);
    let path = outcome.args.get("path").and_then(|v| v.as_str());
    assert_eq!(path, Some("FEATURE_LIST.md"));
    assert_eq!(outcome.args_source, "inferred_from_user");
    assert_eq!(outcome.args_integrity, "recovered");
}
6721
/// A path mentioned only in the assistant context is not used for `read`;
/// the call fails with `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_read_does_not_infer_path_from_assistant_context() {
    let assistant_context = "I will read src-tauri/src/orchestrator/engine.rs first.";
    let outcome = normalize_tool_args("read", json!({}), "generic instruction", assistant_context);

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
6736
/// A `file_path` nested inside an `args` array is surfaced as top-level `path`.
#[test]
fn normalize_tool_args_write_recovers_path_from_nested_array_payload() {
    let provider_args = json!({"args":[{"file_path":"docs/CONCEPT.md"}],"content":"hello"});
    let outcome = normalize_tool_args("write", provider_args, "", "");

    assert!(!outcome.missing_terminal);
    let path = outcome.args.get("path").and_then(|v| v.as_str());
    assert_eq!(path, Some("docs/CONCEPT.md"));
}
6751
/// The alias key `body` is accepted as the `content` of a `write` call.
#[test]
fn normalize_tool_args_write_recovers_content_alias() {
    let provider_args = json!({"path":"docs/FEATURES.md","body":"feature notes"});
    let outcome = normalize_tool_args("write", provider_args, "", "");

    assert!(!outcome.missing_terminal);
    let content = outcome.args.get("content").and_then(|v| v.as_str());
    assert_eq!(content, Some("feature notes"));
}
6766
/// A `write` call with a path but no content (and no context to recover it
/// from) fails with `WRITE_CONTENT_MISSING`.
#[test]
fn normalize_tool_args_write_fails_when_content_missing() {
    let provider_args = json!({"path":"docs/FEATURES.md"});
    let outcome = normalize_tool_args("write", provider_args, "", "");

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("WRITE_CONTENT_MISSING"));
}
6776
/// In `OutputTargetOnly` mode, free-form user text without an explicit output
/// target does not yield a path; the call fails with `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_write_output_target_only_rejects_freeform_guess() {
    let user_text = "Please implement the screen/state structure in the workspace.";
    let outcome = normalize_tool_args_with_mode(
        "write",
        json!({}),
        user_text,
        "",
        WritePathRecoveryMode::OutputTargetOnly,
    );

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
6792
/// A `write` call with only a path takes its content from the assistant
/// context; the expected content has no trailing newline.
#[test]
fn normalize_tool_args_write_recovers_content_from_assistant_context() {
    let assistant_context =
        "## Features\n\n- Neon arcade gameplay\n- Single-file HTML structure\n";
    let provider_args = json!({"path":"docs/FEATURES.md"});
    let outcome = normalize_tool_args("write", provider_args, "", assistant_context);

    assert!(!outcome.missing_terminal);
    let path = outcome.args.get("path").and_then(|v| v.as_str());
    let content = outcome.args.get("content").and_then(|v| v.as_str());
    assert_eq!(path, Some("docs/FEATURES.md"));
    assert_eq!(
        content,
        Some("## Features\n\n- Neon arcade gameplay\n- Single-file HTML structure")
    );
    assert_eq!(outcome.args_source, "recovered_from_context");
    assert_eq!(outcome.args_integrity, "recovered");
}
6813
/// A plain string under `args` is used as the `content` of a `write` call
/// when a `path` is already present.
#[test]
fn normalize_tool_args_write_recovers_raw_nested_string_content() {
    let provider_args = json!({"path":"docs/FEATURES.md","args":"Line 1\nLine 2"});
    let outcome = normalize_tool_args("write", provider_args, "", "");

    assert!(!outcome.missing_terminal);
    let path = outcome.args.get("path").and_then(|v| v.as_str());
    let content = outcome.args.get("content").and_then(|v| v.as_str());
    assert_eq!(path, Some("docs/FEATURES.md"));
    assert_eq!(content, Some("Line 1\nLine 2"));
}
6832
/// When the whole args value is just a path-like string, it is not reused as
/// file content: the call fails with `WRITE_CONTENT_MISSING`.
#[test]
fn normalize_tool_args_write_does_not_treat_path_as_content() {
    let provider_args = json!("docs/FEATURES.md");
    let outcome = normalize_tool_args("write", provider_args, "", "");

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("WRITE_CONTENT_MISSING"));
}
6842
/// An `attachment` whose `s3key` is empty is dropped from the gmail send-email
/// args, and the source is reported as `sanitized_attachment`.
#[test]
fn normalize_tool_args_gmail_send_email_omits_empty_attachment() {
    let provider_args = json!({
        "to": "evan@example.com",
        "subject": "Test",
        "body": "Hello",
        "attachment": {
            "s3key": ""
        }
    });
    let outcome = normalize_tool_args("mcp.composio_1.gmail_send_email", provider_args, "", "");

    let attachment = outcome.args.get("attachment");
    assert!(attachment.is_none());
    assert_eq!(outcome.args_source, "sanitized_attachment");
}
6861
/// An `attachment` with a non-empty `s3key` survives normalization unchanged.
#[test]
fn normalize_tool_args_gmail_send_email_keeps_valid_attachment() {
    let provider_args = json!({
        "to": "evan@example.com",
        "subject": "Test",
        "body": "Hello",
        "attachment": {
            "s3key": "file_123"
        }
    });
    let outcome = normalize_tool_args("mcp.composio_1.gmail_send_email", provider_args, "", "");

    let s3key = outcome
        .args
        .get("attachment")
        .and_then(|attachment| attachment.get("s3key"))
        .and_then(|key| key.as_str());
    assert_eq!(s3key, Some("file_123"));
}
6886
/// An output list containing `WRITE_ARGS_EMPTY_FROM_PROVIDER` is classified as
/// `RequiredToolFailureKind::WriteArgsEmptyFromProvider`.
#[test]
fn classify_required_tool_failure_detects_empty_provider_write_args() {
    let outputs = [String::from("WRITE_ARGS_EMPTY_FROM_PROVIDER")];
    let kind = classify_required_tool_failure(&outputs, true, 1, false, false);
    assert_eq!(kind, RequiredToolFailureKind::WriteArgsEmptyFromProvider);
}
6898
/// A file name wrapped in markdown bold (`**…**`) in the user prompt is
/// recovered as the `read` path without the asterisks.
#[test]
fn normalize_tool_args_read_infers_path_from_bold_markdown() {
    let user_text = "Please read **FEATURE_LIST.md** and summarize.";
    let outcome = normalize_tool_args("read", json!({}), user_text, "");

    assert!(!outcome.missing_terminal);
    let path = outcome.args.get("path").and_then(|v| v.as_str());
    assert_eq!(path, Some("FEATURE_LIST.md"));
}
6913
/// Empty `bash` args take the backtick-quoted command from the user prompt,
/// including its embedded double quotes.
#[test]
fn normalize_tool_args_shell_infers_command_from_user_prompt() {
    let user_text = "Run `rg -n \"TODO\" .`";
    let outcome = normalize_tool_args("bash", json!({}), user_text, "");

    assert!(!outcome.missing_terminal);
    let command = outcome.args.get("command").and_then(|v| v.as_str());
    assert_eq!(command, Some("rg -n \"TODO\" ."));
    assert_eq!(outcome.args_source, "inferred_from_user");
    assert_eq!(outcome.args_integrity, "recovered");
}
6925
/// A `read` path of just `/` is rejected with `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_read_rejects_root_only_path() {
    let provider_args = json!({"path":"/"});
    let outcome = normalize_tool_args("read", provider_args, "", "");

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
6935
/// When the provider sends `/` as the path but the user prompt names a file,
/// the file from the prompt is used instead.
#[test]
fn normalize_tool_args_read_recovers_when_provider_path_is_root_only() {
    let provider_args = json!({"path":"/"});
    let outcome = normalize_tool_args("read", provider_args, "Please open `CONCEPT.md`", "");

    assert!(!outcome.missing_terminal);
    let path = outcome.args.get("path").and_then(|v| v.as_str());
    assert_eq!(path, Some("CONCEPT.md"));
    assert_eq!(outcome.args_source, "inferred_from_user");
    assert_eq!(outcome.args_integrity, "recovered");
}
6948
/// A `path` that actually contains leaked `<tool_call>` markup is rejected
/// with `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_read_rejects_tool_call_markup_path() {
    let provider_args = json!({
        "path":"<tool_call>\n<function=glob>\n<parameter=pattern>**/*</parameter>\n</function>\n</tool_call>"
    });
    let outcome = normalize_tool_args("read", provider_args, "", "");

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
6965
/// A glob pattern (`**/*`) supplied as a `read` path is rejected with
/// `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_read_rejects_glob_pattern_path() {
    let provider_args = json!({"path":"**/*"});
    let outcome = normalize_tool_args("read", provider_args, "", "");

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
6975
/// The placeholder-looking path `files/directories` is rejected with
/// `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_read_rejects_placeholder_path() {
    let provider_args = json!({"path":"files/directories"});
    let outcome = normalize_tool_args("read", provider_args, "", "");

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
6985
/// The placeholder-looking path `tool/policy` is rejected with
/// `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_read_rejects_tool_policy_placeholder_path() {
    let provider_args = json!({"path":"tool/policy"});
    let outcome = normalize_tool_args("read", provider_args, "", "");

    assert!(outcome.missing_terminal);
    let reason = outcome.missing_terminal_reason.as_deref();
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
6995
/// When the provider path is the `tool/policy` placeholder, a backtick-quoted
/// PDF name from the user text (with spaces and non-ASCII letters) replaces it.
#[test]
fn normalize_tool_args_read_recovers_pdf_path_from_user_text() {
    let provider_args = json!({"path":"tool/policy"});
    let user_text = "Read `T1011U kitöltési útmutató.pdf` and summarize.";
    let outcome = normalize_tool_args("read", provider_args, user_text, "");

    assert!(!outcome.missing_terminal);
    let path = outcome.args.get("path").and_then(|v| v.as_str());
    assert_eq!(path, Some("T1011U kitöltési útmutató.pdf"));
    assert_eq!(outcome.args_source, "inferred_from_user");
    assert_eq!(outcome.args_integrity, "recovered");
}
7012
/// `default_api:read` normalizes to `read`, and `functions.shell` to `bash`.
#[test]
fn normalize_tool_name_strips_default_api_namespace() {
    let cases = [("default_api:read", "read"), ("functions.shell", "bash")];
    for (raw, expected) in cases {
        assert_eq!(normalize_tool_name(raw), expected);
    }
}
7018
/// `mcp.<server>.<tool>` yields the server segment; a non-MCP name and a bare
/// `mcp` both yield `None`.
#[test]
fn mcp_server_from_tool_name_parses_server_segment() {
    let with_server = mcp_server_from_tool_name("mcp.arcade.jira_getboards");
    assert_eq!(with_server, Some("arcade"));

    for name in ["read", "mcp"] {
        assert_eq!(mcp_server_from_tool_name(name), None);
    }
}
7028
/// When a batch entry's `tool` is only the `default_api` wrapper, the real
/// tool comes from `name`; a `default_api:` prefix is stripped. The batch is
/// read-only and its signature mentions both resolved tools.
#[test]
fn batch_helpers_use_name_when_tool_is_wrapper() {
    let batch = json!({
        "tool_calls":[
            {"tool":"default_api","name":"read","args":{"path":"CONCEPT.md"}},
            {"tool":"default_api:glob","args":{"pattern":"*.md"}}
        ]
    });

    let resolved = extract_batch_calls(&batch);
    assert_eq!(resolved.len(), 2);
    assert_eq!(resolved[0].0, "read");
    assert_eq!(resolved[1].0, "glob");

    assert!(is_read_only_batch_call(&batch));

    // A missing signature degrades to an empty string, which fails the checks below.
    let signature = batch_tool_signature(&batch).unwrap_or_default();
    assert!(signature.contains("read:"));
    assert!(signature.contains("glob:"));
}
7046
/// A batch entry that names its tool under `function.name` resolves to that
/// name, and the batch counts as read-only.
#[test]
fn batch_helpers_resolve_nested_function_name() {
    let batch = json!({
        "tool_calls":[
            {"tool":"default_api","function":{"name":"read"},"args":{"path":"CONCEPT.md"}}
        ]
    });

    let resolved = extract_batch_calls(&batch);
    assert_eq!(resolved.len(), 1);
    assert_eq!(resolved[0].0, "read");
    assert!(is_read_only_batch_call(&batch));
}
7059
/// A batch output in which every entry is "Unknown tool: default_api" is
/// classified as non-productive.
#[test]
fn batch_output_classifier_detects_non_productive_unknown_results() {
    let batch_output = r#"
[
 {"tool":"default_api","output":"Unknown tool: default_api","metadata":{}},
 {"tool":"default_api","output":"Unknown tool: default_api","metadata":{}}
]
"#;
    let non_productive = is_non_productive_batch_output(batch_output);
    assert!(non_productive);
}
7070
/// The runtime system prompt for a Windows/PowerShell host contains the
/// `[Execution Environment]` header plus OS, shell and path-style lines.
#[test]
fn runtime_prompt_includes_execution_environment_block() {
    let host = HostRuntimeContext {
        os: HostOs::Windows,
        arch: "x86_64".to_string(),
        shell_family: ShellFamily::Powershell,
        path_style: PathStyle::Windows,
    };
    let prompt = tandem_runtime_system_prompt(&host, &[]);

    let expected_fragments = [
        "[Execution Environment]",
        "Host OS: windows",
        "Shell: powershell",
        "Path style: windows",
    ];
    for fragment in expected_fragments {
        assert!(prompt.contains(fragment), "prompt is missing {:?}", fragment);
    }
}
7087
/// When integrations are passed in, the runtime system prompt lists each one
/// as a bullet under a `[Connected Integrations]` header.
#[test]
fn runtime_prompt_includes_connected_integrations_block() {
    let host = HostRuntimeContext {
        os: HostOs::Linux,
        arch: "x86_64".to_string(),
        shell_family: ShellFamily::Posix,
        path_style: PathStyle::Posix,
    };
    let integrations = ["notion".to_string(), "github".to_string()];
    let prompt = tandem_runtime_system_prompt(&host, &integrations);

    for fragment in ["[Connected Integrations]", "- notion", "- github"] {
        assert!(prompt.contains(fragment), "prompt is missing {:?}", fragment);
    }
}
7103
/// A prompt asking to research news with links needs web research; a plain
/// greeting/summarization prompt does not.
#[test]
fn detects_web_research_prompt_keywords() {
    let research_prompt = "research todays top news stories and include links";
    let plain_prompt = "say hello and summarize this text";

    assert!(requires_web_research_prompt(research_prompt));
    assert!(!requires_web_research_prompt(plain_prompt));
}
7113
/// A prompt asking to send a report to an address requires email delivery;
/// a "draft … for later" prompt does not.
#[test]
fn detects_email_delivery_prompt_keywords() {
    let delivery_prompt = "send a full report with links to evan@example.com";
    let draft_prompt = "draft a summary for later";

    assert!(requires_email_delivery_prompt(delivery_prompt));
    assert!(!requires_email_delivery_prompt(draft_prompt));
}
7121
/// "Email Status: Sent to …" counts as a claim that email was sent; an
/// explicit "could not send" statement does not.
#[test]
fn completion_claim_detector_flags_sent_language() {
    let claims_sent = "Email Status: Sent to evan@example.com.";
    let admits_failure = "I could not send email in this run.";

    assert!(completion_claims_email_sent(claims_sent));
    assert!(!completion_claims_email_sent(admits_failure));
}
7131
/// A schema list containing `mcp.composio.gmail_send_email` is recognized as
/// offering an email action tool.
#[test]
fn email_tool_detector_finds_mcp_gmail_tools() {
    // Only the tool name varies between the schemas used here.
    let schema_named = |name: &str| ToolSchema {
        name: name.to_string(),
        description: String::new(),
        input_schema: json!({}),
    };
    let schemas = vec![
        schema_named("read"),
        schema_named("mcp.composio.gmail_send_email"),
    ];

    assert!(has_email_action_tools(&schemas));
}
7148
/// Every field of a fully populated `mcpAuth` metadata object (plus the
/// sibling `server`) is carried over into the parsed structure.
#[test]
fn extract_mcp_auth_required_metadata_parses_expected_shape() {
    let raw = json!({
        "server": "arcade",
        "mcpAuth": {
            "required": true,
            "challengeId": "abc123",
            "authorizationUrl": "https://example.com/oauth",
            "message": "Authorize first",
            "pending": true,
            "blocked": true,
            "retryAfterMs": 8000
        }
    });

    let auth = extract_mcp_auth_required_metadata(&raw).expect("expected metadata");

    assert_eq!(auth.challenge_id, "abc123");
    assert_eq!(auth.authorization_url, "https://example.com/oauth");
    assert_eq!(auth.message, "Authorize first");
    assert_eq!(auth.server.as_deref(), Some("arcade"));
    assert!(auth.pending);
    assert!(auth.blocked);
    assert_eq!(auth.retry_after_ms, Some(8000));
}
7172
/// Both "Authorization required" and "Authorization pending" outputs are
/// detected as auth-required; an ordinary tool result is not.
#[test]
fn auth_required_output_detector_matches_auth_text() {
    let auth_outputs = [
        "Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com",
        "Authorization pending for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com\nRetry after 8s.",
    ];
    for output in auth_outputs {
        assert!(is_auth_required_tool_output(output), "not detected: {:?}", output);
    }

    assert!(!is_auth_required_tool_output("Tool `read` result: ok"));
}
7183
/// Missing-terminal error codes (bare or wrapped in a "Tool … result:" line)
/// and `invalid_function_parameters` are never counted as productive output.
#[test]
fn productive_tool_output_detector_rejects_missing_terminal_write_errors() {
    let rejected = [
        ("write", "WRITE_CONTENT_MISSING"),
        ("write", "FILE_PATH_MISSING"),
        ("write", "Tool `write` result:\nWRITE_CONTENT_MISSING"),
        ("edit", "Tool `edit` result:\nFILE_PATH_MISSING"),
        ("write", "Tool `write` result:\ninvalid_function_parameters"),
    ];
    for (tool, output) in rejected {
        assert!(
            !is_productive_tool_output(tool, output),
            "unexpectedly productive: {} / {:?}",
            tool,
            output
        );
    }
}
7201
/// A real "Wrote …" result is productive, while an authorization prompt for
/// the same tool is not.
#[test]
fn productive_tool_output_detector_accepts_real_tool_results() {
    let real_result = "Tool `write` result:\nWrote /tmp/probe.html";
    let auth_prompt = "Authorization required for `write`.\nAuthorize here: https://example.com";

    assert!(is_productive_tool_output("write", real_result));
    assert!(!is_productive_tool_output("write", auth_prompt));
}
7213
/// The "per-run guard budget exceeded" skip message is detected; an ordinary
/// tool result is not.
#[test]
fn guard_budget_output_detector_matches_expected_text() {
    let skipped =
        "Tool `mcp.arcade.gmail_sendemail` call skipped: per-run guard budget exceeded (10).";
    let ordinary = "Tool `read` result: ok";

    assert!(is_guard_budget_tool_output(skipped));
    assert!(!is_guard_budget_tool_output(ordinary));
}
7221
/// When every output is a guard-budget skip, a summary is produced that
/// mentions the per-run tool guard budget and suggests a fresh run.
#[test]
fn summarize_guard_budget_outputs_returns_run_scoped_message() {
    let outputs: Vec<String> = [
        "Tool `mcp.arcade.gmail_sendemail` call skipped: per-run guard budget exceeded (10).",
        "Tool `mcp.arcade.jira_getboards` call skipped: per-run guard budget exceeded (10).",
    ]
    .iter()
    .map(|line| line.to_string())
    .collect();

    let summary = summarize_guard_budget_outputs(&outputs).expect("expected summary");

    assert!(summary.contains("per-run tool guard budget"));
    assert!(summary.contains("fresh run"));
}
7234
/// The "duplicate call signature retry limit reached" skip message is
/// detected; an ordinary tool result is not.
#[test]
fn duplicate_signature_output_detector_matches_expected_text() {
    let skipped = "Tool `bash` call skipped: duplicate call signature retry limit reached (2).";
    let ordinary = "Tool `read` result: ok";

    assert!(is_duplicate_signature_limit_output(skipped));
    assert!(!is_duplicate_signature_limit_output(ordinary));
}
7244
/// When every output is a duplicate-signature skip, a summary is produced
/// that explains the repetition and asks for a clearer command target.
#[test]
fn summarize_duplicate_signature_outputs_returns_run_scoped_message() {
    let skipped = "Tool `bash` call skipped: duplicate call signature retry limit reached (2).";
    let outputs = vec![skipped.to_string(), skipped.to_string()];

    let summary =
        summarize_duplicate_signature_outputs(&outputs).expect("expected duplicate summary");

    assert!(summary.contains("same tool call kept repeating"));
    assert!(summary.contains("clearer command target"));
}
7258
/// The completion text for an unsatisfied required-tool run contains the
/// shared reason marker, the failure code, and the `tool_mode=required` hint.
#[test]
fn required_tool_mode_unsatisfied_completion_includes_marker() {
    let failure = RequiredToolFailureKind::NoToolCallEmitted;
    let completion = required_tool_mode_unsatisfied_completion(failure);

    assert!(completion.contains(REQUIRED_TOOL_MODE_UNSATISFIED_REASON));
    assert!(completion.contains("NO_TOOL_CALL_EMITTED"));
    assert!(completion.contains("tool_mode=required"));
}
7267
/// The retry context for an invalid-args failure states that tools are
/// mandatory, names the failure code, and reminds about full `content` and
/// the write/edit/apply_patch tools.
#[test]
fn required_tool_retry_context_mentions_offered_tools() {
    let offered_tools = "read, write, apply_patch";
    let retry_context = build_required_tool_retry_context(
        offered_tools,
        RequiredToolFailureKind::ToolCallInvalidArgs,
    );

    let expected_fragments = [
        "Tool access is mandatory",
        "TOOL_CALL_INVALID_ARGS",
        "full `content`",
        "write, edit, or apply_patch",
    ];
    for fragment in expected_fragments {
        assert!(
            retry_context.contains(fragment),
            "retry context is missing {:?}",
            fragment
        );
    }
}
7279
/// The retry context for a write-required failure names the failure code,
/// says inspection is complete, and points at write/edit/apply_patch.
#[test]
fn required_tool_retry_context_requires_write_after_read_only_pass() {
    let offered_tools = "glob, read, write, edit, apply_patch";
    let retry_context = build_required_tool_retry_context(
        offered_tools,
        RequiredToolFailureKind::WriteRequiredNotSatisfied,
    );

    let expected_fragments = [
        "WRITE_REQUIRED_NOT_SATISFIED",
        "Inspection is complete",
        "write, edit, or apply_patch",
    ];
    for fragment in expected_fragments {
        assert!(
            retry_context.contains(fragment),
            "retry context is missing {:?}",
            fragment
        );
    }
}
7290
/// An output list containing `WRITE_CONTENT_MISSING` is classified as
/// `RequiredToolFailureKind::ToolCallInvalidArgs`.
#[test]
fn classify_required_tool_failure_detects_invalid_args() {
    let outputs = [String::from("WRITE_CONTENT_MISSING")];
    let kind = classify_required_tool_failure(&outputs, true, 1, false, false);
    assert_eq!(kind, RequiredToolFailureKind::ToolCallInvalidArgs);
}
7302
/// Raw JSON that embeds a `tool_call` content item looks like an unparsed
/// tool payload; a normal status sentence does not.
#[test]
fn looks_like_unparsed_tool_payload_detects_tool_call_json() {
    let leaked_payload = r#"{"content":[{"type":"tool_call","name":"write"}]}"#;
    let normal_text = "Updated README.md";

    assert!(looks_like_unparsed_tool_payload(leaked_payload));
    assert!(!looks_like_unparsed_tool_payload(normal_text));
}
7310
/// `write`, `edit` and `apply_patch` are workspace write tools; `read` and
/// `glob` are not.
#[test]
fn workspace_write_tool_detection_is_limited_to_mutations() {
    for tool in ["write", "edit", "apply_patch"] {
        assert!(is_workspace_write_tool(tool), "{} should be a write tool", tool);
    }
    for tool in ["read", "glob"] {
        assert!(!is_workspace_write_tool(tool), "{} should not be a write tool", tool);
    }
}
7319
/// The `path` of the JSON object following "Required output target:" in the
/// prompt is returned as the required output target.
#[test]
fn infer_required_output_target_path_reads_prompt_json_block() {
    let prompt = r#"Execute task.

Required output target:
{
 "path": "src/game.html",
 "kind": "source",
 "operation": "create"
}
"#;
    let target = infer_required_output_target_path_from_text(prompt);
    assert_eq!(target.as_deref(), Some("src/game.html"));
}
7336
/// A required output target without a file extension (`Dockerfile`) is still
/// accepted.
#[test]
fn infer_required_output_target_path_accepts_extensionless_target() {
    let prompt = r#"Execute task.

Required output target:
{
 "path": "Dockerfile",
 "kind": "source",
 "operation": "create"
}
"#;
    let target = infer_required_output_target_path_from_text(prompt);
    assert_eq!(target.as_deref(), Some("Dockerfile"));
}
7353
/// A prompt that only mentions the workspace directory yields no write path.
#[test]
fn infer_write_file_path_from_text_rejects_workspace_root() {
    let prompt = "Workspace: /home/evan/game\nCreate the scaffold in the workspace now.";
    let inferred = infer_write_file_path_from_text(prompt);
    assert_eq!(inferred, None);
}
7359
/// With the override variable unset, the duplicate-signature limit is 200
/// for `pack_builder`, `bash` and `write` alike.
#[test]
fn duplicate_signature_limit_defaults_to_200_for_all_tools() {
    let _guard = env_test_lock();
    // SAFETY: `_guard` is held for the whole test; presumably `env_test_lock`
    // serializes the env-mutating tests in this module — confirm at its definition.
    unsafe {
        std::env::remove_var("TANDEM_TOOL_LOOP_DUPLICATE_SIGNATURE_LIMIT");
    }

    for tool in ["pack_builder", "bash", "write"] {
        assert_eq!(duplicate_signature_limit_for(tool), 200);
    }
}
7370
/// Unparseable streamed `write` args do not collapse into an empty object.
#[test]
fn parse_streamed_tool_args_preserves_unparseable_write_payload() {
    let raw = "path=game.html content";
    let args = parse_streamed_tool_args("write", raw);
    assert_ne!(args, json!({}));
}
7376
/// A valid JSON `write` payload with a 4096-character `content` is parsed
/// with both `path` and the full `content` intact.
#[test]
fn parse_streamed_tool_args_preserves_large_write_payload() {
    let big_content = "x".repeat(4096);
    let raw = format!(r#"{{"path":"game.html","content":"{}"}}"#, big_content);

    let args = parse_streamed_tool_args("write", &raw);

    let path = args.get("path").and_then(|v| v.as_str());
    let content = args.get("content").and_then(|v| v.as_str());
    assert_eq!(path, Some("game.html"));
    assert_eq!(content, Some(big_content.as_str()));
}
7391
/// JSON for `write` that is cut off inside the `content` string is recovered
/// into exactly `path` + the partial `content` (with escapes decoded).
#[test]
fn parse_streamed_tool_args_recovers_truncated_write_json() {
    let truncated = concat!(
        r#"{"path":"game.html","allow_empty":false,"content":"<!DOCTYPE html>\n"#,
        r#"<html lang=\"en\"><body>Neon Drift"#
    );
    let expected = json!({
        "path": "game.html",
        "content": "<!DOCTYPE html>\n<html lang=\"en\"><body>Neon Drift"
    });

    let args = parse_streamed_tool_args("write", truncated);

    assert_eq!(args, expected);
}
7407
/// Truncated `write` JSON that never contained a `path` still yields the
/// partial `content`, and no `path` is invented.
#[test]
fn parse_streamed_tool_args_recovers_truncated_write_json_without_path() {
    let truncated = concat!(
        r#"{"allow_empty":false,"content":"<!DOCTYPE html>\n"#,
        r#"<html lang=\"en\"><body>Neon Drift"#
    );

    let args = parse_streamed_tool_args("write", truncated);

    assert_eq!(args.get("path"), None);
    let content = args.get("content").and_then(|v| v.as_str());
    assert_eq!(
        content,
        Some("<!DOCTYPE html>\n<html lang=\"en\"><body>Neon Drift")
    );
}
7421
/// An override of "9" still yields 200 for `write` and `bash`, while an
/// override of "250" yields 250; the variable is removed at the end.
#[test]
fn duplicate_signature_limit_env_override_respects_minimum_floor() {
    const VAR: &str = "TANDEM_TOOL_LOOP_DUPLICATE_SIGNATURE_LIMIT";
    let _guard = env_test_lock();

    // SAFETY (applies to every env mutation below): `_guard` is held for the
    // whole test; presumably `env_test_lock` serializes env-mutating tests — confirm.
    unsafe {
        std::env::set_var(VAR, "9");
    }
    for tool in ["write", "bash"] {
        assert_eq!(duplicate_signature_limit_for(tool), 200);
    }

    unsafe {
        std::env::set_var(VAR, "250");
    }
    assert_eq!(duplicate_signature_limit_for("bash"), 250);

    unsafe {
        std::env::remove_var(VAR);
    }
}
7438
/// With the websearch override variable removed, no websearch-specific
/// duplicate-signature limit is reported.
#[test]
fn websearch_duplicate_signature_limit_is_unset_by_default() {
    let _guard = env_test_lock();
    // SAFETY: `_guard` is held for the whole test; presumably `env_test_lock`
    // serializes env-mutating tests — confirm at its definition.
    unsafe {
        std::env::remove_var("TANDEM_WEBSEARCH_DUPLICATE_SIGNATURE_LIMIT");
    }

    let limit = websearch_duplicate_signature_limit();
    assert_eq!(limit, None);
}
7447
/// The websearch limit is read from the environment: "5" is reported as
/// `Some(200)`, "300" as `Some(300)`; the variable is removed at the end.
#[test]
fn websearch_duplicate_signature_limit_reads_env() {
    const VAR: &str = "TANDEM_WEBSEARCH_DUPLICATE_SIGNATURE_LIMIT";
    let _guard = env_test_lock();

    // SAFETY (applies to every env mutation below): `_guard` is held for the
    // whole test; presumably `env_test_lock` serializes env-mutating tests — confirm.
    unsafe {
        std::env::set_var(VAR, "5");
    }
    assert_eq!(websearch_duplicate_signature_limit(), Some(200));

    unsafe {
        std::env::set_var(VAR, "300");
    }
    assert_eq!(websearch_duplicate_signature_limit(), Some(300));

    unsafe {
        std::env::remove_var(VAR);
    }
}
7463
7464 #[test]
7465 fn summarize_auth_pending_outputs_returns_summary_when_all_are_auth_related() {
7466 let outputs = vec![
7467 "Authorization pending for `mcp.arcade.gmail_sendemail`.\nAuthorize here: https://example.com/a".to_string(),
7468 "Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com/b".to_string(),
7469 ];
7470 let summary = summarize_auth_pending_outputs(&outputs).expect("summary expected");
7471 assert!(summary.contains("Authorization is required before I can continue"));
7472 assert!(summary.contains("gmail_sendemail"));
7473 assert!(summary.contains("gmail_whoami"));
7474 }
7475
7476 #[test]
7477 fn summarize_auth_pending_outputs_returns_none_for_mixed_outputs() {
7478 let outputs = vec![
7479 "Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com".to_string(),
7480 "Tool `read` result:\nok".to_string(),
7481 ];
7482 assert!(summarize_auth_pending_outputs(&outputs).is_none());
7483 }
7484
7485 #[test]
7486 fn parse_budget_override_zero_disables_budget() {
7487 unsafe {
7488 std::env::set_var("TANDEM_TOOL_BUDGET_DEFAULT", "0");
7489 }
7490 assert_eq!(
7491 parse_budget_override("TANDEM_TOOL_BUDGET_DEFAULT"),
7492 Some(usize::MAX)
7493 );
7494 unsafe {
7495 std::env::remove_var("TANDEM_TOOL_BUDGET_DEFAULT");
7496 }
7497 }
7498
7499 #[test]
7500 fn disable_tool_guard_budgets_env_overrides_all_budgets() {
7501 unsafe {
7502 std::env::set_var("TANDEM_DISABLE_TOOL_GUARD_BUDGETS", "1");
7503 }
7504 assert_eq!(tool_budget_for("mcp.arcade.gmail_sendemail"), usize::MAX);
7505 assert_eq!(tool_budget_for("websearch"), usize::MAX);
7506 unsafe {
7507 std::env::remove_var("TANDEM_DISABLE_TOOL_GUARD_BUDGETS");
7508 }
7509 }
7510
7511 #[test]
7512 fn tool_budget_defaults_to_200_calls() {
7513 let _guard = env_test_lock();
7514 unsafe {
7515 std::env::remove_var("TANDEM_DISABLE_TOOL_GUARD_BUDGETS");
7516 std::env::remove_var("TANDEM_TOOL_BUDGET_DEFAULT");
7517 std::env::remove_var("TANDEM_TOOL_BUDGET_WEBSEARCH");
7518 std::env::remove_var("TANDEM_TOOL_BUDGET_READ");
7519 }
7520 assert_eq!(tool_budget_for("bash"), 200);
7521 assert_eq!(tool_budget_for("websearch"), 200);
7522 assert_eq!(tool_budget_for("read"), 200);
7523 }
7524
7525 #[test]
7526 fn tool_budget_env_override_respects_minimum_floor() {
7527 let _guard = env_test_lock();
7528 unsafe {
7529 std::env::remove_var("TANDEM_DISABLE_TOOL_GUARD_BUDGETS");
7530 std::env::set_var("TANDEM_TOOL_BUDGET_DEFAULT", "17");
7531 std::env::set_var("TANDEM_TOOL_BUDGET_WEBSEARCH", "250");
7532 }
7533 assert_eq!(tool_budget_for("bash"), 200);
7534 assert_eq!(tool_budget_for("websearch"), 250);
7535 unsafe {
7536 std::env::remove_var("TANDEM_TOOL_BUDGET_DEFAULT");
7537 std::env::remove_var("TANDEM_TOOL_BUDGET_WEBSEARCH");
7538 }
7539 }
7540}