1use chrono::Utc;
2use futures::future::BoxFuture;
3use futures::StreamExt;
4use serde_json::{json, Map, Number, Value};
5use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
6use std::hash::{Hash, Hasher};
7use std::path::{Path, PathBuf};
8use std::time::Duration;
9use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
10use tandem_providers::{ChatAttachment, ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
11use tandem_tools::{validate_tool_schemas, ToolRegistry};
12use tandem_types::{
13 ContextMode, EngineEvent, HostOs, HostRuntimeContext, Message, MessagePart, MessagePartInput,
14 MessageRole, ModelSpec, PathStyle, SendMessageRequest, ShellFamily, ToolMode,
15};
16use tandem_wire::WireMessagePart;
17use tokio_util::sync::CancellationToken;
18use tracing::Level;
19
20use crate::tool_router::{
21 classify_intent, default_mode_name, is_short_simple_prompt, select_tool_subset,
22 should_escalate_auto_tools, tool_router_enabled, ToolIntent, ToolRoutingDecision,
23};
24use crate::{
25 any_policy_matches, derive_session_title_from_prompt, title_needs_repair,
26 tool_name_matches_policy, AgentDefinition, AgentRegistry, CancellationRegistry, EventBus,
27 PermissionAction, PermissionManager, PluginRegistry, Storage,
28};
29use tokio::sync::RwLock;
30
/// Accumulator for one tool call assembled from streamed provider chunks.
///
/// `name` is filled from the first `ToolCallStart` chunk seen for a call id, and
/// `args` is the concatenation of every `ToolCallDelta` argument fragment for
/// that id (raw text, parsed only once the stream ends).
#[derive(Default)]
struct StreamedToolCall {
    /// Tool name as reported by the provider; empty until a start chunk arrives.
    name: String,
    /// Raw, possibly partial, argument text accumulated from deltas.
    args: String,
}
36
37#[derive(Debug, Clone)]
38pub struct SpawnAgentToolContext {
39 pub session_id: String,
40 pub message_id: String,
41 pub tool_call_id: Option<String>,
42 pub args: Value,
43}
44
45#[derive(Debug, Clone)]
46pub struct SpawnAgentToolResult {
47 pub output: String,
48 pub metadata: Value,
49}
50
51#[derive(Debug, Clone)]
52pub struct ToolPolicyContext {
53 pub session_id: String,
54 pub message_id: String,
55 pub tool: String,
56 pub args: Value,
57}
58
/// Verdict returned by a [`ToolPolicyHook`].
#[derive(Clone, Debug)]
pub struct ToolPolicyDecision {
    /// Whether the evaluated tool call is permitted.
    pub allowed: bool,
    /// Optional human-readable explanation (presumably most useful when `allowed` is false).
    pub reason: Option<String>,
}
64
65pub trait SpawnAgentHook: Send + Sync {
66 fn spawn_agent(
67 &self,
68 ctx: SpawnAgentToolContext,
69 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>>;
70}
71
72pub trait ToolPolicyHook: Send + Sync {
73 fn evaluate_tool(
74 &self,
75 ctx: ToolPolicyContext,
76 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>>;
77}
78
/// Identifies the provider call a [`PromptContextHook`] is being asked to augment.
#[derive(Clone, Debug)]
pub struct PromptContextHookContext {
    /// Session being run.
    pub session_id: String,
    /// Id of the user message that triggered the run.
    pub message_id: String,
    /// Provider the messages will be sent to.
    pub provider_id: String,
    /// Model the messages will be sent to.
    pub model_id: String,
    /// Index of the provider-call iteration within the current run.
    pub iteration: usize,
}
87
88pub trait PromptContextHook: Send + Sync {
89 fn augment_provider_messages(
90 &self,
91 ctx: PromptContextHookContext,
92 messages: Vec<ChatMessage>,
93 ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>>;
94}
95
96#[derive(Clone)]
97pub struct EngineLoop {
98 storage: std::sync::Arc<Storage>,
99 event_bus: EventBus,
100 providers: ProviderRegistry,
101 plugins: PluginRegistry,
102 agents: AgentRegistry,
103 permissions: PermissionManager,
104 tools: ToolRegistry,
105 cancellations: CancellationRegistry,
106 host_runtime_context: HostRuntimeContext,
107 workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
108 session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
109 spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
110 tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
111 prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
112}
113
114impl EngineLoop {
115 #[allow(clippy::too_many_arguments)]
116 pub fn new(
117 storage: std::sync::Arc<Storage>,
118 event_bus: EventBus,
119 providers: ProviderRegistry,
120 plugins: PluginRegistry,
121 agents: AgentRegistry,
122 permissions: PermissionManager,
123 tools: ToolRegistry,
124 cancellations: CancellationRegistry,
125 host_runtime_context: HostRuntimeContext,
126 ) -> Self {
127 Self {
128 storage,
129 event_bus,
130 providers,
131 plugins,
132 agents,
133 permissions,
134 tools,
135 cancellations,
136 host_runtime_context,
137 workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
138 session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
139 spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
140 tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
141 prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
142 }
143 }
144
145 pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
146 *self.spawn_agent_hook.write().await = Some(hook);
147 }
148
149 pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
150 *self.tool_policy_hook.write().await = Some(hook);
151 }
152
153 pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
154 *self.prompt_context_hook.write().await = Some(hook);
155 }
156
157 pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
158 let normalized = allowed_tools
159 .into_iter()
160 .map(|tool| normalize_tool_name(&tool))
161 .filter(|tool| !tool.trim().is_empty())
162 .collect::<Vec<_>>();
163 self.session_allowed_tools
164 .write()
165 .await
166 .insert(session_id.to_string(), normalized);
167 }
168
169 pub async fn clear_session_allowed_tools(&self, session_id: &str) {
170 self.session_allowed_tools.write().await.remove(session_id);
171 }
172
173 pub async fn grant_workspace_override_for_session(
174 &self,
175 session_id: &str,
176 ttl_seconds: u64,
177 ) -> u64 {
178 let expires_at = chrono::Utc::now()
179 .timestamp_millis()
180 .max(0)
181 .saturating_add((ttl_seconds as i64).saturating_mul(1000))
182 as u64;
183 self.workspace_overrides
184 .write()
185 .await
186 .insert(session_id.to_string(), expires_at);
187 expires_at
188 }
189
190 pub async fn run_prompt_async(
191 &self,
192 session_id: String,
193 req: SendMessageRequest,
194 ) -> anyhow::Result<()> {
195 self.run_prompt_async_with_context(session_id, req, None)
196 .await
197 }
198
199 pub async fn run_prompt_async_with_context(
200 &self,
201 session_id: String,
202 req: SendMessageRequest,
203 correlation_id: Option<String>,
204 ) -> anyhow::Result<()> {
205 let session_model = self
206 .storage
207 .get_session(&session_id)
208 .await
209 .and_then(|s| s.model);
210 let (provider_id, model_id_value) =
211 resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
212 anyhow::anyhow!(
213 "MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
214 )
215 })?;
216 let correlation_ref = correlation_id.as_deref();
217 let model_id = Some(model_id_value.as_str());
218 let cancel = self.cancellations.create(&session_id).await;
219 emit_event(
220 Level::INFO,
221 ProcessKind::Engine,
222 ObservabilityEvent {
223 event: "provider.call.start",
224 component: "engine.loop",
225 correlation_id: correlation_ref,
226 session_id: Some(&session_id),
227 run_id: None,
228 message_id: None,
229 provider_id: Some(provider_id.as_str()),
230 model_id,
231 status: Some("start"),
232 error_code: None,
233 detail: Some("run_prompt_async dispatch"),
234 },
235 );
236 self.event_bus.publish(EngineEvent::new(
237 "session.status",
238 json!({"sessionID": session_id, "status":"running"}),
239 ));
240 let request_parts = req.parts.clone();
241 let requested_tool_mode = req.tool_mode.clone().unwrap_or(ToolMode::Auto);
242 let requested_context_mode = req.context_mode.clone().unwrap_or(ContextMode::Auto);
243 let request_tool_allowlist = req
244 .tool_allowlist
245 .clone()
246 .unwrap_or_default()
247 .into_iter()
248 .map(|tool| normalize_tool_name(&tool))
249 .filter(|tool| !tool.trim().is_empty())
250 .collect::<HashSet<_>>();
251 let text = req
252 .parts
253 .iter()
254 .map(|p| match p {
255 MessagePartInput::Text { text } => text.clone(),
256 MessagePartInput::File {
257 mime,
258 filename,
259 url,
260 } => format!(
261 "[file mime={} name={} url={}]",
262 mime,
263 filename.clone().unwrap_or_else(|| "unknown".to_string()),
264 url
265 ),
266 })
267 .collect::<Vec<_>>()
268 .join("\n");
269 let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
270 self.auto_rename_session_from_user_text(&session_id, &text)
271 .await;
272 let active_agent = self.agents.get(req.agent.as_deref()).await;
273 let mut user_message_id = self
274 .find_recent_matching_user_message_id(&session_id, &text)
275 .await;
276 if user_message_id.is_none() {
277 let user_message = Message::new(
278 MessageRole::User,
279 vec![MessagePart::Text { text: text.clone() }],
280 );
281 let created_message_id = user_message.id.clone();
282 self.storage
283 .append_message(&session_id, user_message)
284 .await?;
285
286 let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
287 self.event_bus.publish(EngineEvent::new(
288 "message.part.updated",
289 json!({
290 "part": user_part,
291 "delta": text,
292 "agent": active_agent.name
293 }),
294 ));
295 user_message_id = Some(created_message_id);
296 }
297 let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
298
299 if cancel.is_cancelled() {
300 self.event_bus.publish(EngineEvent::new(
301 "session.status",
302 json!({"sessionID": session_id, "status":"cancelled"}),
303 ));
304 self.cancellations.remove(&session_id).await;
305 return Ok(());
306 }
307
308 let mut question_tool_used = false;
309 let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
310 if normalize_tool_name(&tool) == "question" {
311 question_tool_used = true;
312 }
313 if !agent_can_use_tool(&active_agent, &tool) {
314 format!(
315 "Tool `{tool}` is not enabled for agent `{}`.",
316 active_agent.name
317 )
318 } else {
319 self.execute_tool_with_permission(
320 &session_id,
321 &user_message_id,
322 tool.clone(),
323 args,
324 active_agent.skills.as_deref(),
325 &text,
326 None,
327 cancel.clone(),
328 )
329 .await?
330 .unwrap_or_default()
331 }
332 } else {
333 let mut completion = String::new();
334 let mut max_iterations = max_tool_iterations();
335 let mut followup_context: Option<String> = None;
336 let mut last_tool_outputs: Vec<String> = Vec::new();
337 let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
338 let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
339 let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
340 let mut mutable_signature_counts: HashMap<String, usize> = HashMap::new();
341 let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
342 let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
343 let mut websearch_query_blocked = false;
344 let mut auto_workspace_probe_attempted = false;
345 let intent = classify_intent(&text);
346 let router_enabled = tool_router_enabled();
347 let mut auto_tools_escalated = matches!(requested_tool_mode, ToolMode::Required);
348 let context_is_auto_compact = matches!(requested_context_mode, ContextMode::Auto)
349 && runtime_attachments.is_empty()
350 && is_short_simple_prompt(&text)
351 && matches!(intent, ToolIntent::Chitchat | ToolIntent::Knowledge);
352
353 while max_iterations > 0 && !cancel.is_cancelled() {
354 let iteration = 26usize.saturating_sub(max_iterations);
355 max_iterations -= 1;
356 let context_profile = if matches!(requested_context_mode, ContextMode::Full) {
357 ChatHistoryProfile::Full
358 } else if matches!(requested_context_mode, ContextMode::Compact)
359 || context_is_auto_compact
360 {
361 ChatHistoryProfile::Compact
362 } else {
363 ChatHistoryProfile::Standard
364 };
365 let mut messages =
366 load_chat_history(self.storage.clone(), &session_id, context_profile).await;
367 if iteration == 1 && !runtime_attachments.is_empty() {
368 attach_to_last_user_message(&mut messages, &runtime_attachments);
369 }
370 let history_char_count = messages.iter().map(|m| m.content.len()).sum::<usize>();
371 self.event_bus.publish(EngineEvent::new(
372 "context.profile.selected",
373 json!({
374 "sessionID": session_id,
375 "messageID": user_message_id,
376 "iteration": iteration,
377 "contextMode": format_context_mode(&requested_context_mode, context_is_auto_compact),
378 "historyMessageCount": messages.len(),
379 "historyCharCount": history_char_count,
380 "memoryInjected": false
381 }),
382 ));
383 let mut system_parts =
384 vec![tandem_runtime_system_prompt(&self.host_runtime_context)];
385 if let Some(system) = active_agent.system_prompt.as_ref() {
386 system_parts.push(system.clone());
387 }
388 messages.insert(
389 0,
390 ChatMessage {
391 role: "system".to_string(),
392 content: system_parts.join("\n\n"),
393 attachments: Vec::new(),
394 },
395 );
396 if let Some(extra) = followup_context.take() {
397 messages.push(ChatMessage {
398 role: "user".to_string(),
399 content: extra,
400 attachments: Vec::new(),
401 });
402 }
403 if let Some(hook) = self.prompt_context_hook.read().await.clone() {
404 let ctx = PromptContextHookContext {
405 session_id: session_id.clone(),
406 message_id: user_message_id.clone(),
407 provider_id: provider_id.clone(),
408 model_id: model_id_value.clone(),
409 iteration,
410 };
411 let hook_timeout =
412 Duration::from_millis(prompt_context_hook_timeout_ms() as u64);
413 match tokio::time::timeout(
414 hook_timeout,
415 hook.augment_provider_messages(ctx, messages.clone()),
416 )
417 .await
418 {
419 Ok(Ok(augmented)) => {
420 messages = augmented;
421 }
422 Ok(Err(err)) => {
423 self.event_bus.publish(EngineEvent::new(
424 "memory.context.error",
425 json!({
426 "sessionID": session_id,
427 "messageID": user_message_id,
428 "iteration": iteration,
429 "error": truncate_text(&err.to_string(), 500),
430 }),
431 ));
432 }
433 Err(_) => {
434 self.event_bus.publish(EngineEvent::new(
435 "memory.context.error",
436 json!({
437 "sessionID": session_id,
438 "messageID": user_message_id,
439 "iteration": iteration,
440 "error": format!(
441 "prompt context hook timeout after {} ms",
442 hook_timeout.as_millis()
443 ),
444 }),
445 ));
446 }
447 }
448 }
449 let all_tools = self.tools.list().await;
450 let mut tool_schemas = if !router_enabled {
451 all_tools
452 } else {
453 match requested_tool_mode {
454 ToolMode::None => Vec::new(),
455 ToolMode::Required => select_tool_subset(
456 all_tools,
457 intent,
458 &request_tool_allowlist,
459 iteration > 1,
460 ),
461 ToolMode::Auto => {
462 if !auto_tools_escalated {
463 Vec::new()
464 } else {
465 select_tool_subset(
466 all_tools,
467 intent,
468 &request_tool_allowlist,
469 iteration > 1,
470 )
471 }
472 }
473 }
474 };
475 if !request_tool_allowlist.is_empty() {
476 tool_schemas.retain(|schema| {
477 let tool = normalize_tool_name(&schema.name);
478 request_tool_allowlist
479 .iter()
480 .any(|pattern| tool_name_matches_policy(pattern, &tool))
481 });
482 }
483 if active_agent.tools.is_some() {
484 tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
485 }
486 tool_schemas.retain(|schema| {
487 let normalized = normalize_tool_name(&schema.name);
488 if let Some(server) = mcp_server_from_tool_name(&normalized) {
489 !blocked_mcp_servers.contains(server)
490 } else {
491 true
492 }
493 });
494 if let Some(allowed_tools) = self
495 .session_allowed_tools
496 .read()
497 .await
498 .get(&session_id)
499 .cloned()
500 {
501 if !allowed_tools.is_empty() {
502 tool_schemas.retain(|schema| {
503 let normalized = normalize_tool_name(&schema.name);
504 any_policy_matches(&allowed_tools, &normalized)
505 });
506 }
507 }
508 if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
509 let detail = validation_err.to_string();
510 emit_event(
511 Level::ERROR,
512 ProcessKind::Engine,
513 ObservabilityEvent {
514 event: "provider.call.error",
515 component: "engine.loop",
516 correlation_id: correlation_ref,
517 session_id: Some(&session_id),
518 run_id: None,
519 message_id: Some(&user_message_id),
520 provider_id: Some(provider_id.as_str()),
521 model_id,
522 status: Some("failed"),
523 error_code: Some("TOOL_SCHEMA_INVALID"),
524 detail: Some(&detail),
525 },
526 );
527 anyhow::bail!("{detail}");
528 }
529 let routing_decision = ToolRoutingDecision {
530 pass: if auto_tools_escalated { 2 } else { 1 },
531 mode: match requested_tool_mode {
532 ToolMode::Auto => default_mode_name(),
533 ToolMode::None => "none",
534 ToolMode::Required => "required",
535 },
536 intent,
537 selected_count: tool_schemas.len(),
538 total_available_count: self.tools.list().await.len(),
539 mcp_included: tool_schemas
540 .iter()
541 .any(|schema| normalize_tool_name(&schema.name).starts_with("mcp.")),
542 };
543 self.event_bus.publish(EngineEvent::new(
544 "tool.routing.decision",
545 json!({
546 "sessionID": session_id,
547 "messageID": user_message_id,
548 "iteration": iteration,
549 "pass": routing_decision.pass,
550 "mode": routing_decision.mode,
551 "intent": format!("{:?}", routing_decision.intent).to_ascii_lowercase(),
552 "selectedToolCount": routing_decision.selected_count,
553 "totalAvailableTools": routing_decision.total_available_count,
554 "mcpIncluded": routing_decision.mcp_included
555 }),
556 ));
557 let allowed_tool_names = tool_schemas
558 .iter()
559 .map(|schema| normalize_tool_name(&schema.name))
560 .collect::<HashSet<_>>();
561 self.event_bus.publish(EngineEvent::new(
562 "provider.call.iteration.start",
563 json!({
564 "sessionID": session_id,
565 "messageID": user_message_id,
566 "iteration": iteration,
567 "selectedToolCount": allowed_tool_names.len(),
568 }),
569 ));
570 let provider_connect_timeout =
571 Duration::from_millis(provider_stream_connect_timeout_ms() as u64);
572 let stream_result = tokio::time::timeout(
573 provider_connect_timeout,
574 self.providers.stream_for_provider(
575 Some(provider_id.as_str()),
576 Some(model_id_value.as_str()),
577 messages,
578 Some(tool_schemas),
579 cancel.clone(),
580 ),
581 )
582 .await
583 .map_err(|_| {
584 anyhow::anyhow!(
585 "provider stream connect timeout after {} ms",
586 provider_connect_timeout.as_millis()
587 )
588 })
589 .and_then(|result| result);
590 let stream = match stream_result {
591 Ok(stream) => stream,
592 Err(err) => {
593 let error_text = err.to_string();
594 let error_code = provider_error_code(&error_text);
595 let detail = truncate_text(&error_text, 500);
596 emit_event(
597 Level::ERROR,
598 ProcessKind::Engine,
599 ObservabilityEvent {
600 event: "provider.call.error",
601 component: "engine.loop",
602 correlation_id: correlation_ref,
603 session_id: Some(&session_id),
604 run_id: None,
605 message_id: Some(&user_message_id),
606 provider_id: Some(provider_id.as_str()),
607 model_id,
608 status: Some("failed"),
609 error_code: Some(error_code),
610 detail: Some(&detail),
611 },
612 );
613 self.event_bus.publish(EngineEvent::new(
614 "provider.call.iteration.error",
615 json!({
616 "sessionID": session_id,
617 "messageID": user_message_id,
618 "iteration": iteration,
619 "error": detail,
620 }),
621 ));
622 return Err(err);
623 }
624 };
625 tokio::pin!(stream);
626 completion.clear();
627 let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
628 let mut provider_usage: Option<TokenUsage> = None;
629 let mut accepted_tool_calls_in_cycle = 0usize;
630 let provider_idle_timeout =
631 Duration::from_millis(provider_stream_idle_timeout_ms() as u64);
632 loop {
633 let next_chunk_result =
634 tokio::time::timeout(provider_idle_timeout, stream.next())
635 .await
636 .map_err(|_| {
637 anyhow::anyhow!(
638 "provider stream idle timeout after {} ms",
639 provider_idle_timeout.as_millis()
640 )
641 });
642 let next_chunk = match next_chunk_result {
643 Ok(next_chunk) => next_chunk,
644 Err(err) => {
645 self.event_bus.publish(EngineEvent::new(
646 "provider.call.iteration.error",
647 json!({
648 "sessionID": session_id,
649 "messageID": user_message_id,
650 "iteration": iteration,
651 "error": truncate_text(&err.to_string(), 500),
652 }),
653 ));
654 return Err(err);
655 }
656 };
657 let Some(chunk) = next_chunk else {
658 break;
659 };
660 let chunk = match chunk {
661 Ok(chunk) => chunk,
662 Err(err) => {
663 let error_text = err.to_string();
664 let error_code = provider_error_code(&error_text);
665 let detail = truncate_text(&error_text, 500);
666 emit_event(
667 Level::ERROR,
668 ProcessKind::Engine,
669 ObservabilityEvent {
670 event: "provider.call.error",
671 component: "engine.loop",
672 correlation_id: correlation_ref,
673 session_id: Some(&session_id),
674 run_id: None,
675 message_id: Some(&user_message_id),
676 provider_id: Some(provider_id.as_str()),
677 model_id,
678 status: Some("failed"),
679 error_code: Some(error_code),
680 detail: Some(&detail),
681 },
682 );
683 self.event_bus.publish(EngineEvent::new(
684 "provider.call.iteration.error",
685 json!({
686 "sessionID": session_id,
687 "messageID": user_message_id,
688 "iteration": iteration,
689 "error": detail,
690 }),
691 ));
692 return Err(anyhow::anyhow!(
693 "provider stream chunk error: {error_text}"
694 ));
695 }
696 };
697 match chunk {
698 StreamChunk::TextDelta(delta) => {
699 let delta = strip_model_control_markers(&delta);
700 if delta.trim().is_empty() {
701 continue;
702 }
703 if completion.is_empty() {
704 emit_event(
705 Level::INFO,
706 ProcessKind::Engine,
707 ObservabilityEvent {
708 event: "provider.call.first_byte",
709 component: "engine.loop",
710 correlation_id: correlation_ref,
711 session_id: Some(&session_id),
712 run_id: None,
713 message_id: Some(&user_message_id),
714 provider_id: Some(provider_id.as_str()),
715 model_id,
716 status: Some("streaming"),
717 error_code: None,
718 detail: Some("first text delta"),
719 },
720 );
721 }
722 completion.push_str(&delta);
723 let delta = truncate_text(&delta, 4_000);
724 let delta_part =
725 WireMessagePart::text(&session_id, &user_message_id, delta.clone());
726 self.event_bus.publish(EngineEvent::new(
727 "message.part.updated",
728 json!({"part": delta_part, "delta": delta}),
729 ));
730 }
731 StreamChunk::ReasoningDelta(_reasoning) => {}
732 StreamChunk::Done {
733 finish_reason: _,
734 usage,
735 } => {
736 if usage.is_some() {
737 provider_usage = usage;
738 }
739 break;
740 }
741 StreamChunk::ToolCallStart { id, name } => {
742 let entry = streamed_tool_calls.entry(id).or_default();
743 if entry.name.is_empty() {
744 entry.name = name;
745 }
746 }
747 StreamChunk::ToolCallDelta { id, args_delta } => {
748 let entry = streamed_tool_calls.entry(id.clone()).or_default();
749 entry.args.push_str(&args_delta);
750 let tool_name = if entry.name.trim().is_empty() {
751 "tool".to_string()
752 } else {
753 normalize_tool_name(&entry.name)
754 };
755 let parsed_preview = if entry.name.trim().is_empty() {
756 Value::String(truncate_text(&entry.args, 1_000))
757 } else {
758 parse_streamed_tool_args(&tool_name, &entry.args)
759 };
760 let mut tool_part = WireMessagePart::tool_invocation(
761 &session_id,
762 &user_message_id,
763 tool_name.clone(),
764 json!({}),
765 );
766 tool_part.id = Some(id.clone());
767 self.event_bus.publish(EngineEvent::new(
768 "message.part.updated",
769 json!({
770 "part": tool_part,
771 "toolCallDelta": {
772 "id": id,
773 "tool": tool_name,
774 "argsDelta": truncate_text(&args_delta, 1_000),
775 "parsedArgsPreview": parsed_preview
776 }
777 }),
778 ));
779 }
780 StreamChunk::ToolCallEnd { id: _ } => {}
781 }
782 if cancel.is_cancelled() {
783 break;
784 }
785 }
786
787 let mut tool_calls = streamed_tool_calls
788 .into_values()
789 .filter_map(|call| {
790 if call.name.trim().is_empty() {
791 return None;
792 }
793 let tool_name = normalize_tool_name(&call.name);
794 let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
795 Some((tool_name, parsed_args))
796 })
797 .collect::<Vec<_>>();
798 if tool_calls.is_empty() {
799 tool_calls = parse_tool_invocations_from_response(&completion);
800 }
801 if router_enabled
802 && matches!(requested_tool_mode, ToolMode::Auto)
803 && !auto_tools_escalated
804 && iteration == 1
805 && should_escalate_auto_tools(intent, &text, &completion)
806 {
807 auto_tools_escalated = true;
808 followup_context = Some(
809 "Tool access is now enabled for this request. Use only necessary tools and then answer concisely."
810 .to_string(),
811 );
812 self.event_bus.publish(EngineEvent::new(
813 "provider.call.iteration.finish",
814 json!({
815 "sessionID": session_id,
816 "messageID": user_message_id,
817 "iteration": iteration,
818 "finishReason": "auto_escalate",
819 "acceptedToolCalls": accepted_tool_calls_in_cycle,
820 "rejectedToolCalls": 0,
821 }),
822 ));
823 continue;
824 }
825 if tool_calls.is_empty()
826 && !auto_workspace_probe_attempted
827 && should_force_workspace_probe(&text, &completion)
828 && allowed_tool_names.contains("glob")
829 {
830 auto_workspace_probe_attempted = true;
831 tool_calls = vec![("glob".to_string(), json!({ "pattern": "*" }))];
832 }
833 if !tool_calls.is_empty() {
834 let mut outputs = Vec::new();
835 let mut executed_productive_tool = false;
836 let mut auth_required_hit_in_cycle = false;
837 let mut guard_budget_hit_in_cycle = false;
838 let mut duplicate_signature_hit_in_cycle = false;
839 for (tool, args) in tool_calls {
840 if !agent_can_use_tool(&active_agent, &tool) {
841 continue;
842 }
843 let tool_key = normalize_tool_name(&tool);
844 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
845 if blocked_mcp_servers.contains(server) {
846 outputs.push(format!(
847 "Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
848 tool_key, server
849 ));
850 continue;
851 }
852 }
853 if tool_key == "question" {
854 question_tool_used = true;
855 }
856 if websearch_query_blocked && tool_key == "websearch" {
857 outputs.push(
858 "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
859 .to_string(),
860 );
861 continue;
862 }
863 let mut effective_args = args.clone();
864 if tool_key == "todo_write" {
865 effective_args = normalize_todo_write_args(effective_args, &completion);
866 if is_empty_todo_write_args(&effective_args) {
867 outputs.push(
868 "Tool `todo_write` call skipped: empty todo payload."
869 .to_string(),
870 );
871 continue;
872 }
873 }
874 let signature = if tool_key == "batch" {
875 batch_tool_signature(&args)
876 .unwrap_or_else(|| tool_signature(&tool_key, &args))
877 } else {
878 tool_signature(&tool_key, &args)
879 };
880 if is_shell_tool_name(&tool_key)
881 && shell_mismatch_signatures.contains(&signature)
882 {
883 outputs.push(
884 "Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
885 .to_string(),
886 );
887 continue;
888 }
889 let mut signature_count = 1usize;
890 if is_read_only_tool(&tool_key)
891 || (tool_key == "batch" && is_read_only_batch_call(&args))
892 {
893 let count = readonly_signature_counts
894 .entry(signature.clone())
895 .and_modify(|v| *v = v.saturating_add(1))
896 .or_insert(1);
897 signature_count = *count;
898 if tool_key == "websearch" && *count > 2 {
899 self.event_bus.publish(EngineEvent::new(
900 "tool.loop_guard.triggered",
901 json!({
902 "sessionID": session_id,
903 "messageID": user_message_id,
904 "tool": tool_key,
905 "reason": "duplicate_signature_retry_exhausted",
906 "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
907 "loop_guard_triggered": true
908 }),
909 ));
910 outputs.push(
911 "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
912 .to_string(),
913 );
914 continue;
915 }
916 if tool_key != "websearch" && *count > 1 {
917 if let Some(cached) = readonly_tool_cache.get(&signature) {
918 outputs.push(cached.clone());
919 } else {
920 outputs.push(format!(
921 "Tool `{}` call skipped: duplicate call signature detected.",
922 tool_key
923 ));
924 }
925 continue;
926 }
927 }
928 let is_read_only_signature = is_read_only_tool(&tool_key)
929 || (tool_key == "batch" && is_read_only_batch_call(&args));
930 if !is_read_only_signature {
931 let duplicate_limit = duplicate_signature_limit_for(&tool_key);
932 let seen = mutable_signature_counts
933 .entry(signature.clone())
934 .and_modify(|v| *v = v.saturating_add(1))
935 .or_insert(1);
936 if *seen > duplicate_limit {
937 self.event_bus.publish(EngineEvent::new(
938 "tool.loop_guard.triggered",
939 json!({
940 "sessionID": session_id,
941 "messageID": user_message_id,
942 "tool": tool_key,
943 "reason": "duplicate_signature_retry_exhausted",
944 "signatureHash": stable_hash(&signature),
945 "duplicateLimit": duplicate_limit,
946 "loop_guard_triggered": true
947 }),
948 ));
949 outputs.push(format!(
950 "Tool `{}` call skipped: duplicate call signature retry limit reached ({}).",
951 tool_key, duplicate_limit
952 ));
953 duplicate_signature_hit_in_cycle = true;
954 continue;
955 }
956 }
957 let budget = tool_budget_for(&tool_key);
958 let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
959 if *entry >= budget {
960 outputs.push(format!(
961 "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
962 tool_key, budget
963 ));
964 guard_budget_hit_in_cycle = true;
965 continue;
966 }
967 *entry += 1;
968 accepted_tool_calls_in_cycle =
969 accepted_tool_calls_in_cycle.saturating_add(1);
970 if let Some(output) = self
971 .execute_tool_with_permission(
972 &session_id,
973 &user_message_id,
974 tool,
975 effective_args,
976 active_agent.skills.as_deref(),
977 &text,
978 Some(&completion),
979 cancel.clone(),
980 )
981 .await?
982 {
983 let productive = !(tool_key == "batch"
984 && is_non_productive_batch_output(&output))
985 && !is_auth_required_tool_output(&output);
986 if output.contains("WEBSEARCH_QUERY_MISSING") {
987 websearch_query_blocked = true;
988 }
989 if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
990 {
991 shell_mismatch_signatures.insert(signature.clone());
992 }
993 if is_read_only_tool(&tool_key)
994 && tool_key != "websearch"
995 && signature_count == 1
996 {
997 readonly_tool_cache.insert(signature, output.clone());
998 }
999 if productive {
1000 executed_productive_tool = true;
1001 }
1002 if is_auth_required_tool_output(&output) {
1003 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
1004 blocked_mcp_servers.insert(server.to_string());
1005 }
1006 auth_required_hit_in_cycle = true;
1007 }
1008 outputs.push(output);
1009 if auth_required_hit_in_cycle {
1010 break;
1011 }
1012 if guard_budget_hit_in_cycle {
1013 break;
1014 }
1015 }
1016 }
1017 if !outputs.is_empty() {
1018 last_tool_outputs = outputs.clone();
1019 let guard_budget_hit =
1020 outputs.iter().any(|o| is_guard_budget_tool_output(o));
1021 if executed_productive_tool {
1022 followup_context = Some(format!(
1023 "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
1024 summarize_tool_outputs(&outputs)
1025 ));
1026 self.event_bus.publish(EngineEvent::new(
1027 "provider.call.iteration.finish",
1028 json!({
1029 "sessionID": session_id,
1030 "messageID": user_message_id,
1031 "iteration": iteration,
1032 "finishReason": "tool_followup",
1033 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1034 "rejectedToolCalls": 0,
1035 }),
1036 ));
1037 continue;
1038 }
1039 if guard_budget_hit {
1040 completion = summarize_guard_budget_outputs(&outputs)
1041 .unwrap_or_else(|| {
1042 "This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
1043 });
1044 } else if duplicate_signature_hit_in_cycle {
1045 completion = summarize_duplicate_signature_outputs(&outputs)
1046 .unwrap_or_else(|| {
1047 "This run paused because the same tool call kept repeating. Rephrase the request or provide a different command target and retry.".to_string()
1048 });
1049 } else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
1050 completion = summary;
1051 } else {
1052 completion.clear();
1053 }
1054 self.event_bus.publish(EngineEvent::new(
1055 "provider.call.iteration.finish",
1056 json!({
1057 "sessionID": session_id,
1058 "messageID": user_message_id,
1059 "iteration": iteration,
1060 "finishReason": "tool_summary",
1061 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1062 "rejectedToolCalls": 0,
1063 }),
1064 ));
1065 break;
1066 }
1067 }
1068
1069 if let Some(usage) = provider_usage {
1070 self.event_bus.publish(EngineEvent::new(
1071 "provider.usage",
1072 json!({
1073 "sessionID": session_id,
1074 "messageID": user_message_id,
1075 "promptTokens": usage.prompt_tokens,
1076 "completionTokens": usage.completion_tokens,
1077 "totalTokens": usage.total_tokens,
1078 }),
1079 ));
1080 }
1081
1082 self.event_bus.publish(EngineEvent::new(
1083 "provider.call.iteration.finish",
1084 json!({
1085 "sessionID": session_id,
1086 "messageID": user_message_id,
1087 "iteration": iteration,
1088 "finishReason": "provider_completion",
1089 "acceptedToolCalls": accepted_tool_calls_in_cycle,
1090 "rejectedToolCalls": 0,
1091 }),
1092 ));
1093 break;
1094 }
1095 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
1096 if let Some(narrative) = self
1097 .generate_final_narrative_without_tools(
1098 &session_id,
1099 &active_agent,
1100 Some(provider_id.as_str()),
1101 Some(model_id_value.as_str()),
1102 cancel.clone(),
1103 &last_tool_outputs,
1104 )
1105 .await
1106 {
1107 completion = narrative;
1108 }
1109 }
1110 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
1111 let preview = last_tool_outputs
1112 .iter()
1113 .take(3)
1114 .map(|o| truncate_text(o, 240))
1115 .collect::<Vec<_>>()
1116 .join("\n");
1117 completion = format!(
1118 "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
1119 preview
1120 );
1121 }
1122 if completion.trim().is_empty() {
1123 completion =
1124 "I couldn't produce a final response for that run. Please retry your request."
1125 .to_string();
1126 }
1127 completion = strip_model_control_markers(&completion);
1128 truncate_text(&completion, 16_000)
1129 };
1130 emit_event(
1131 Level::INFO,
1132 ProcessKind::Engine,
1133 ObservabilityEvent {
1134 event: "provider.call.finish",
1135 component: "engine.loop",
1136 correlation_id: correlation_ref,
1137 session_id: Some(&session_id),
1138 run_id: None,
1139 message_id: Some(&user_message_id),
1140 provider_id: Some(provider_id.as_str()),
1141 model_id,
1142 status: Some("ok"),
1143 error_code: None,
1144 detail: Some("provider stream complete"),
1145 },
1146 );
1147 if active_agent.name.eq_ignore_ascii_case("plan") {
1148 emit_plan_todo_fallback(
1149 self.storage.clone(),
1150 &self.event_bus,
1151 &session_id,
1152 &user_message_id,
1153 &completion,
1154 )
1155 .await;
1156 let todos_after_fallback = self.storage.get_todos(&session_id).await;
1157 if todos_after_fallback.is_empty() && !question_tool_used {
1158 emit_plan_question_fallback(
1159 self.storage.clone(),
1160 &self.event_bus,
1161 &session_id,
1162 &user_message_id,
1163 &completion,
1164 )
1165 .await;
1166 }
1167 }
1168 if cancel.is_cancelled() {
1169 self.event_bus.publish(EngineEvent::new(
1170 "session.status",
1171 json!({"sessionID": session_id, "status":"cancelled"}),
1172 ));
1173 self.cancellations.remove(&session_id).await;
1174 return Ok(());
1175 }
1176 let assistant = Message::new(
1177 MessageRole::Assistant,
1178 vec![MessagePart::Text {
1179 text: completion.clone(),
1180 }],
1181 );
1182 let assistant_message_id = assistant.id.clone();
1183 self.storage.append_message(&session_id, assistant).await?;
1184 let final_part = WireMessagePart::text(
1185 &session_id,
1186 &assistant_message_id,
1187 truncate_text(&completion, 16_000),
1188 );
1189 self.event_bus.publish(EngineEvent::new(
1190 "message.part.updated",
1191 json!({"part": final_part}),
1192 ));
1193 self.event_bus.publish(EngineEvent::new(
1194 "session.updated",
1195 json!({"sessionID": session_id, "status":"idle"}),
1196 ));
1197 self.event_bus.publish(EngineEvent::new(
1198 "session.status",
1199 json!({"sessionID": session_id, "status":"idle"}),
1200 ));
1201 self.cancellations.remove(&session_id).await;
1202 Ok(())
1203 }
1204
1205 pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
1206 self.providers.default_complete(&prompt).await
1207 }
1208
1209 pub async fn run_oneshot_for_provider(
1210 &self,
1211 prompt: String,
1212 provider_id: Option<&str>,
1213 ) -> anyhow::Result<String> {
1214 self.providers
1215 .complete_for_provider(provider_id, &prompt, None)
1216 .await
1217 }
1218
1219 #[allow(clippy::too_many_arguments)]
1220 async fn execute_tool_with_permission(
1221 &self,
1222 session_id: &str,
1223 message_id: &str,
1224 tool: String,
1225 args: Value,
1226 equipped_skills: Option<&[String]>,
1227 latest_user_text: &str,
1228 latest_assistant_context: Option<&str>,
1229 cancel: CancellationToken,
1230 ) -> anyhow::Result<Option<String>> {
1231 let tool = normalize_tool_name(&tool);
1232 let normalized = normalize_tool_args(
1233 &tool,
1234 args,
1235 latest_user_text,
1236 latest_assistant_context.unwrap_or_default(),
1237 );
1238 self.event_bus.publish(EngineEvent::new(
1239 "tool.args.normalized",
1240 json!({
1241 "sessionID": session_id,
1242 "messageID": message_id,
1243 "tool": tool,
1244 "argsSource": normalized.args_source,
1245 "argsIntegrity": normalized.args_integrity,
1246 "query": normalized.query,
1247 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
1248 "requestID": Value::Null
1249 }),
1250 ));
1251 if normalized.args_integrity == "recovered" {
1252 self.event_bus.publish(EngineEvent::new(
1253 "tool.args.recovered",
1254 json!({
1255 "sessionID": session_id,
1256 "messageID": message_id,
1257 "tool": tool,
1258 "argsSource": normalized.args_source,
1259 "query": normalized.query,
1260 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
1261 "requestID": Value::Null
1262 }),
1263 ));
1264 }
1265 if normalized.missing_terminal {
1266 let missing_reason = normalized
1267 .missing_terminal_reason
1268 .clone()
1269 .unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
1270 self.event_bus.publish(EngineEvent::new(
1271 "tool.args.missing_terminal",
1272 json!({
1273 "sessionID": session_id,
1274 "messageID": message_id,
1275 "tool": tool,
1276 "argsSource": normalized.args_source,
1277 "argsIntegrity": normalized.args_integrity,
1278 "requestID": Value::Null,
1279 "error": missing_reason
1280 }),
1281 ));
1282 let mut failed_part =
1283 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1284 failed_part.state = Some("failed".to_string());
1285 failed_part.error = Some(missing_reason.clone());
1286 self.event_bus.publish(EngineEvent::new(
1287 "message.part.updated",
1288 json!({"part": failed_part}),
1289 ));
1290 return Ok(Some(missing_reason));
1291 }
1292
1293 let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
1294 Ok(args) => args,
1295 Err(message) => return Ok(Some(message)),
1296 };
1297 if let Some(allowed_tools) = self
1298 .session_allowed_tools
1299 .read()
1300 .await
1301 .get(session_id)
1302 .cloned()
1303 {
1304 if !allowed_tools.is_empty() && !any_policy_matches(&allowed_tools, &tool) {
1305 return Ok(Some(format!("Tool `{tool}` is not allowed for this run.")));
1306 }
1307 }
1308 if let Some(hook) = self.tool_policy_hook.read().await.clone() {
1309 let decision = hook
1310 .evaluate_tool(ToolPolicyContext {
1311 session_id: session_id.to_string(),
1312 message_id: message_id.to_string(),
1313 tool: tool.clone(),
1314 args: args.clone(),
1315 })
1316 .await?;
1317 if !decision.allowed {
1318 let reason = decision
1319 .reason
1320 .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
1321 let mut blocked_part =
1322 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1323 blocked_part.state = Some("failed".to_string());
1324 blocked_part.error = Some(reason.clone());
1325 self.event_bus.publish(EngineEvent::new(
1326 "message.part.updated",
1327 json!({"part": blocked_part}),
1328 ));
1329 return Ok(Some(reason));
1330 }
1331 }
1332 let mut tool_call_id: Option<String> = None;
1333 if let Some(violation) = self
1334 .workspace_sandbox_violation(session_id, &tool, &args)
1335 .await
1336 {
1337 let mut blocked_part =
1338 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1339 blocked_part.state = Some("failed".to_string());
1340 blocked_part.error = Some(violation.clone());
1341 self.event_bus.publish(EngineEvent::new(
1342 "message.part.updated",
1343 json!({"part": blocked_part}),
1344 ));
1345 return Ok(Some(violation));
1346 }
1347 let rule = self
1348 .plugins
1349 .permission_override(&tool)
1350 .await
1351 .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
1352 if matches!(rule, PermissionAction::Deny) {
1353 return Ok(Some(format!(
1354 "Permission denied for tool `{tool}` by policy."
1355 )));
1356 }
1357
1358 let mut effective_args = args.clone();
1359 if matches!(rule, PermissionAction::Ask) {
1360 let pending = self
1361 .permissions
1362 .ask_for_session_with_context(
1363 Some(session_id),
1364 &tool,
1365 args.clone(),
1366 Some(crate::PermissionArgsContext {
1367 args_source: normalized.args_source.clone(),
1368 args_integrity: normalized.args_integrity.clone(),
1369 query: normalized.query.clone(),
1370 }),
1371 )
1372 .await;
1373 let mut pending_part = WireMessagePart::tool_invocation(
1374 session_id,
1375 message_id,
1376 tool.clone(),
1377 args.clone(),
1378 );
1379 pending_part.id = Some(pending.id.clone());
1380 tool_call_id = Some(pending.id.clone());
1381 pending_part.state = Some("pending".to_string());
1382 self.event_bus.publish(EngineEvent::new(
1383 "message.part.updated",
1384 json!({"part": pending_part}),
1385 ));
1386 let reply = self
1387 .permissions
1388 .wait_for_reply_with_timeout(
1389 &pending.id,
1390 cancel.clone(),
1391 Some(Duration::from_millis(permission_wait_timeout_ms() as u64)),
1392 )
1393 .await;
1394 let (reply, timed_out) = reply;
1395 if cancel.is_cancelled() {
1396 return Ok(None);
1397 }
1398 if timed_out {
1399 let timeout_ms = permission_wait_timeout_ms();
1400 self.event_bus.publish(EngineEvent::new(
1401 "permission.wait.timeout",
1402 json!({
1403 "sessionID": session_id,
1404 "messageID": message_id,
1405 "tool": tool,
1406 "requestID": pending.id,
1407 "timeoutMs": timeout_ms,
1408 }),
1409 ));
1410 let mut timeout_part =
1411 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1412 timeout_part.id = Some(pending.id);
1413 timeout_part.state = Some("failed".to_string());
1414 timeout_part.error = Some(format!(
1415 "Permission request timed out after {} ms",
1416 timeout_ms
1417 ));
1418 self.event_bus.publish(EngineEvent::new(
1419 "message.part.updated",
1420 json!({"part": timeout_part}),
1421 ));
1422 return Ok(Some(format!(
1423 "Permission request for tool `{tool}` timed out after {timeout_ms} ms."
1424 )));
1425 }
1426 let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
1427 if !approved {
1428 let mut denied_part =
1429 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1430 denied_part.id = Some(pending.id);
1431 denied_part.state = Some("denied".to_string());
1432 denied_part.error = Some("Permission denied by user".to_string());
1433 self.event_bus.publish(EngineEvent::new(
1434 "message.part.updated",
1435 json!({"part": denied_part}),
1436 ));
1437 return Ok(Some(format!(
1438 "Permission denied for tool `{tool}` by user."
1439 )));
1440 }
1441 effective_args = args;
1442 }
1443
1444 let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
1445 let tool_context = self.resolve_tool_execution_context(session_id).await;
1446 if let Some((workspace_root, effective_cwd)) = tool_context.as_ref() {
1447 if let Some(obj) = args.as_object_mut() {
1448 obj.insert(
1449 "__workspace_root".to_string(),
1450 Value::String(workspace_root.clone()),
1451 );
1452 obj.insert(
1453 "__effective_cwd".to_string(),
1454 Value::String(effective_cwd.clone()),
1455 );
1456 obj.insert(
1457 "__session_id".to_string(),
1458 Value::String(session_id.to_string()),
1459 );
1460 }
1461 tracing::info!(
1462 "tool execution context session_id={} tool={} workspace_root={} effective_cwd={}",
1463 session_id,
1464 tool,
1465 workspace_root,
1466 effective_cwd
1467 );
1468 }
1469 let mut invoke_part =
1470 WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
1471 if let Some(call_id) = tool_call_id.clone() {
1472 invoke_part.id = Some(call_id);
1473 }
1474 let invoke_part_id = invoke_part.id.clone();
1475 self.event_bus.publish(EngineEvent::new(
1476 "message.part.updated",
1477 json!({"part": invoke_part}),
1478 ));
1479 let args_for_side_events = args.clone();
1480 if tool == "spawn_agent" {
1481 let hook = self.spawn_agent_hook.read().await.clone();
1482 if let Some(hook) = hook {
1483 let spawned = hook
1484 .spawn_agent(SpawnAgentToolContext {
1485 session_id: session_id.to_string(),
1486 message_id: message_id.to_string(),
1487 tool_call_id: invoke_part_id.clone(),
1488 args: args_for_side_events.clone(),
1489 })
1490 .await?;
1491 let output = self.plugins.transform_tool_output(spawned.output).await;
1492 let output = truncate_text(&output, 16_000);
1493 emit_tool_side_events(
1494 self.storage.clone(),
1495 &self.event_bus,
1496 ToolSideEventContext {
1497 session_id,
1498 message_id,
1499 tool: &tool,
1500 args: &args_for_side_events,
1501 metadata: &spawned.metadata,
1502 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1503 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1504 },
1505 )
1506 .await;
1507 let mut result_part = WireMessagePart::tool_result(
1508 session_id,
1509 message_id,
1510 tool.clone(),
1511 json!(output.clone()),
1512 );
1513 result_part.id = invoke_part_id;
1514 self.event_bus.publish(EngineEvent::new(
1515 "message.part.updated",
1516 json!({"part": result_part}),
1517 ));
1518 return Ok(Some(truncate_text(
1519 &format!("Tool `{tool}` result:\n{output}"),
1520 16_000,
1521 )));
1522 }
1523 let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
1524 let mut failed_part =
1525 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1526 failed_part.id = invoke_part_id.clone();
1527 failed_part.state = Some("failed".to_string());
1528 failed_part.error = Some(output.to_string());
1529 self.event_bus.publish(EngineEvent::new(
1530 "message.part.updated",
1531 json!({"part": failed_part}),
1532 ));
1533 return Ok(Some(output.to_string()));
1534 }
1535 let result = match self
1536 .execute_tool_with_timeout(&tool, args, cancel.clone())
1537 .await
1538 {
1539 Ok(result) => result,
1540 Err(err) => {
1541 let err_text = err.to_string();
1542 if err_text.contains("TOOL_EXEC_TIMEOUT_MS_EXCEEDED(") {
1543 let timeout_ms = tool_exec_timeout_ms();
1544 let timeout_output = format!(
1545 "Tool `{tool}` timed out after {timeout_ms} ms. It was stopped to keep this run responsive."
1546 );
1547 let mut failed_part = WireMessagePart::tool_result(
1548 session_id,
1549 message_id,
1550 tool.clone(),
1551 json!(null),
1552 );
1553 failed_part.id = invoke_part_id.clone();
1554 failed_part.state = Some("failed".to_string());
1555 failed_part.error = Some(timeout_output.clone());
1556 self.event_bus.publish(EngineEvent::new(
1557 "message.part.updated",
1558 json!({"part": failed_part}),
1559 ));
1560 return Ok(Some(timeout_output));
1561 }
1562 if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
1563 self.event_bus.publish(EngineEvent::new(
1564 "mcp.auth.required",
1565 json!({
1566 "sessionID": session_id,
1567 "messageID": message_id,
1568 "tool": tool.clone(),
1569 "server": auth.server,
1570 "authorizationUrl": auth.authorization_url,
1571 "message": auth.message,
1572 "challengeId": auth.challenge_id
1573 }),
1574 ));
1575 let auth_output = format!(
1576 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1577 tool, auth.message, auth.authorization_url
1578 );
1579 let mut result_part = WireMessagePart::tool_result(
1580 session_id,
1581 message_id,
1582 tool.clone(),
1583 json!(auth_output.clone()),
1584 );
1585 result_part.id = invoke_part_id.clone();
1586 self.event_bus.publish(EngineEvent::new(
1587 "message.part.updated",
1588 json!({"part": result_part}),
1589 ));
1590 return Ok(Some(truncate_text(
1591 &format!("Tool `{tool}` result:\n{auth_output}"),
1592 16_000,
1593 )));
1594 }
1595 let mut failed_part =
1596 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1597 failed_part.id = invoke_part_id.clone();
1598 failed_part.state = Some("failed".to_string());
1599 failed_part.error = Some(err_text.clone());
1600 self.event_bus.publish(EngineEvent::new(
1601 "message.part.updated",
1602 json!({"part": failed_part}),
1603 ));
1604 return Err(err);
1605 }
1606 };
1607 if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1608 let event_name = if auth.pending && auth.blocked {
1609 "mcp.auth.pending"
1610 } else {
1611 "mcp.auth.required"
1612 };
1613 self.event_bus.publish(EngineEvent::new(
1614 event_name,
1615 json!({
1616 "sessionID": session_id,
1617 "messageID": message_id,
1618 "tool": tool.clone(),
1619 "server": auth.server,
1620 "authorizationUrl": auth.authorization_url,
1621 "message": auth.message,
1622 "challengeId": auth.challenge_id,
1623 "pending": auth.pending,
1624 "blocked": auth.blocked,
1625 "retryAfterMs": auth.retry_after_ms
1626 }),
1627 ));
1628 }
1629 emit_tool_side_events(
1630 self.storage.clone(),
1631 &self.event_bus,
1632 ToolSideEventContext {
1633 session_id,
1634 message_id,
1635 tool: &tool,
1636 args: &args_for_side_events,
1637 metadata: &result.metadata,
1638 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1639 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1640 },
1641 )
1642 .await;
1643 let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1644 if auth.pending && auth.blocked {
1645 let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
1646 format!(
1647 "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1648 tool, auth.message, auth.authorization_url, retry_after_secs
1649 )
1650 } else {
1651 format!(
1652 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1653 tool, auth.message, auth.authorization_url
1654 )
1655 }
1656 } else {
1657 self.plugins.transform_tool_output(result.output).await
1658 };
1659 let output = truncate_text(&output, 16_000);
1660 let mut result_part = WireMessagePart::tool_result(
1661 session_id,
1662 message_id,
1663 tool.clone(),
1664 json!(output.clone()),
1665 );
1666 result_part.id = invoke_part_id;
1667 self.event_bus.publish(EngineEvent::new(
1668 "message.part.updated",
1669 json!({"part": result_part}),
1670 ));
1671 Ok(Some(truncate_text(
1672 &format!("Tool `{tool}` result:\n{output}"),
1673 16_000,
1674 )))
1675 }
1676
1677 async fn execute_tool_with_timeout(
1678 &self,
1679 tool: &str,
1680 args: Value,
1681 cancel: CancellationToken,
1682 ) -> anyhow::Result<tandem_types::ToolResult> {
1683 let timeout_ms = tool_exec_timeout_ms() as u64;
1684 match tokio::time::timeout(
1685 Duration::from_millis(timeout_ms),
1686 self.tools.execute_with_cancel(tool, args, cancel),
1687 )
1688 .await
1689 {
1690 Ok(result) => result,
1691 Err(_) => anyhow::bail!("TOOL_EXEC_TIMEOUT_MS_EXCEEDED({timeout_ms})"),
1692 }
1693 }
1694
1695 async fn find_recent_matching_user_message_id(
1696 &self,
1697 session_id: &str,
1698 text: &str,
1699 ) -> Option<String> {
1700 let session = self.storage.get_session(session_id).await?;
1701 let last = session.messages.last()?;
1702 if !matches!(last.role, MessageRole::User) {
1703 return None;
1704 }
1705 let age_ms = (Utc::now() - last.created_at).num_milliseconds().max(0) as u64;
1706 if age_ms > 10_000 {
1707 return None;
1708 }
1709 let last_text = last
1710 .parts
1711 .iter()
1712 .filter_map(|part| match part {
1713 MessagePart::Text { text } => Some(text.clone()),
1714 _ => None,
1715 })
1716 .collect::<Vec<_>>()
1717 .join("\n");
1718 if last_text == text {
1719 return Some(last.id.clone());
1720 }
1721 None
1722 }
1723
1724 async fn auto_rename_session_from_user_text(&self, session_id: &str, fallback_text: &str) {
1725 let Some(mut session) = self.storage.get_session(session_id).await else {
1726 return;
1727 };
1728 if !title_needs_repair(&session.title) {
1729 return;
1730 }
1731
1732 let first_user_text = session.messages.iter().find_map(|message| {
1733 if !matches!(message.role, MessageRole::User) {
1734 return None;
1735 }
1736 message.parts.iter().find_map(|part| match part {
1737 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.clone()),
1738 _ => None,
1739 })
1740 });
1741
1742 let source = first_user_text.unwrap_or_else(|| fallback_text.to_string());
1743 let Some(title) = derive_session_title_from_prompt(&source, 60) else {
1744 return;
1745 };
1746
1747 session.title = title;
1748 session.time.updated = Utc::now();
1749 let _ = self.storage.save_session(session).await;
1750 }
1751
1752 async fn workspace_sandbox_violation(
1753 &self,
1754 session_id: &str,
1755 tool: &str,
1756 args: &Value,
1757 ) -> Option<String> {
1758 if self.workspace_override_active(session_id).await {
1759 return None;
1760 }
1761 let session = self.storage.get_session(session_id).await?;
1762 let workspace = session
1763 .workspace_root
1764 .or_else(|| crate::normalize_workspace_path(&session.directory))?;
1765 let workspace_path = PathBuf::from(&workspace);
1766 let candidate_paths = extract_tool_candidate_paths(tool, args);
1767 if candidate_paths.is_empty() {
1768 if is_shell_tool_name(tool) {
1769 if let Some(command) = extract_shell_command(args) {
1770 if shell_command_targets_sensitive_path(&command) {
1771 return Some(format!(
1772 "Sandbox blocked `{tool}` command targeting sensitive paths."
1773 ));
1774 }
1775 }
1776 }
1777 return None;
1778 }
1779 if let Some(sensitive) = candidate_paths.iter().find(|path| {
1780 let raw = Path::new(path);
1781 let resolved = if raw.is_absolute() {
1782 raw.to_path_buf()
1783 } else {
1784 workspace_path.join(raw)
1785 };
1786 is_sensitive_path_candidate(&resolved)
1787 }) {
1788 return Some(format!(
1789 "Sandbox blocked `{tool}` path `{sensitive}` (sensitive path policy)."
1790 ));
1791 }
1792
1793 let outside = candidate_paths.iter().find(|path| {
1794 let raw = Path::new(path);
1795 let resolved = if raw.is_absolute() {
1796 raw.to_path_buf()
1797 } else {
1798 workspace_path.join(raw)
1799 };
1800 !crate::is_within_workspace_root(&resolved, &workspace_path)
1801 })?;
1802 Some(format!(
1803 "Sandbox blocked `{tool}` path `{outside}` (workspace root: `{workspace}`)"
1804 ))
1805 }
1806
1807 async fn resolve_tool_execution_context(&self, session_id: &str) -> Option<(String, String)> {
1808 let session = self.storage.get_session(session_id).await?;
1809 let workspace_root = session
1810 .workspace_root
1811 .or_else(|| crate::normalize_workspace_path(&session.directory))?;
1812 let effective_cwd = if session.directory.trim().is_empty()
1813 || session.directory.trim() == "."
1814 {
1815 workspace_root.clone()
1816 } else {
1817 crate::normalize_workspace_path(&session.directory).unwrap_or(workspace_root.clone())
1818 };
1819 Some((workspace_root, effective_cwd))
1820 }
1821
1822 async fn workspace_override_active(&self, session_id: &str) -> bool {
1823 let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
1824 let mut overrides = self.workspace_overrides.write().await;
1825 overrides.retain(|_, expires_at| *expires_at > now);
1826 overrides
1827 .get(session_id)
1828 .map(|expires_at| *expires_at > now)
1829 .unwrap_or(false)
1830 }
1831
1832 async fn generate_final_narrative_without_tools(
1833 &self,
1834 session_id: &str,
1835 active_agent: &AgentDefinition,
1836 provider_hint: Option<&str>,
1837 model_id: Option<&str>,
1838 cancel: CancellationToken,
1839 tool_outputs: &[String],
1840 ) -> Option<String> {
1841 if cancel.is_cancelled() {
1842 return None;
1843 }
1844 let mut messages = load_chat_history(
1845 self.storage.clone(),
1846 session_id,
1847 ChatHistoryProfile::Standard,
1848 )
1849 .await;
1850 let mut system_parts = vec![tandem_runtime_system_prompt(&self.host_runtime_context)];
1851 if let Some(system) = active_agent.system_prompt.as_ref() {
1852 system_parts.push(system.clone());
1853 }
1854 messages.insert(
1855 0,
1856 ChatMessage {
1857 role: "system".to_string(),
1858 content: system_parts.join("\n\n"),
1859 attachments: Vec::new(),
1860 },
1861 );
1862 messages.push(ChatMessage {
1863 role: "user".to_string(),
1864 content: format!(
1865 "Tool observations:\n{}\n\nProvide a direct final answer now. Do not call tools.",
1866 summarize_tool_outputs(tool_outputs)
1867 ),
1868 attachments: Vec::new(),
1869 });
1870 let stream = self
1871 .providers
1872 .stream_for_provider(provider_hint, model_id, messages, None, cancel.clone())
1873 .await
1874 .ok()?;
1875 tokio::pin!(stream);
1876 let mut completion = String::new();
1877 while let Some(chunk) = stream.next().await {
1878 if cancel.is_cancelled() {
1879 return None;
1880 }
1881 match chunk {
1882 Ok(StreamChunk::TextDelta(delta)) => {
1883 let delta = strip_model_control_markers(&delta);
1884 if !delta.trim().is_empty() {
1885 completion.push_str(&delta);
1886 }
1887 }
1888 Ok(StreamChunk::Done { .. }) => break,
1889 Ok(_) => {}
1890 Err(_) => return None,
1891 }
1892 }
1893 let completion = truncate_text(&strip_model_control_markers(&completion), 16_000);
1894 if completion.trim().is_empty() {
1895 None
1896 } else {
1897 Some(completion)
1898 }
1899 }
1900}
1901
1902fn resolve_model_route(
1903 request_model: Option<&ModelSpec>,
1904 session_model: Option<&ModelSpec>,
1905) -> Option<(String, String)> {
1906 fn normalize(spec: &ModelSpec) -> Option<(String, String)> {
1907 let provider_id = spec.provider_id.trim();
1908 let model_id = spec.model_id.trim();
1909 if provider_id.is_empty() || model_id.is_empty() {
1910 return None;
1911 }
1912 Some((provider_id.to_string(), model_id.to_string()))
1913 }
1914
1915 request_model
1916 .and_then(normalize)
1917 .or_else(|| session_model.and_then(normalize))
1918}
1919
/// Removes the chat-template control tokens `<|eom|>`, `<|eot_id|>`, `<|im_end|>` and
/// `<|end|>` from `input`.
///
/// Each marker kind is removed in turn, in that order. A marker that only forms after
/// an earlier removal (e.g. `<|e<|eom|>nd|>`) is therefore removed as well, if its
/// kind comes later in the list.
fn strip_model_control_markers(input: &str) -> String {
    const CONTROL_MARKERS: [&str; 4] = ["<|eom|>", "<|eot_id|>", "<|im_end|>", "<|end|>"];
    CONTROL_MARKERS
        .iter()
        .copied()
        .fold(input.to_owned(), |text, marker| {
            if text.contains(marker) {
                text.replace(marker, "")
            } else {
                text
            }
        })
}
1929
/// Returns `input` unchanged when it is at most `max_len` bytes long. Otherwise it
/// returns a prefix of at most `max_len` bytes followed by `...<truncated>`.
///
/// The cut is moved back to the nearest UTF-8 character boundary. Slicing at an
/// arbitrary byte offset would panic on multi-byte text such as non-ASCII model or
/// tool output.
fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // `max_len < input.len()` here, and offset 0 is always a boundary, so this
    // terminates with a valid slice end.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
1938
/// Classifies a provider error message into a stable error code.
///
/// Matching is a case-insensitive substring search. Rules are tried in table order,
/// the first hit wins, and `PROVIDER_REQUEST_FAILED` is the fallback. Bare status
/// numbers such as `"429"` or `"500"` are matched anywhere in the text.
fn provider_error_code(error_text: &str) -> &'static str {
    const RULES: &[(&str, &[&str])] = &[
        (
            "TOOL_SCHEMA_INVALID",
            &["invalid_function_parameters", "array schema missing items", "tool schema"],
        ),
        ("RATE_LIMIT_EXCEEDED", &["rate limit", "too many requests", "429"]),
        ("CONTEXT_LENGTH_EXCEEDED", &["context length", "max tokens", "token limit"]),
        ("AUTHENTICATION_ERROR", &["unauthorized", "authentication", "401", "403"]),
        ("TIMEOUT", &["timeout", "timed out"]),
        ("PROVIDER_SERVER_ERROR", &["server error", "500", "502", "503", "504"]),
    ];
    let haystack = error_text.to_lowercase();
    RULES
        .iter()
        .find(|(_, needles)| needles.iter().any(|needle| haystack.contains(*needle)))
        .map_or("PROVIDER_REQUEST_FAILED", |&(code, _)| code)
}
1977
/// Canonicalizes a model-emitted tool name.
///
/// The name is trimmed, ASCII-lowercased and has `-` replaced by `_`. Then the first
/// wrapper prefix (`default_api:`, `functions.`, `tools.`, …) that leaves a non-blank
/// remainder is stripped. Finally, known aliases are mapped: todo-list variants become
/// `todo_write`, and shell variants become `bash`.
fn normalize_tool_name(name: &str) -> String {
    const WRAPPER_PREFIXES: [&str; 8] = [
        "default_api:",
        "default_api.",
        "functions.",
        "function.",
        "tools.",
        "tool.",
        "builtin:",
        "builtin.",
    ];
    let lowered = name.trim().to_ascii_lowercase().replace('-', "_");
    let unwrapped = WRAPPER_PREFIXES
        .iter()
        .find_map(|&prefix| {
            lowered
                .strip_prefix(prefix)
                .map(str::trim)
                .filter(|rest| !rest.is_empty())
        })
        .unwrap_or(lowered.as_str());
    let canonical = match unwrapped {
        "todowrite" | "update_todo_list" | "update_todos" => "todo_write",
        "run_command" | "shell" | "powershell" | "cmd" => "bash",
        other => other,
    };
    canonical.to_string()
}
2004
/// Extracts the server segment from an MCP tool name of the form `mcp.<server>[.…]`.
///
/// Returns `None` for non-MCP names and when the server segment is empty.
fn mcp_server_from_tool_name(tool_name: &str) -> Option<&str> {
    let after_namespace = tool_name.strip_prefix("mcp.")?;
    let server = after_namespace.split('.').next()?;
    if server.is_empty() {
        None
    } else {
        Some(server)
    }
}
2013
2014fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
2015 let Some(obj) = args.as_object() else {
2016 return Vec::new();
2017 };
2018 let keys: &[&str] = match tool {
2019 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
2020 "glob" => &["pattern"],
2021 "lsp" => &["filePath", "path"],
2022 "bash" => &["cwd"],
2023 "apply_patch" => &[],
2024 _ => &["path", "cwd"],
2025 };
2026 keys.iter()
2027 .filter_map(|key| obj.get(*key))
2028 .filter_map(|value| value.as_str())
2029 .filter(|s| !s.trim().is_empty())
2030 .map(ToString::to_string)
2031 .collect()
2032}
2033
2034fn agent_can_use_tool(agent: &AgentDefinition, tool_name: &str) -> bool {
2035 let target = normalize_tool_name(tool_name);
2036 match agent.tools.as_ref() {
2037 None => true,
2038 Some(list) => {
2039 let normalized = list
2040 .iter()
2041 .map(|t| normalize_tool_name(t))
2042 .collect::<Vec<_>>();
2043 any_policy_matches(&normalized, &target)
2044 }
2045 }
2046}
2047
2048fn enforce_skill_scope(
2049 tool_name: &str,
2050 args: Value,
2051 equipped_skills: Option<&[String]>,
2052) -> Result<Value, String> {
2053 if normalize_tool_name(tool_name) != "skill" {
2054 return Ok(args);
2055 }
2056 let Some(configured) = equipped_skills else {
2057 return Ok(args);
2058 };
2059
2060 let mut allowed = configured
2061 .iter()
2062 .map(|s| s.trim().to_string())
2063 .filter(|s| !s.is_empty())
2064 .collect::<Vec<_>>();
2065 if allowed
2066 .iter()
2067 .any(|s| s == "*" || s.eq_ignore_ascii_case("all"))
2068 {
2069 return Ok(args);
2070 }
2071 allowed.sort();
2072 allowed.dedup();
2073 if allowed.is_empty() {
2074 return Err("No skills are equipped for this agent.".to_string());
2075 }
2076
2077 let requested = args
2078 .get("name")
2079 .and_then(|v| v.as_str())
2080 .map(|v| v.trim().to_string())
2081 .unwrap_or_default();
2082 if !requested.is_empty() && !allowed.iter().any(|s| s == &requested) {
2083 return Err(format!(
2084 "Skill '{}' is not equipped for this agent. Equipped skills: {}",
2085 requested,
2086 allowed.join(", ")
2087 ));
2088 }
2089
2090 let mut out = if let Some(obj) = args.as_object() {
2091 Value::Object(obj.clone())
2092 } else {
2093 json!({})
2094 };
2095 if let Some(obj) = out.as_object_mut() {
2096 obj.insert("allowed_skills".to_string(), json!(allowed));
2097 }
2098 Ok(out)
2099}
2100
2101fn is_read_only_tool(tool_name: &str) -> bool {
2102 matches!(
2103 normalize_tool_name(tool_name).as_str(),
2104 "glob"
2105 | "read"
2106 | "grep"
2107 | "search"
2108 | "codesearch"
2109 | "list"
2110 | "ls"
2111 | "lsp"
2112 | "websearch"
2113 | "webfetch"
2114 | "webfetch_html"
2115 )
2116}
2117
2118fn is_batch_wrapper_tool_name(name: &str) -> bool {
2119 matches!(
2120 normalize_tool_name(name).as_str(),
2121 "default_api" | "default" | "api" | "function" | "functions" | "tool" | "tools"
2122 )
2123}
2124
2125fn non_empty_string_at<'a>(obj: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
2126 obj.get(key)
2127 .and_then(|v| v.as_str())
2128 .map(str::trim)
2129 .filter(|s| !s.is_empty())
2130}
2131
2132fn nested_non_empty_string_at<'a>(
2133 obj: &'a Map<String, Value>,
2134 parent: &str,
2135 key: &str,
2136) -> Option<&'a str> {
2137 obj.get(parent)
2138 .and_then(|v| v.as_object())
2139 .and_then(|nested| nested.get(key))
2140 .and_then(|v| v.as_str())
2141 .map(str::trim)
2142 .filter(|s| !s.is_empty())
2143}
2144
2145fn extract_batch_calls(args: &Value) -> Vec<(String, Value)> {
2146 let calls = args
2147 .get("tool_calls")
2148 .and_then(|v| v.as_array())
2149 .cloned()
2150 .unwrap_or_default();
2151 calls
2152 .into_iter()
2153 .filter_map(|call| {
2154 let obj = call.as_object()?;
2155 let tool_raw = non_empty_string_at(obj, "tool")
2156 .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
2157 .or_else(|| nested_non_empty_string_at(obj, "function", "tool"))
2158 .or_else(|| nested_non_empty_string_at(obj, "function_call", "tool"))
2159 .or_else(|| nested_non_empty_string_at(obj, "call", "tool"));
2160 let name_raw = non_empty_string_at(obj, "name")
2161 .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
2162 .or_else(|| nested_non_empty_string_at(obj, "function_call", "name"))
2163 .or_else(|| nested_non_empty_string_at(obj, "call", "name"))
2164 .or_else(|| nested_non_empty_string_at(obj, "tool", "name"));
2165 let effective = match (tool_raw, name_raw) {
2166 (Some(t), Some(n)) if is_batch_wrapper_tool_name(t) => n,
2167 (Some(t), _) => t,
2168 (None, Some(n)) => n,
2169 (None, None) => return None,
2170 };
2171 let normalized = normalize_tool_name(effective);
2172 let call_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
2173 Some((normalized, call_args))
2174 })
2175 .collect()
2176}
2177
2178fn is_read_only_batch_call(args: &Value) -> bool {
2179 let calls = extract_batch_calls(args);
2180 !calls.is_empty() && calls.iter().all(|(tool, _)| is_read_only_tool(tool))
2181}
2182
2183fn batch_tool_signature(args: &Value) -> Option<String> {
2184 let calls = extract_batch_calls(args);
2185 if calls.is_empty() {
2186 return None;
2187 }
2188 let parts = calls
2189 .into_iter()
2190 .map(|(tool, call_args)| tool_signature(&tool, &call_args))
2191 .collect::<Vec<_>>();
2192 Some(format!("batch:{}", parts.join("|")))
2193}
2194
2195fn is_non_productive_batch_output(output: &str) -> bool {
2196 let Ok(value) = serde_json::from_str::<Value>(output.trim()) else {
2197 return false;
2198 };
2199 let Some(items) = value.as_array() else {
2200 return false;
2201 };
2202 if items.is_empty() {
2203 return true;
2204 }
2205 items.iter().all(|item| {
2206 let text = item
2207 .get("output")
2208 .and_then(|v| v.as_str())
2209 .map(str::trim)
2210 .unwrap_or_default()
2211 .to_ascii_lowercase();
2212 text.is_empty()
2213 || text.starts_with("unknown tool:")
2214 || text.contains("call skipped")
2215 || text.contains("guard budget exceeded")
2216 })
2217}
2218
/// Returns `true` when a tool output both announces that authorization is needed and offers
/// a way to do it (an "authorize here" hint or an `http` link). Matching is ASCII
/// case-insensitive.
fn is_auth_required_tool_output(output: &str) -> bool {
    const AUTH_PHRASES: [&str; 3] = [
        "authorization required",
        "requires authorization",
        "authorization pending",
    ];
    let text = output.to_ascii_lowercase();
    let mentions_auth = AUTH_PHRASES.iter().any(|phrase| text.contains(*phrase));
    let offers_link = text.contains("authorize here") || text.contains("http");
    mentions_auth && offers_link
}
2226
/// Describes an MCP tool call that needs user authorization before it can run.
///
/// Built either from a tool result's `mcpAuth` metadata object
/// (`extract_mcp_auth_required_metadata`) or recovered from error text
/// (`extract_mcp_auth_required_from_error_text`).
#[derive(Debug, Clone)]
struct McpAuthRequiredMetadata {
    /// Challenge identifier. Taken from `mcpAuth.challengeId` (`"unknown"` when absent); when
    /// recovered from error text, it is `stable_hash("<tool>:<url>")`.
    challenge_id: String,
    /// URL the user should open to authorize.
    authorization_url: String,
    /// Human-readable explanation shown to the user.
    message: String,
    /// MCP server name, when known.
    server: Option<String>,
    /// `mcpAuth.pending`; `false` when absent or recovered from error text.
    pending: bool,
    /// `mcpAuth.blocked`; `false` when absent or recovered from error text.
    blocked: bool,
    /// `mcpAuth.retryAfterMs`, when present as an unsigned integer.
    retry_after_ms: Option<u64>,
}
2237
2238fn extract_mcp_auth_required_metadata(metadata: &Value) -> Option<McpAuthRequiredMetadata> {
2239 let auth = metadata.get("mcpAuth")?;
2240 if !auth
2241 .get("required")
2242 .and_then(|v| v.as_bool())
2243 .unwrap_or(false)
2244 {
2245 return None;
2246 }
2247 let authorization_url = auth
2248 .get("authorizationUrl")
2249 .and_then(|v| v.as_str())
2250 .map(str::trim)
2251 .filter(|v| !v.is_empty())?
2252 .to_string();
2253 let message = auth
2254 .get("message")
2255 .and_then(|v| v.as_str())
2256 .map(str::trim)
2257 .filter(|v| !v.is_empty())
2258 .unwrap_or("This tool requires authorization before it can run.")
2259 .to_string();
2260 let challenge_id = auth
2261 .get("challengeId")
2262 .and_then(|v| v.as_str())
2263 .map(str::trim)
2264 .filter(|v| !v.is_empty())
2265 .unwrap_or("unknown")
2266 .to_string();
2267 let server = metadata
2268 .get("server")
2269 .and_then(|v| v.as_str())
2270 .map(str::trim)
2271 .filter(|v| !v.is_empty())
2272 .map(ToString::to_string);
2273 let pending = auth
2274 .get("pending")
2275 .and_then(|v| v.as_bool())
2276 .unwrap_or(false);
2277 let blocked = auth
2278 .get("blocked")
2279 .and_then(|v| v.as_bool())
2280 .unwrap_or(false);
2281 let retry_after_ms = auth.get("retryAfterMs").and_then(|v| v.as_u64());
2282 Some(McpAuthRequiredMetadata {
2283 challenge_id,
2284 authorization_url,
2285 message,
2286 server,
2287 pending,
2288 blocked,
2289 retry_after_ms,
2290 })
2291}
2292
2293fn extract_mcp_auth_required_from_error_text(
2294 tool_name: &str,
2295 error_text: &str,
2296) -> Option<McpAuthRequiredMetadata> {
2297 let lower = error_text.to_ascii_lowercase();
2298 let auth_hint = lower.contains("authorization")
2299 || lower.contains("oauth")
2300 || lower.contains("invalid oauth token")
2301 || lower.contains("requires authorization");
2302 if !auth_hint {
2303 return None;
2304 }
2305 let authorization_url = find_first_url(error_text)?;
2306 let challenge_id = stable_hash(&format!("{tool_name}:{authorization_url}"));
2307 let server = tool_name
2308 .strip_prefix("mcp.")
2309 .and_then(|rest| rest.split('.').next())
2310 .filter(|s| !s.is_empty())
2311 .map(ToString::to_string);
2312 Some(McpAuthRequiredMetadata {
2313 challenge_id,
2314 authorization_url,
2315 message: "This integration requires authorization before this action can run.".to_string(),
2316 server,
2317 pending: false,
2318 blocked: false,
2319 retry_after_ms: None,
2320 })
2321}
2322
2323fn summarize_auth_pending_outputs(outputs: &[String]) -> Option<String> {
2324 if outputs.is_empty()
2325 || !outputs
2326 .iter()
2327 .all(|output| is_auth_required_tool_output(output))
2328 {
2329 return None;
2330 }
2331 let mut auth_lines = outputs
2332 .iter()
2333 .filter_map(|output| {
2334 let trimmed = output.trim();
2335 if trimmed.is_empty() {
2336 None
2337 } else {
2338 Some(trimmed.to_string())
2339 }
2340 })
2341 .collect::<Vec<_>>();
2342 auth_lines.sort();
2343 auth_lines.dedup();
2344 if auth_lines.is_empty() {
2345 return None;
2346 }
2347 Some(format!(
2348 "Authorization is required before I can continue with this action.\n\n{}",
2349 auth_lines.join("\n\n")
2350 ))
2351}
2352
2353fn summarize_guard_budget_outputs(outputs: &[String]) -> Option<String> {
2354 if outputs.is_empty()
2355 || !outputs
2356 .iter()
2357 .all(|output| is_guard_budget_tool_output(output))
2358 {
2359 return None;
2360 }
2361 let mut lines = outputs
2362 .iter()
2363 .filter_map(|output| {
2364 let trimmed = output.trim();
2365 if trimmed.is_empty() {
2366 None
2367 } else {
2368 Some(trimmed.to_string())
2369 }
2370 })
2371 .collect::<Vec<_>>();
2372 lines.sort();
2373 lines.dedup();
2374 if lines.is_empty() {
2375 return None;
2376 }
2377 Some(format!(
2378 "This run hit the per-run tool guard budget, so I paused tool execution to avoid runaway retries.\n\n{}\n\nSend a new message to start a fresh run.",
2379 lines.join("\n")
2380 ))
2381}
2382
2383fn summarize_duplicate_signature_outputs(outputs: &[String]) -> Option<String> {
2384 if outputs.is_empty()
2385 || !outputs
2386 .iter()
2387 .all(|output| is_duplicate_signature_limit_output(output))
2388 {
2389 return None;
2390 }
2391 let mut lines = outputs
2392 .iter()
2393 .filter_map(|output| {
2394 let trimmed = output.trim();
2395 if trimmed.is_empty() {
2396 None
2397 } else {
2398 Some(trimmed.to_string())
2399 }
2400 })
2401 .collect::<Vec<_>>();
2402 lines.sort();
2403 lines.dedup();
2404 if lines.is_empty() {
2405 return None;
2406 }
2407 Some(format!(
2408 "This run paused because the same tool call kept repeating.\n\n{}\n\nRephrase the request or start a new message with a clearer command target.",
2409 lines.join("\n")
2410 ))
2411}
2412
/// Returns the first `http://` or `https://` URL embedded in free-form text.
///
/// Text is split on whitespace. Wrapping punctuation is stripped from both ends of each token
/// before the scheme check, so URLs written as `(https://x.dev/auth).` or `<https://x.dev>`
/// are found. A URL must have at least one character after its scheme.
fn find_first_url(text: &str) -> Option<String> {
    const LEADING: &[char] = &['(', '[', '{', '<', '"', '\''];
    const TRAILING: &[char] = &[')', ']', '}', '>', '"', '\'', ',', '.', ';', ':'];
    text.split_whitespace().find_map(|token| {
        let cleaned = token
            .trim_start_matches(LEADING)
            .trim_end_matches(TRAILING);
        // Measure the remainder against the scheme that actually matched.
        let rest = cleaned
            .strip_prefix("https://")
            .or_else(|| cleaned.strip_prefix("http://"))?;
        (!rest.is_empty()).then(|| cleaned.to_string())
    })
}
2424
2425fn tool_budget_for(tool_name: &str) -> usize {
2426 if env_budget_guards_disabled() {
2427 return usize::MAX;
2428 }
2429 let normalized = normalize_tool_name(tool_name);
2430 let (default_budget, env_key) = match normalized.as_str() {
2431 "glob" => (4usize, "TANDEM_TOOL_BUDGET_GLOB"),
2432 "read" => (8usize, "TANDEM_TOOL_BUDGET_READ"),
2433 "websearch" => (8usize, "TANDEM_TOOL_BUDGET_WEBSEARCH"),
2434 "batch" => (10usize, "TANDEM_TOOL_BUDGET_BATCH"),
2435 "grep" | "search" | "codesearch" => (6usize, "TANDEM_TOOL_BUDGET_SEARCH"),
2436 _ => (10usize, "TANDEM_TOOL_BUDGET_DEFAULT"),
2437 };
2438 if let Some(override_budget) = parse_budget_override(env_key) {
2439 return override_budget;
2440 }
2441 default_budget
2442}
2443
/// Maximum number of tool-call iterations per run.
///
/// Read from `TANDEM_MAX_TOOL_ITERATIONS`. Falls back to 25 when the variable is unset,
/// unparsable, or zero.
fn max_tool_iterations() -> usize {
    const DEFAULT_ITERATIONS: usize = 25;
    match std::env::var("TANDEM_MAX_TOOL_ITERATIONS") {
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(limit) if limit > 0 => limit,
            _ => DEFAULT_ITERATIONS,
        },
        Err(_) => DEFAULT_ITERATIONS,
    }
}
2452
/// Provider stream connect timeout in milliseconds.
///
/// Read from `TANDEM_PROVIDER_STREAM_CONNECT_TIMEOUT_MS`. Unset, unparsable or zero values
/// fall back to 30 000.
fn provider_stream_connect_timeout_ms() -> usize {
    const DEFAULT_MS: usize = 30_000;
    let Ok(raw) = std::env::var("TANDEM_PROVIDER_STREAM_CONNECT_TIMEOUT_MS") else {
        return DEFAULT_MS;
    };
    match raw.trim().parse::<usize>() {
        Ok(ms) if ms > 0 => ms,
        _ => DEFAULT_MS,
    }
}
2460
/// Provider stream idle timeout in milliseconds.
///
/// Read from `TANDEM_PROVIDER_STREAM_IDLE_TIMEOUT_MS`. Unset, unparsable or zero values fall
/// back to 90 000.
fn provider_stream_idle_timeout_ms() -> usize {
    const DEFAULT_MS: usize = 90_000;
    let Ok(raw) = std::env::var("TANDEM_PROVIDER_STREAM_IDLE_TIMEOUT_MS") else {
        return DEFAULT_MS;
    };
    match raw.trim().parse::<usize>() {
        Ok(ms) if ms > 0 => ms,
        _ => DEFAULT_MS,
    }
}
2468
/// Prompt-context hook timeout in milliseconds.
///
/// Read from `TANDEM_PROMPT_CONTEXT_HOOK_TIMEOUT_MS`. Unset, unparsable or zero values fall
/// back to 5 000.
fn prompt_context_hook_timeout_ms() -> usize {
    const DEFAULT_MS: usize = 5_000;
    let Ok(raw) = std::env::var("TANDEM_PROMPT_CONTEXT_HOOK_TIMEOUT_MS") else {
        return DEFAULT_MS;
    };
    match raw.trim().parse::<usize>() {
        Ok(ms) if ms > 0 => ms,
        _ => DEFAULT_MS,
    }
}
2476
/// Permission-wait timeout in milliseconds.
///
/// Read from `TANDEM_PERMISSION_WAIT_TIMEOUT_MS`. Unset, unparsable or zero values fall back
/// to 15 000.
fn permission_wait_timeout_ms() -> usize {
    const DEFAULT_MS: usize = 15_000;
    let Ok(raw) = std::env::var("TANDEM_PERMISSION_WAIT_TIMEOUT_MS") else {
        return DEFAULT_MS;
    };
    match raw.trim().parse::<usize>() {
        Ok(ms) if ms > 0 => ms,
        _ => DEFAULT_MS,
    }
}
2484
/// Tool execution timeout in milliseconds.
///
/// Read from `TANDEM_TOOL_EXEC_TIMEOUT_MS`. Unset, unparsable or zero values fall back to
/// 45 000.
fn tool_exec_timeout_ms() -> usize {
    const DEFAULT_MS: usize = 45_000;
    let Ok(raw) = std::env::var("TANDEM_TOOL_EXEC_TIMEOUT_MS") else {
        return DEFAULT_MS;
    };
    match raw.trim().parse::<usize>() {
        Ok(ms) if ms > 0 => ms,
        _ => DEFAULT_MS,
    }
}
2492
2493fn duplicate_signature_limit_for(tool_name: &str) -> usize {
2494 if let Ok(raw) = std::env::var("TANDEM_TOOL_LOOP_DUPLICATE_SIGNATURE_LIMIT") {
2495 if let Ok(parsed) = raw.trim().parse::<usize>() {
2496 if parsed > 0 {
2497 return parsed;
2498 }
2499 }
2500 }
2501 if is_shell_tool_name(tool_name) {
2502 2
2503 } else {
2504 3
2505 }
2506}
2507
/// Returns `true` when `TANDEM_DISABLE_TOOL_GUARD_BUDGETS` is set to a truthy value (`1`,
/// `true`, `yes`, `on`). The value is trimmed and compared case-insensitively.
fn env_budget_guards_disabled() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    match std::env::var("TANDEM_DISABLE_TOOL_GUARD_BUDGETS") {
        Ok(raw) => {
            let flag = raw.trim().to_ascii_lowercase();
            TRUTHY.contains(&flag.as_str())
        }
        Err(_) => false,
    }
}
2519
/// Reads a tool-budget override from the environment variable `env_key`.
///
/// - `0`, `inf`, `infinite`, `unlimited` and `none` (case-insensitive) mean "no limit" and
///   yield `usize::MAX`.
/// - A positive integer is returned as-is.
/// - Anything else, including an unset variable, yields `None`.
fn parse_budget_override(env_key: &str) -> Option<usize> {
    const UNLIMITED: [&str; 5] = ["0", "inf", "infinite", "unlimited", "none"];
    let value = std::env::var(env_key).ok()?.trim().to_ascii_lowercase();
    if UNLIMITED.contains(&value.as_str()) {
        return Some(usize::MAX);
    }
    value.parse::<usize>().ok().filter(|budget| *budget > 0)
}
2534
/// Returns `true` when a tool output is a per-run guard-budget rejection (ASCII
/// case-insensitive).
fn is_guard_budget_tool_output(output: &str) -> bool {
    const MARKER: &str = "per-run guard budget exceeded";
    let lowered = output.to_ascii_lowercase();
    lowered.contains(MARKER)
}
2540
/// Returns `true` when a tool output reports that the duplicate-call retry limit was reached
/// (ASCII case-insensitive).
fn is_duplicate_signature_limit_output(output: &str) -> bool {
    const MARKER: &str = "duplicate call signature retry limit reached";
    let lowered = output.to_ascii_lowercase();
    lowered.contains(MARKER)
}
2546
/// Heuristically decides whether `path` points at credentials or other secrets.
///
/// The following are flagged:
/// - SSH and GnuPG directories;
/// - common credential files (`.aws/credentials`, `.npmrc`, `.netrc`, `.pypirc`);
/// - private-key and certificate names or extensions;
/// - `.env` and `.env.*` files.
///
/// Matching is case-insensitive and substring/suffix based, so it errs towards false
/// positives.
fn is_sensitive_path_candidate(path: &Path) -> bool {
    // Normalize the path before matching: lowercase it, use '/' separators and add a leading
    // '/'. Windows-style paths (`C:\Users\me\.ssh\config`) and relative paths (`.ssh/config`,
    // `.npmrc`) then hit the same checks as absolute Unix paths.
    let lowered = format!(
        "/{}",
        path.to_string_lossy().to_ascii_lowercase().replace('\\', "/")
    );
    if lowered.contains("/.ssh/")
        || lowered.ends_with("/.ssh")
        || lowered.contains("/.gnupg/")
        || lowered.ends_with("/.gnupg")
    {
        return true;
    }
    if lowered.contains("/.aws/credentials")
        || lowered.ends_with("/.npmrc")
        || lowered.ends_with("/.netrc")
        || lowered.ends_with("/.pypirc")
    {
        return true;
    }
    if lowered.contains("id_rsa")
        || lowered.contains("id_ed25519")
        || lowered.contains("id_ecdsa")
        || lowered.contains(".pem")
        || lowered.contains(".p12")
        || lowered.contains(".pfx")
        || lowered.contains(".key")
    {
        return true;
    }

    let is_env_file = |name: &str| name == ".env" || name.starts_with(".env.");
    let last_segment = lowered
        .trim_end_matches('/')
        .rsplit('/')
        .next()
        .unwrap_or_default();
    if is_env_file(last_segment) {
        return true;
    }
    // `Path::file_name` also ignores trailing `.` components, so keep it as a second check.
    path.file_name()
        .and_then(|n| n.to_str())
        .map(|n| is_env_file(&n.to_ascii_lowercase()))
        .unwrap_or(false)
}
2581
/// Returns `true` when a shell command line mentions a credential or secret path marker.
///
/// This is a plain case-insensitive substring scan over the whole command, with backslashes
/// treated as '/'. It errs towards false positives (any `.key` or `.env` substring matches).
/// The marker list covers the same files that `is_sensitive_path_candidate` treats as
/// sensitive.
fn shell_command_targets_sensitive_path(command: &str) -> bool {
    const SENSITIVE_MARKERS: [&str; 14] = [
        ".env",
        ".ssh",
        ".gnupg",
        ".aws/credentials",
        ".npmrc",
        ".netrc",
        ".pypirc",
        "id_rsa",
        "id_ed25519",
        "id_ecdsa",
        ".pem",
        ".p12",
        ".pfx",
        ".key",
    ];
    // Normalize Windows separators so `.aws\credentials` matches like `.aws/credentials`.
    let lower = command.to_ascii_lowercase().replace('\\', "/");
    SENSITIVE_MARKERS
        .iter()
        .any(|marker| lower.contains(*marker))
}
2598
/// Result of `normalize_tool_args`: repaired tool arguments plus provenance information
/// describing how they were obtained.
#[derive(Debug, Clone)]
struct NormalizedToolArgs {
    /// Arguments to pass to the tool (possibly rewritten with recovered fields).
    args: Value,
    /// Where the arguments came from, as set by `normalize_tool_args`: `provider_string`,
    /// `provider_json`, `inferred_from_user`, `inferred_from_context`,
    /// `recovered_from_context` or `missing`.
    args_source: String,
    /// Integrity label: `ok`, `recovered` (a required field was inferred) or `empty`
    /// (a required field could not be found).
    args_integrity: String,
    /// Resolved search query; only populated for `websearch`.
    query: Option<String>,
    /// `true` when a required argument is missing and the call cannot proceed.
    missing_terminal: bool,
    /// Machine-readable reason for `missing_terminal`, e.g. `WEBSEARCH_QUERY_MISSING`.
    missing_terminal_reason: Option<String>,
}
2608
/// Repairs or recovers the required argument of well-known tools before execution.
///
/// The required argument and its fallback sources, tried in order, depend on the tool:
/// - `websearch`: explicit query → inferred from `latest_user_text` → inferred from
///   `latest_assistant_context`.
/// - shell tools: explicit command → assistant context → user text.
/// - `read`/`write`/`edit`: explicit path → user text. `write` also requires content, which is
///   never inferred.
/// - `webfetch`/`webfetch_html`: explicit URL → assistant context → user text.
///
/// Any other tool's arguments pass through unchanged, with integrity `ok`. When every source
/// fails, the result is marked `missing_terminal` with a reason code.
///
/// NOTE(review): websearch prefers user text over assistant context, while shell and webfetch
/// prefer assistant context. Confirm the asymmetry is intentional.
fn normalize_tool_args(
    tool_name: &str,
    raw_args: Value,
    latest_user_text: &str,
    latest_assistant_context: &str,
) -> NormalizedToolArgs {
    let normalized_tool = normalize_tool_name(tool_name);
    let mut args = raw_args;
    // Record whether the provider sent arguments as a raw string or as structured JSON.
    let mut args_source = if args.is_string() {
        "provider_string".to_string()
    } else {
        "provider_json".to_string()
    };
    let mut args_integrity = "ok".to_string();
    let mut query = None;
    let mut missing_terminal = false;
    let mut missing_terminal_reason = None;

    if normalized_tool == "websearch" {
        if let Some(found) = extract_websearch_query(&args) {
            query = Some(found);
            args = set_websearch_query_and_source(args, query.clone(), "tool_args");
        } else if let Some(inferred) = infer_websearch_query_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            query = Some(inferred);
            args = set_websearch_query_and_source(args, query.clone(), "inferred_from_user");
        } else if let Some(recovered) = infer_websearch_query_from_text(latest_assistant_context) {
            args_source = "recovered_from_context".to_string();
            args_integrity = "recovered".to_string();
            query = Some(recovered);
            args = set_websearch_query_and_source(args, query.clone(), "recovered_from_context");
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("WEBSEARCH_QUERY_MISSING".to_string());
        }
    } else if is_shell_tool_name(&normalized_tool) {
        if let Some(command) = extract_shell_command(&args) {
            // Re-set the command so it sits under the canonical `command` key.
            args = set_shell_command(args, command);
        } else if let Some(inferred) = infer_shell_command_from_text(latest_assistant_context) {
            args_source = "inferred_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_shell_command(args, inferred);
        } else if let Some(inferred) = infer_shell_command_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_shell_command(args, inferred);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("BASH_COMMAND_MISSING".to_string());
        }
    } else if matches!(normalized_tool.as_str(), "read" | "write" | "edit") {
        if let Some(path) = extract_file_path_arg(&args) {
            // Re-set the path so it sits under the canonical `path` key.
            args = set_file_path_arg(args, path);
        } else if let Some(inferred) = infer_file_path_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_file_path_arg(args, inferred);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("FILE_PATH_MISSING".to_string());
        }

        // `write` additionally needs content; it is only looked up in the arguments themselves.
        if !missing_terminal && normalized_tool == "write" {
            if let Some(content) = extract_write_content_arg(&args) {
                args = set_write_content_arg(args, content);
            } else {
                args_source = "missing".to_string();
                args_integrity = "empty".to_string();
                missing_terminal = true;
                missing_terminal_reason = Some("WRITE_CONTENT_MISSING".to_string());
            }
        }
    } else if matches!(normalized_tool.as_str(), "webfetch" | "webfetch_html") {
        if let Some(url) = extract_webfetch_url_arg(&args) {
            // Re-set the URL so it sits under the canonical `url` key.
            args = set_webfetch_url_arg(args, url);
        } else if let Some(inferred) = infer_url_from_text(latest_assistant_context) {
            args_source = "inferred_from_context".to_string();
            args_integrity = "recovered".to_string();
            args = set_webfetch_url_arg(args, inferred);
        } else if let Some(inferred) = infer_url_from_text(latest_user_text) {
            args_source = "inferred_from_user".to_string();
            args_integrity = "recovered".to_string();
            args = set_webfetch_url_arg(args, inferred);
        } else {
            args_source = "missing".to_string();
            args_integrity = "empty".to_string();
            missing_terminal = true;
            missing_terminal_reason = Some("WEBFETCH_URL_MISSING".to_string());
        }
    }

    NormalizedToolArgs {
        args,
        args_source,
        args_integrity,
        query,
        missing_terminal,
        missing_terminal_reason,
    }
}
2716
/// Returns `true` for shell-executing tools (`bash`, `shell`, `powershell`, `cmd`). The name
/// is trimmed and compared ASCII case-insensitively.
fn is_shell_tool_name(tool_name: &str) -> bool {
    const SHELL_TOOLS: [&str; 4] = ["bash", "shell", "powershell", "cmd"];
    let name = tool_name.trim().to_ascii_lowercase();
    SHELL_TOOLS.contains(&name.as_str())
}
2723
2724fn set_file_path_arg(args: Value, path: String) -> Value {
2725 let mut obj = args.as_object().cloned().unwrap_or_default();
2726 obj.insert("path".to_string(), Value::String(path));
2727 Value::Object(obj)
2728}
2729
2730fn set_write_content_arg(args: Value, content: String) -> Value {
2731 let mut obj = args.as_object().cloned().unwrap_or_default();
2732 obj.insert("content".to_string(), Value::String(content));
2733 Value::Object(obj)
2734}
2735
2736fn extract_file_path_arg(args: &Value) -> Option<String> {
2737 extract_file_path_arg_internal(args, 0)
2738}
2739
2740fn extract_write_content_arg(args: &Value) -> Option<String> {
2741 extract_write_content_arg_internal(args, 0)
2742}
2743
2744fn extract_file_path_arg_internal(args: &Value, depth: usize) -> Option<String> {
2745 if depth > 5 {
2746 return None;
2747 }
2748
2749 match args {
2750 Value::String(raw) => {
2751 let trimmed = raw.trim();
2752 if trimmed.is_empty() {
2753 return None;
2754 }
2755 if !(trimmed.starts_with('{') || trimmed.starts_with('[') || trimmed.starts_with('"')) {
2757 return sanitize_path_candidate(trimmed);
2758 }
2759 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
2760 return extract_file_path_arg_internal(&parsed, depth + 1);
2761 }
2762 sanitize_path_candidate(trimmed)
2763 }
2764 Value::Array(items) => items
2765 .iter()
2766 .find_map(|item| extract_file_path_arg_internal(item, depth + 1)),
2767 Value::Object(obj) => {
2768 for key in FILE_PATH_KEYS {
2769 if let Some(raw) = obj.get(key).and_then(|v| v.as_str()) {
2770 if let Some(path) = sanitize_path_candidate(raw) {
2771 return Some(path);
2772 }
2773 }
2774 }
2775 for container in NESTED_ARGS_KEYS {
2776 if let Some(nested) = obj.get(container) {
2777 if let Some(path) = extract_file_path_arg_internal(nested, depth + 1) {
2778 return Some(path);
2779 }
2780 }
2781 }
2782 None
2783 }
2784 _ => None,
2785 }
2786}
2787
2788fn extract_write_content_arg_internal(args: &Value, depth: usize) -> Option<String> {
2789 if depth > 5 {
2790 return None;
2791 }
2792
2793 match args {
2794 Value::String(raw) => {
2795 let trimmed = raw.trim();
2796 if trimmed.is_empty() {
2797 return None;
2798 }
2799 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
2800 return extract_write_content_arg_internal(&parsed, depth + 1);
2801 }
2802 if sanitize_path_candidate(trimmed).is_some()
2805 && !trimmed.contains('\n')
2806 && trimmed.split_whitespace().count() <= 3
2807 {
2808 return None;
2809 }
2810 Some(trimmed.to_string())
2811 }
2812 Value::Array(items) => items
2813 .iter()
2814 .find_map(|item| extract_write_content_arg_internal(item, depth + 1)),
2815 Value::Object(obj) => {
2816 for key in WRITE_CONTENT_KEYS {
2817 if let Some(value) = obj.get(key) {
2818 if let Some(raw) = value.as_str() {
2819 if !raw.is_empty() {
2820 return Some(raw.to_string());
2821 }
2822 } else if let Some(recovered) =
2823 extract_write_content_arg_internal(value, depth + 1)
2824 {
2825 return Some(recovered);
2826 }
2827 }
2828 }
2829 for container in NESTED_ARGS_KEYS {
2830 if let Some(nested) = obj.get(container) {
2831 if let Some(content) = extract_write_content_arg_internal(nested, depth + 1) {
2832 return Some(content);
2833 }
2834 }
2835 }
2836 None
2837 }
2838 _ => None,
2839 }
2840}
2841
2842fn set_shell_command(args: Value, command: String) -> Value {
2843 let mut obj = args.as_object().cloned().unwrap_or_default();
2844 obj.insert("command".to_string(), Value::String(command));
2845 Value::Object(obj)
2846}
2847
2848fn extract_shell_command(args: &Value) -> Option<String> {
2849 extract_shell_command_internal(args, 0)
2850}
2851
2852fn extract_shell_command_internal(args: &Value, depth: usize) -> Option<String> {
2853 if depth > 5 {
2854 return None;
2855 }
2856
2857 match args {
2858 Value::String(raw) => {
2859 let trimmed = raw.trim();
2860 if trimmed.is_empty() {
2861 return None;
2862 }
2863 if !(trimmed.starts_with('{') || trimmed.starts_with('[') || trimmed.starts_with('"')) {
2864 return sanitize_shell_command_candidate(trimmed);
2865 }
2866 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
2867 return extract_shell_command_internal(&parsed, depth + 1);
2868 }
2869 sanitize_shell_command_candidate(trimmed)
2870 }
2871 Value::Array(items) => items
2872 .iter()
2873 .find_map(|item| extract_shell_command_internal(item, depth + 1)),
2874 Value::Object(obj) => {
2875 for key in SHELL_COMMAND_KEYS {
2876 if let Some(raw) = obj.get(key).and_then(|v| v.as_str()) {
2877 if let Some(command) = sanitize_shell_command_candidate(raw) {
2878 return Some(command);
2879 }
2880 }
2881 }
2882 for container in NESTED_ARGS_KEYS {
2883 if let Some(nested) = obj.get(container) {
2884 if let Some(command) = extract_shell_command_internal(nested, depth + 1) {
2885 return Some(command);
2886 }
2887 }
2888 }
2889 None
2890 }
2891 _ => None,
2892 }
2893}
2894
2895fn infer_shell_command_from_text(text: &str) -> Option<String> {
2896 let trimmed = text.trim();
2897 if trimmed.is_empty() {
2898 return None;
2899 }
2900
2901 let mut in_tick = false;
2903 let mut tick_buf = String::new();
2904 for ch in trimmed.chars() {
2905 if ch == '`' {
2906 if in_tick {
2907 if let Some(candidate) = sanitize_shell_command_candidate(&tick_buf) {
2908 if looks_like_shell_command(&candidate) {
2909 return Some(candidate);
2910 }
2911 }
2912 tick_buf.clear();
2913 }
2914 in_tick = !in_tick;
2915 continue;
2916 }
2917 if in_tick {
2918 tick_buf.push(ch);
2919 }
2920 }
2921
2922 for line in trimmed.lines() {
2923 let line = line.trim();
2924 if line.is_empty() {
2925 continue;
2926 }
2927 let lower = line.to_ascii_lowercase();
2928 for prefix in [
2929 "run ",
2930 "execute ",
2931 "call ",
2932 "use bash ",
2933 "use shell ",
2934 "bash ",
2935 "shell ",
2936 "powershell ",
2937 "pwsh ",
2938 ] {
2939 if lower.starts_with(prefix) {
2940 let candidate = line[prefix.len()..].trim();
2941 if let Some(command) = sanitize_shell_command_candidate(candidate) {
2942 if looks_like_shell_command(&command) {
2943 return Some(command);
2944 }
2945 }
2946 }
2947 }
2948 }
2949
2950 None
2951}
2952
2953fn set_websearch_query_and_source(args: Value, query: Option<String>, query_source: &str) -> Value {
2954 let mut obj = args.as_object().cloned().unwrap_or_default();
2955 if let Some(q) = query {
2956 obj.insert("query".to_string(), Value::String(q));
2957 }
2958 obj.insert(
2959 "__query_source".to_string(),
2960 Value::String(query_source.to_string()),
2961 );
2962 Value::Object(obj)
2963}
2964
2965fn set_webfetch_url_arg(args: Value, url: String) -> Value {
2966 let mut obj = args.as_object().cloned().unwrap_or_default();
2967 obj.insert("url".to_string(), Value::String(url));
2968 Value::Object(obj)
2969}
2970
2971fn extract_webfetch_url_arg(args: &Value) -> Option<String> {
2972 const URL_KEYS: [&str; 5] = ["url", "uri", "link", "href", "target_url"];
2973 for key in URL_KEYS {
2974 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
2975 if let Some(url) = sanitize_url_candidate(value) {
2976 return Some(url);
2977 }
2978 }
2979 }
2980 for container in ["arguments", "args", "input", "params"] {
2981 if let Some(obj) = args.get(container) {
2982 for key in URL_KEYS {
2983 if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
2984 if let Some(url) = sanitize_url_candidate(value) {
2985 return Some(url);
2986 }
2987 }
2988 }
2989 }
2990 }
2991 args.as_str().and_then(sanitize_url_candidate)
2992}
2993
2994fn extract_websearch_query(args: &Value) -> Option<String> {
2995 const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
2996 for key in QUERY_KEYS {
2997 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
2998 if let Some(query) = sanitize_websearch_query_candidate(value) {
2999 return Some(query);
3000 }
3001 }
3002 }
3003 for container in ["arguments", "args", "input", "params"] {
3004 if let Some(obj) = args.get(container) {
3005 for key in QUERY_KEYS {
3006 if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
3007 if let Some(query) = sanitize_websearch_query_candidate(value) {
3008 return Some(query);
3009 }
3010 }
3011 }
3012 }
3013 }
3014 args.as_str().and_then(sanitize_websearch_query_candidate)
3015}
3016
/// Cleans a raw websearch query string that may contain leaked tool-call markup.
///
/// Cleaning steps:
/// 1. Prefer the payload of an `<arg_value>…</arg_value>` tag (tag matched
///    case-insensitively; a missing closing tag takes the rest of the string).
/// 2. Otherwise remove lowercase `<arg_key>`/`<arg_value>` tags and collapse whitespace.
/// 3. Drop a leading `websearch query ` or `query ` label.
///
/// Returns `None` for blank input.
fn sanitize_websearch_query_candidate(raw: &str) -> Option<String> {
    const VALUE_OPEN: &str = "<arg_value>";
    const VALUE_CLOSE: &str = "</arg_value>";

    let text = raw.trim();
    if text.is_empty() {
        return None;
    }

    // ASCII lowercasing keeps byte offsets aligned with `text`, so the slices below are safe.
    if let Some(open_at) = text.to_ascii_lowercase().find(VALUE_OPEN) {
        let tail = &text[open_at + VALUE_OPEN.len()..];
        let payload = match tail.to_ascii_lowercase().find(VALUE_CLOSE) {
            Some(close_at) => &tail[..close_at],
            None => tail,
        };
        let payload = payload.trim();
        if !payload.is_empty() {
            return Some(payload.to_string());
        }
    }

    let mut stripped = text.to_string();
    for tag in ["<arg_key>", "</arg_key>", VALUE_OPEN, VALUE_CLOSE] {
        stripped = stripped.replace(tag, " ");
    }
    let collapsed = stripped.split_whitespace().collect::<Vec<_>>().join(" ");
    if collapsed.is_empty() {
        return None;
    }

    let collapsed_lower = collapsed.to_ascii_lowercase();
    for label in ["websearch query ", "query "] {
        if let Some(rest) = collapsed_lower.strip_prefix(label) {
            let unlabeled = collapsed[collapsed.len() - rest.len()..].trim();
            if !unlabeled.is_empty() {
                return Some(unlabeled.to_string());
            }
        }
    }

    Some(collapsed)
}
3069
/// Best-effort extraction of a web-search query from free-form text.
///
/// A leading command phrase such as `search for`, `look up:` or `websearch -` is removed, but
/// only when it is a whole word (so `finding nemo` is not turned into `ing nemo`). Surrounding
/// quotes, whitespace and sentence punctuation are then stripped. Returns `None` unless at
/// least two words remain.
fn infer_websearch_query_from_text(text: &str) -> Option<String> {
    // Longer phrases come before their own prefixes (e.g. `search web for` before `search`).
    const PREFIXES: [&str; 11] = [
        "web search",
        "websearch",
        "search web for",
        "search web",
        "search for",
        "search",
        "look up",
        "lookup",
        "find",
        "web lookup",
        "query",
    ];
    let is_separator = |c: char| c.is_whitespace() || c == ':' || c == '-';

    let trimmed = text.trim();
    if trimmed.is_empty() {
        return None;
    }

    // Use ASCII-only lowercasing so byte offsets stay identical to `trimmed`. Full Unicode
    // lowercasing can change byte lengths (e.g. the Kelvin sign becomes ASCII `k`), which
    // would misalign the slice below.
    let lower = trimmed.to_ascii_lowercase();
    let candidate = PREFIXES
        .iter()
        .find_map(|prefix| {
            if !lower.starts_with(*prefix) {
                return None;
            }
            let rest = &trimmed[prefix.len()..];
            // Require a word boundary right after the prefix.
            let at_boundary = rest.chars().next().map_or(true, is_separator);
            at_boundary.then(|| rest.trim_start_matches(is_separator))
        })
        .unwrap_or(trimmed);

    // Trim quotes, whitespace and punctuation in one pass, so mixed endings such as
    // `"tokio select".` are fully removed.
    let normalized = candidate
        .trim_matches(|c: char| c.is_whitespace() || matches!(c, '"' | '\'' | '.' | ',' | '!' | '?'))
        .to_string();

    if normalized.split_whitespace().count() < 2 {
        return None;
    }
    Some(normalized)
}
3113
3114fn infer_file_path_from_text(text: &str) -> Option<String> {
3115 let trimmed = text.trim();
3116 if trimmed.is_empty() {
3117 return None;
3118 }
3119
3120 let mut candidates: Vec<String> = Vec::new();
3121
3122 let mut in_tick = false;
3124 let mut tick_buf = String::new();
3125 for ch in trimmed.chars() {
3126 if ch == '`' {
3127 if in_tick {
3128 let cand = sanitize_path_candidate(&tick_buf);
3129 if let Some(path) = cand {
3130 candidates.push(path);
3131 }
3132 tick_buf.clear();
3133 }
3134 in_tick = !in_tick;
3135 continue;
3136 }
3137 if in_tick {
3138 tick_buf.push(ch);
3139 }
3140 }
3141
3142 for raw in trimmed.split_whitespace() {
3144 if let Some(path) = sanitize_path_candidate(raw) {
3145 candidates.push(path);
3146 }
3147 }
3148
3149 let mut deduped = Vec::new();
3150 let mut seen = HashSet::new();
3151 for candidate in candidates {
3152 if seen.insert(candidate.clone()) {
3153 deduped.push(candidate);
3154 }
3155 }
3156
3157 deduped.into_iter().next()
3158}
3159
3160fn infer_url_from_text(text: &str) -> Option<String> {
3161 let trimmed = text.trim();
3162 if trimmed.is_empty() {
3163 return None;
3164 }
3165
3166 let mut candidates: Vec<String> = Vec::new();
3167
3168 let mut in_tick = false;
3170 let mut tick_buf = String::new();
3171 for ch in trimmed.chars() {
3172 if ch == '`' {
3173 if in_tick {
3174 if let Some(url) = sanitize_url_candidate(&tick_buf) {
3175 candidates.push(url);
3176 }
3177 tick_buf.clear();
3178 }
3179 in_tick = !in_tick;
3180 continue;
3181 }
3182 if in_tick {
3183 tick_buf.push(ch);
3184 }
3185 }
3186
3187 for raw in trimmed.split_whitespace() {
3189 if let Some(url) = sanitize_url_candidate(raw) {
3190 candidates.push(url);
3191 }
3192 }
3193
3194 let mut seen = HashSet::new();
3195 candidates
3196 .into_iter()
3197 .find(|candidate| seen.insert(candidate.clone()))
3198}
3199
/// Cleans a token that may be an `http(s)://` URL and returns it, or `None` if it is not one.
///
/// Markdown/quote wrappers and leading brackets are stripped from the front. Trailing
/// brackets and sentence punctuation are stripped from the back. The scheme check is
/// case-insensitive; the URL's original casing is preserved.
fn sanitize_url_candidate(raw: &str) -> Option<String> {
    // Each side is trimmed in a single pass over a combined character set. Trimming one class
    // at a time would leave mixed suffixes behind (`https://x.dev).` → `https://x.dev)`).
    const LEADING: [char; 9] = ['`', '"', '\'', '*', '|', '(', '[', '{', '<'];
    const TRAILING: [char; 13] = [
        '`', '"', '\'', '*', '|', ',', ';', ':', ')', ']', '}', '>', '.',
    ];
    let token = raw
        .trim_start_matches(|c: char| c.is_whitespace() || LEADING.contains(&c))
        .trim_end_matches(|c: char| c.is_whitespace() || TRAILING.contains(&c));

    if token.is_empty() {
        return None;
    }
    let lower = token.to_ascii_lowercase();
    if !(lower.starts_with("http://") || lower.starts_with("https://")) {
        return None;
    }
    Some(token.to_string())
}
3218
3219fn sanitize_path_candidate(raw: &str) -> Option<String> {
3220 let token = raw
3221 .trim()
3222 .trim_matches(|c: char| matches!(c, '`' | '"' | '\'' | '*' | '|'))
3223 .trim_start_matches(['(', '[', '{', '<'])
3224 .trim_end_matches([',', ';', ':', ')', ']', '}', '>'])
3225 .trim_end_matches('.')
3226 .trim();
3227
3228 if token.is_empty() {
3229 return None;
3230 }
3231 let lower = token.to_ascii_lowercase();
3232 if lower.starts_with("http://") || lower.starts_with("https://") {
3233 return None;
3234 }
3235 if is_malformed_tool_path_token(token) {
3236 return None;
3237 }
3238 if is_root_only_path_token(token) {
3239 return None;
3240 }
3241 if is_placeholder_path_token(token) {
3242 return None;
3243 }
3244
3245 let looks_like_path = token.contains('/') || token.contains('\\');
3246 let has_file_ext = [
3247 ".md", ".txt", ".json", ".yaml", ".yml", ".toml", ".rs", ".ts", ".tsx", ".js", ".jsx",
3248 ".py", ".go", ".java", ".cpp", ".c", ".h", ".pdf", ".docx", ".pptx", ".xlsx", ".rtf",
3249 ]
3250 .iter()
3251 .any(|ext| lower.ends_with(ext));
3252
3253 if !looks_like_path && !has_file_ext {
3254 return None;
3255 }
3256
3257 Some(token.to_string())
3258}
3259
/// Reports whether `token` is empty or is one of the generic placeholder paths that models
/// tend to echo from instructions (compared trimmed and ASCII-case-insensitively).
fn is_placeholder_path_token(token: &str) -> bool {
    const PLACEHOLDERS: &[&str] = &[
        "files/directories",
        "file/directory",
        "relative/or/absolute/path",
        "path/to/file",
        "path/to/your/file",
        "tool/policy",
        "tools/policy",
        "the expected artifact file",
        "workspace/file",
    ];
    let normalized = token.trim().to_ascii_lowercase();
    normalized.is_empty() || PLACEHOLDERS.contains(&normalized.as_str())
}
3278
/// Reports whether `token` cannot be a usable path: it carries leaked tool-call markup
/// (`<tool_call`, `<function=`, …), spans multiple lines, or contains glob wildcards.
fn is_malformed_tool_path_token(token: &str) -> bool {
    const MARKUP_FRAGMENTS: &[&str] = &[
        "<tool_call",
        "</tool_call",
        "<function=",
        "<parameter=",
        "</function>",
        "</parameter>",
    ];
    let lowered = token.to_ascii_lowercase();
    let has_markup = MARKUP_FRAGMENTS
        .iter()
        .any(|fragment| lowered.contains(fragment));
    let has_line_break = token.chars().any(|c| c == '\n' || c == '\r');
    let has_wildcard = token.chars().any(|c| c == '*' || c == '?');
    has_markup || has_line_break || has_wildcard
}
3301
/// Reports whether `token` (after trimming) names only a root or relative anchor rather
/// than a concrete path: empty, `/`, `\`, `.`, `..`, `~`, or a bare drive such as
/// `C:`, `C:\` or `C:/`.
fn is_root_only_path_token(token: &str) -> bool {
    match token.trim().as_bytes() {
        [] | [b'/'] | [b'\\'] | [b'.'] | [b'.', b'.'] | [b'~'] => true,
        [drive, b':'] | [drive, b':', b'\\' | b'/'] => drive.is_ascii_alphabetic(),
        _ => false,
    }
}
3324
/// Strips surrounding backticks, quotes, commas and semicolons from a candidate shell
/// command and returns it, or `None` when nothing is left.
fn sanitize_shell_command_candidate(raw: &str) -> Option<String> {
    const WRAPPERS: [char; 5] = ['`', '"', '\'', ',', ';'];
    let command = raw
        .trim()
        .trim_matches(|c: char| WRAPPERS.contains(&c))
        .trim();
    (!command.is_empty()).then(|| command.to_owned())
}
3335
/// Heuristically decides whether `candidate` reads like a shell command line.
///
/// It matches when the first word (ASCII-lowercased) is a well-known command, starts with
/// `get-` (PowerShell cmdlets) or a `./` / `.\` relative executable, or when the line
/// contains a spaced pipe, `&&`, or `;` operator.
fn looks_like_shell_command(candidate: &str) -> bool {
    const KNOWN_COMMANDS: &[&str] = &[
        "rg", "git", "cargo", "pnpm", "npm", "node", "python", "pytest", "pwsh",
        "powershell", "cmd", "dir", "ls", "cat", "type", "echo", "cd", "mkdir", "cp",
        "copy", "move", "del", "rm",
    ];
    const LEADING_MARKERS: &[&str] = &["get-", "./", ".\\"];
    const OPERATORS: &[&str] = &[" | ", " && ", " ; "];

    let lowered = candidate.to_ascii_lowercase();
    if lowered.is_empty() {
        return false;
    }
    let head = lowered.split_whitespace().next().unwrap_or_default();

    let known_head = KNOWN_COMMANDS.contains(&head);
    let marked_head = LEADING_MARKERS.iter().any(|marker| head.starts_with(marker));
    let chained = OPERATORS.iter().any(|op| lowered.contains(op));
    known_head || marked_head || chained
}
3375
/// Argument keys that presumably may carry a file path in tool-call arguments
/// (camelCase and snake_case spellings). The consumers are outside this view; confirm there.
const FILE_PATH_KEYS: [&str; 10] = [
    "path",
    "file_path",
    "filePath",
    "filepath",
    "filename",
    "file",
    "target",
    "targetFile",
    "absolutePath",
    "uri",
];

/// Argument keys that presumably may carry a shell command line in tool-call arguments.
const SHELL_COMMAND_KEYS: [&str; 4] = ["command", "cmd", "script", "line"];

/// Argument keys that presumably may carry the content body of a write-style tool call.
const WRITE_CONTENT_KEYS: [&str; 8] = [
    "content",
    "text",
    "body",
    "value",
    "markdown",
    "document",
    "output",
    "file_content",
];

/// Wrapper keys under which tool arguments may be nested, e.g. `{"arguments": {...}}`.
/// Presumably searched when the top-level object lacks the expected keys; verify at the use site.
const NESTED_ARGS_KEYS: [&str; 10] = [
    "arguments",
    "args",
    "input",
    "params",
    "payload",
    "data",
    "tool_input",
    "toolInput",
    "tool_args",
    "toolArgs",
];
3414
3415fn tool_signature(tool_name: &str, args: &Value) -> String {
3416 let normalized = normalize_tool_name(tool_name);
3417 if normalized == "websearch" {
3418 let query = extract_websearch_query(args)
3419 .unwrap_or_default()
3420 .to_lowercase();
3421 let limit = args
3422 .get("limit")
3423 .or_else(|| args.get("numResults"))
3424 .or_else(|| args.get("num_results"))
3425 .and_then(|v| v.as_u64())
3426 .unwrap_or(8);
3427 let domains = args
3428 .get("domains")
3429 .or_else(|| args.get("domain"))
3430 .map(|v| v.to_string())
3431 .unwrap_or_default();
3432 let recency = args.get("recency").and_then(|v| v.as_u64()).unwrap_or(0);
3433 return format!("websearch:q={query}|limit={limit}|domains={domains}|recency={recency}");
3434 }
3435 format!("{}:{}", normalized, args)
3436}
3437
/// Returns a 16-hex-digit digest of `input` computed with `DefaultHasher::new()`.
///
/// `DefaultHasher::new()` uses fixed keys, so the digest is reproducible for the same
/// binary. The standard library does not guarantee the algorithm across Rust releases,
/// so avoid persisting these values and comparing them after a toolchain upgrade.
fn stable_hash(input: &str) -> String {
    let mut state = DefaultHasher::new();
    Hash::hash(input, &mut state);
    let digest = state.finish();
    format!("{digest:016x}")
}
3443
3444fn summarize_tool_outputs(outputs: &[String]) -> String {
3445 outputs
3446 .iter()
3447 .take(6)
3448 .map(|output| truncate_text(output, 600))
3449 .collect::<Vec<_>>()
3450 .join("\n\n")
3451}
3452
/// Reports whether a tool's output contains an error message typical of running a command
/// or path meant for a different OS or shell (case-insensitive substring match).
fn is_os_mismatch_tool_output(output: &str) -> bool {
    const MISMATCH_MARKERS: &[&str] = &[
        "os error 3",
        "system cannot find the path specified",
        "command not found",
        "is not recognized as an internal or external command",
        "shell command blocked on windows",
    ];
    let haystack = output.to_ascii_lowercase();
    MISMATCH_MARKERS
        .iter()
        .any(|marker| haystack.contains(marker))
}
3461
3462fn format_context_mode(requested: &ContextMode, auto_compact: bool) -> &'static str {
3463 match requested {
3464 ContextMode::Full => "full",
3465 ContextMode::Compact => "compact",
3466 ContextMode::Auto => {
3467 if auto_compact {
3468 "auto_compact"
3469 } else {
3470 "auto_standard"
3471 }
3472 }
3473 }
3474}
3475
3476fn tandem_runtime_system_prompt(host: &HostRuntimeContext) -> String {
3477 let mut sections = Vec::new();
3478 if os_aware_prompts_enabled() {
3479 sections.push(format!(
3480 "[Execution Environment]\nHost OS: {}\nShell: {}\nPath style: {}\nArchitecture: {}",
3481 host_os_label(host.os),
3482 shell_family_label(host.shell_family),
3483 path_style_label(host.path_style),
3484 host.arch
3485 ));
3486 }
3487 sections.push(
3488 "You are operating inside Tandem (Desktop/TUI) as an engine-backed coding assistant.
3489Use tool calls to inspect and modify the workspace when needed instead of asking the user
3490to manually run basic discovery steps. Permission prompts may occur for some tools; if
3491a tool is denied or blocked, explain what was blocked and suggest a concrete next step."
3492 .to_string(),
3493 );
3494 sections.push(
3495 "For greetings or simple conversational messages (for example: hi, hello, thanks),
3496respond directly without calling tools."
3497 .to_string(),
3498 );
3499 if host.os == HostOs::Windows {
3500 sections.push(
3501 "Windows guidance: prefer cross-platform tools (`glob`, `grep`, `read`, `write`, `edit`) and PowerShell-native commands.
3502Avoid Unix-only shell syntax (`ls -la`, `find ... -type f`, `cat` pipelines) unless translated.
3503If a shell command fails with a path/shell mismatch, immediately switch to cross-platform tools (`read`, `glob`, `grep`)."
3504 .to_string(),
3505 );
3506 } else {
3507 sections.push(
3508 "POSIX guidance: standard shell commands are available.
3509Use cross-platform tools (`glob`, `grep`, `read`) when they are simpler and safer for codebase exploration."
3510 .to_string(),
3511 );
3512 }
3513 sections.join("\n\n")
3514}
3515
/// Reports whether OS-aware prompt sections are enabled.
///
/// Reads `TANDEM_OS_AWARE_PROMPTS`. The feature is on unless the trimmed,
/// ASCII-lowercased value is `0`, `false` or `off`. An unset or non-Unicode variable
/// counts as enabled.
fn os_aware_prompts_enabled() -> bool {
    match std::env::var("TANDEM_OS_AWARE_PROMPTS") {
        Ok(raw) => !matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "0" | "false" | "off"
        ),
        Err(_) => true,
    }
}
3525
3526fn host_os_label(os: HostOs) -> &'static str {
3527 match os {
3528 HostOs::Windows => "windows",
3529 HostOs::Linux => "linux",
3530 HostOs::Macos => "macos",
3531 }
3532}
3533
3534fn shell_family_label(shell: ShellFamily) -> &'static str {
3535 match shell {
3536 ShellFamily::Powershell => "powershell",
3537 ShellFamily::Posix => "posix",
3538 }
3539}
3540
3541fn path_style_label(path_style: PathStyle) -> &'static str {
3542 match path_style {
3543 PathStyle::Windows => "windows",
3544 PathStyle::Posix => "posix",
3545 }
3546}
3547
/// Decides whether to force a workspace probe after the model declined to look around.
///
/// Returns true only when the user's message asks about the project or its files AND the
/// assistant's reply claims it cannot inspect, access or see them. Both checks are
/// case-insensitive substring matches.
fn should_force_workspace_probe(user_text: &str, completion: &str) -> bool {
    const PROJECT_CONTEXT_CUES: &[&str] = &[
        "what is this project",
        "what's this project",
        "what project is this",
        "explain this project",
        "analyze this project",
        "inspect this project",
        "look at the project",
        "summarize this project",
        "show me this project",
        "what files are in",
        "show files",
        "list files",
        "read files",
        "browse files",
        "use glob",
        "run glob",
    ];
    const NO_ACCESS_CLAIMS: &[&str] = &[
        "can't inspect",
        "cannot inspect",
        "unable to inspect",
        "unable to directly inspect",
        "can't access",
        "cannot access",
        "unable to access",
        "can't read files",
        "cannot read files",
        "unable to read files",
        "tool restriction",
        "tool restrictions",
        "don't have visibility",
        "no visibility",
        "haven't been able to inspect",
        "i don't know what this project is",
        "need your help to",
        "sandbox",
        "restriction",
        "system restriction",
        "permissions restrictions",
    ];

    let user = user_text.to_lowercase();
    if !PROJECT_CONTEXT_CUES.iter().any(|cue| user.contains(cue)) {
        return false;
    }
    let reply = completion.to_lowercase();
    NO_ACCESS_CLAIMS.iter().any(|claim| reply.contains(claim))
}
3607
3608fn parse_tool_invocation(input: &str) -> Option<(String, serde_json::Value)> {
3609 let raw = input.trim();
3610 if !raw.starts_with("/tool ") {
3611 return None;
3612 }
3613 let rest = raw.trim_start_matches("/tool ").trim();
3614 let mut split = rest.splitn(2, ' ');
3615 let tool = normalize_tool_name(split.next()?.trim());
3616 let args = split
3617 .next()
3618 .and_then(|v| serde_json::from_str::<serde_json::Value>(v).ok())
3619 .unwrap_or_else(|| json!({}));
3620 Some((tool, args))
3621}
3622
3623fn parse_tool_invocations_from_response(input: &str) -> Vec<(String, serde_json::Value)> {
3624 let trimmed = input.trim();
3625 if trimmed.is_empty() {
3626 return Vec::new();
3627 }
3628
3629 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
3630 if let Some(found) = extract_tool_call_from_value(&parsed) {
3631 return vec![found];
3632 }
3633 }
3634
3635 if let Some(block) = extract_first_json_object(trimmed) {
3636 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&block) {
3637 if let Some(found) = extract_tool_call_from_value(&parsed) {
3638 return vec![found];
3639 }
3640 }
3641 }
3642
3643 parse_function_style_tool_calls(trimmed)
3644}
3645
/// Test helper: returns only the first tool call found by
/// `parse_tool_invocations_from_response`.
#[cfg(test)]
fn parse_tool_invocation_from_response(input: &str) -> Option<(String, serde_json::Value)> {
    let mut calls = parse_tool_invocations_from_response(input);
    if calls.is_empty() {
        None
    } else {
        Some(calls.swap_remove(0))
    }
}
3652
3653fn parse_function_style_tool_calls(input: &str) -> Vec<(String, Value)> {
3654 let mut calls = Vec::new();
3655 let lower = input.to_lowercase();
3656 let names = [
3657 "todo_write",
3658 "todowrite",
3659 "update_todo_list",
3660 "update_todos",
3661 ];
3662 let mut cursor = 0usize;
3663
3664 while cursor < lower.len() {
3665 let mut best: Option<(usize, &str)> = None;
3666 for name in names {
3667 let needle = format!("{name}(");
3668 if let Some(rel_idx) = lower[cursor..].find(&needle) {
3669 let idx = cursor + rel_idx;
3670 if best.as_ref().is_none_or(|(best_idx, _)| idx < *best_idx) {
3671 best = Some((idx, name));
3672 }
3673 }
3674 }
3675
3676 let Some((tool_start, tool_name)) = best else {
3677 break;
3678 };
3679
3680 let open_paren = tool_start + tool_name.len();
3681 if let Some(close_paren) = find_matching_paren(input, open_paren) {
3682 if let Some(args_text) = input.get(open_paren + 1..close_paren) {
3683 let args = parse_function_style_args(args_text.trim());
3684 calls.push((normalize_tool_name(tool_name), Value::Object(args)));
3685 }
3686 cursor = close_paren.saturating_add(1);
3687 } else {
3688 cursor = tool_start.saturating_add(tool_name.len());
3689 }
3690 }
3691
3692 calls
3693}
3694
/// Returns the byte index of the `)` that closes the `(` at byte `open_paren` in `input`.
///
/// Parentheses inside single- or double-quoted strings are ignored, and a backslash
/// escapes the next character only inside a quoted string. Returns `None` when
/// `open_paren` is out of range, is not a char boundary, is not `(`, or the group never
/// closes.
fn find_matching_paren(input: &str, open_paren: usize) -> Option<usize> {
    let tail = input.get(open_paren..)?;
    if !tail.starts_with('(') {
        return None;
    }

    let mut depth = 0usize;
    // The active quote character; single and double quotes cannot be open at the same time.
    let mut quote: Option<char> = None;
    let mut escaped = false;

    for (offset, ch) in tail.char_indices() {
        if escaped {
            escaped = false;
            continue;
        }
        if let Some(active) = quote {
            if ch == '\\' {
                escaped = true;
            } else if ch == active {
                quote = None;
            }
            continue;
        }
        match ch {
            '\'' | '"' => quote = Some(ch),
            '(' => depth += 1,
            ')' => {
                depth = depth.saturating_sub(1);
                if depth == 0 {
                    return Some(open_paren + offset);
                }
            }
            _ => {}
        }
    }

    None
}
3740
3741fn parse_function_style_args(input: &str) -> Map<String, Value> {
3742 let mut args = Map::new();
3743 if input.trim().is_empty() {
3744 return args;
3745 }
3746
3747 let mut parts = Vec::<String>::new();
3748 let mut current = String::new();
3749 let mut in_single = false;
3750 let mut in_double = false;
3751 let mut escaped = false;
3752 let mut depth_paren = 0usize;
3753 let mut depth_bracket = 0usize;
3754 let mut depth_brace = 0usize;
3755
3756 for ch in input.chars() {
3757 if escaped {
3758 current.push(ch);
3759 escaped = false;
3760 continue;
3761 }
3762 if ch == '\\' && (in_single || in_double) {
3763 current.push(ch);
3764 escaped = true;
3765 continue;
3766 }
3767 if ch == '\'' && !in_double {
3768 in_single = !in_single;
3769 current.push(ch);
3770 continue;
3771 }
3772 if ch == '"' && !in_single {
3773 in_double = !in_double;
3774 current.push(ch);
3775 continue;
3776 }
3777 if in_single || in_double {
3778 current.push(ch);
3779 continue;
3780 }
3781
3782 match ch {
3783 '(' => depth_paren += 1,
3784 ')' => depth_paren = depth_paren.saturating_sub(1),
3785 '[' => depth_bracket += 1,
3786 ']' => depth_bracket = depth_bracket.saturating_sub(1),
3787 '{' => depth_brace += 1,
3788 '}' => depth_brace = depth_brace.saturating_sub(1),
3789 ',' if depth_paren == 0 && depth_bracket == 0 && depth_brace == 0 => {
3790 let part = current.trim();
3791 if !part.is_empty() {
3792 parts.push(part.to_string());
3793 }
3794 current.clear();
3795 continue;
3796 }
3797 _ => {}
3798 }
3799 current.push(ch);
3800 }
3801 let tail = current.trim();
3802 if !tail.is_empty() {
3803 parts.push(tail.to_string());
3804 }
3805
3806 for part in parts {
3807 let Some((raw_key, raw_value)) = part
3808 .split_once('=')
3809 .or_else(|| part.split_once(':'))
3810 .map(|(k, v)| (k.trim(), v.trim()))
3811 else {
3812 continue;
3813 };
3814 let key = raw_key.trim_matches(|c| c == '"' || c == '\'' || c == '`');
3815 if key.is_empty() {
3816 continue;
3817 }
3818 let value = parse_scalar_like_value(raw_value);
3819 args.insert(key.to_string(), value);
3820 }
3821
3822 args
3823}
3824
3825fn parse_scalar_like_value(raw: &str) -> Value {
3826 let trimmed = raw.trim();
3827 if trimmed.is_empty() {
3828 return Value::Null;
3829 }
3830
3831 if (trimmed.starts_with('"') && trimmed.ends_with('"'))
3832 || (trimmed.starts_with('\'') && trimmed.ends_with('\''))
3833 {
3834 return Value::String(trimmed[1..trimmed.len().saturating_sub(1)].to_string());
3835 }
3836
3837 if trimmed.eq_ignore_ascii_case("true") {
3838 return Value::Bool(true);
3839 }
3840 if trimmed.eq_ignore_ascii_case("false") {
3841 return Value::Bool(false);
3842 }
3843 if trimmed.eq_ignore_ascii_case("null") {
3844 return Value::Null;
3845 }
3846
3847 if let Ok(v) = serde_json::from_str::<Value>(trimmed) {
3848 return v;
3849 }
3850 if let Ok(v) = trimmed.parse::<i64>() {
3851 return Value::Number(Number::from(v));
3852 }
3853 if let Ok(v) = trimmed.parse::<f64>() {
3854 if let Some(n) = Number::from_f64(v) {
3855 return Value::Number(n);
3856 }
3857 }
3858
3859 Value::String(trimmed.to_string())
3860}
3861
3862fn normalize_todo_write_args(args: Value, completion: &str) -> Value {
3863 if is_todo_status_update_args(&args) {
3864 return args;
3865 }
3866
3867 let mut obj = match args {
3868 Value::Object(map) => map,
3869 Value::Array(items) => {
3870 return json!({ "todos": normalize_todo_arg_items(items) });
3871 }
3872 Value::String(text) => {
3873 let derived = extract_todo_candidates_from_text(&text);
3874 if !derived.is_empty() {
3875 return json!({ "todos": derived });
3876 }
3877 return json!({});
3878 }
3879 _ => return json!({}),
3880 };
3881
3882 if obj
3883 .get("todos")
3884 .and_then(|v| v.as_array())
3885 .map(|arr| !arr.is_empty())
3886 .unwrap_or(false)
3887 {
3888 return Value::Object(obj);
3889 }
3890
3891 for alias in ["tasks", "items", "list", "checklist"] {
3892 if let Some(items) = obj.get(alias).and_then(|v| v.as_array()) {
3893 let normalized = normalize_todo_arg_items(items.clone());
3894 if !normalized.is_empty() {
3895 obj.insert("todos".to_string(), Value::Array(normalized));
3896 return Value::Object(obj);
3897 }
3898 }
3899 }
3900
3901 let derived = extract_todo_candidates_from_text(completion);
3902 if !derived.is_empty() {
3903 obj.insert("todos".to_string(), Value::Array(derived));
3904 }
3905 Value::Object(obj)
3906}
3907
3908fn normalize_todo_arg_items(items: Vec<Value>) -> Vec<Value> {
3909 items
3910 .into_iter()
3911 .filter_map(|item| match item {
3912 Value::String(text) => {
3913 let content = text.trim();
3914 if content.is_empty() {
3915 None
3916 } else {
3917 Some(json!({"content": content}))
3918 }
3919 }
3920 Value::Object(mut obj) => {
3921 if !obj.contains_key("content") {
3922 if let Some(text) = obj.get("text").cloned() {
3923 obj.insert("content".to_string(), text);
3924 } else if let Some(title) = obj.get("title").cloned() {
3925 obj.insert("content".to_string(), title);
3926 } else if let Some(name) = obj.get("name").cloned() {
3927 obj.insert("content".to_string(), name);
3928 }
3929 }
3930 let content = obj
3931 .get("content")
3932 .and_then(|v| v.as_str())
3933 .map(str::trim)
3934 .unwrap_or("");
3935 if content.is_empty() {
3936 None
3937 } else {
3938 Some(Value::Object(obj))
3939 }
3940 }
3941 _ => None,
3942 })
3943 .collect()
3944}
3945
3946fn is_todo_status_update_args(args: &Value) -> bool {
3947 let Some(obj) = args.as_object() else {
3948 return false;
3949 };
3950 let has_status = obj
3951 .get("status")
3952 .and_then(|v| v.as_str())
3953 .map(|s| !s.trim().is_empty())
3954 .unwrap_or(false);
3955 let has_target =
3956 obj.get("task_id").is_some() || obj.get("todo_id").is_some() || obj.get("id").is_some();
3957 has_status && has_target
3958}
3959
3960fn is_empty_todo_write_args(args: &Value) -> bool {
3961 if is_todo_status_update_args(args) {
3962 return false;
3963 }
3964 let Some(obj) = args.as_object() else {
3965 return true;
3966 };
3967 !obj.get("todos")
3968 .and_then(|v| v.as_array())
3969 .map(|arr| !arr.is_empty())
3970 .unwrap_or(false)
3971}
3972
3973fn parse_streamed_tool_args(tool_name: &str, raw_args: &str) -> Value {
3974 let trimmed = raw_args.trim();
3975 if trimmed.is_empty() {
3976 return json!({});
3977 }
3978
3979 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
3980 return normalize_streamed_tool_args(tool_name, parsed, trimmed);
3981 }
3982
3983 let kv_args = parse_function_style_args(trimmed);
3986 if !kv_args.is_empty() {
3987 return normalize_streamed_tool_args(tool_name, Value::Object(kv_args), trimmed);
3988 }
3989
3990 if normalize_tool_name(tool_name) == "websearch" {
3991 if let Some(query) = sanitize_websearch_query_candidate(trimmed) {
3992 return json!({ "query": query });
3993 }
3994 return json!({});
3995 }
3996
3997 json!({})
3998}
3999
4000fn normalize_streamed_tool_args(tool_name: &str, parsed: Value, raw: &str) -> Value {
4001 let normalized_tool = normalize_tool_name(tool_name);
4002 if normalized_tool != "websearch" {
4003 return parsed;
4004 }
4005
4006 match parsed {
4007 Value::Object(mut obj) => {
4008 if !has_websearch_query(&obj) && !raw.trim().is_empty() {
4009 if let Some(query) = sanitize_websearch_query_candidate(raw) {
4010 obj.insert("query".to_string(), Value::String(query));
4011 }
4012 }
4013 Value::Object(obj)
4014 }
4015 Value::String(s) => match sanitize_websearch_query_candidate(&s) {
4016 Some(query) => json!({ "query": query }),
4017 None => json!({}),
4018 },
4019 other => other,
4020 }
4021}
4022
4023fn has_websearch_query(obj: &Map<String, Value>) -> bool {
4024 const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
4025 QUERY_KEYS.iter().any(|key| {
4026 obj.get(*key)
4027 .and_then(|v| v.as_str())
4028 .map(|s| !s.trim().is_empty())
4029 .unwrap_or(false)
4030 })
4031}
4032
4033fn extract_tool_call_from_value(value: &Value) -> Option<(String, Value)> {
4034 if let Some(obj) = value.as_object() {
4035 if let Some(tool) = obj.get("tool").and_then(|v| v.as_str()) {
4036 return Some((
4037 normalize_tool_name(tool),
4038 obj.get("args").cloned().unwrap_or_else(|| json!({})),
4039 ));
4040 }
4041
4042 if let Some(tool) = obj.get("name").and_then(|v| v.as_str()) {
4043 let args = obj
4044 .get("args")
4045 .cloned()
4046 .or_else(|| obj.get("arguments").cloned())
4047 .unwrap_or_else(|| json!({}));
4048 let normalized_tool = normalize_tool_name(tool);
4049 let args = if let Some(raw) = args.as_str() {
4050 parse_streamed_tool_args(&normalized_tool, raw)
4051 } else {
4052 args
4053 };
4054 return Some((normalized_tool, args));
4055 }
4056
4057 for key in [
4058 "tool_call",
4059 "toolCall",
4060 "call",
4061 "function_call",
4062 "functionCall",
4063 ] {
4064 if let Some(nested) = obj.get(key) {
4065 if let Some(found) = extract_tool_call_from_value(nested) {
4066 return Some(found);
4067 }
4068 }
4069 }
4070
4071 if let Some(calls) = obj.get("tool_calls").and_then(|v| v.as_array()) {
4072 for call in calls {
4073 if let Some(found) = extract_tool_call_from_value(call) {
4074 return Some(found);
4075 }
4076 }
4077 }
4078 }
4079
4080 if let Some(items) = value.as_array() {
4081 for item in items {
4082 if let Some(found) = extract_tool_call_from_value(item) {
4083 return Some(found);
4084 }
4085 }
4086 }
4087
4088 None
4089}
4090
/// Returns the first balanced `{...}` block in `input`, or `None` if no block closes.
///
/// Once inside an object, braces within double-quoted JSON strings (including escaped
/// quotes) are ignored, so `{"a": "}"}` is returned whole instead of being truncated at
/// the inner `}`. Quotes in prose before the first `{` do not affect the scan, and stray
/// `}` characters outside any object are skipped.
fn extract_first_json_object(input: &str) -> Option<String> {
    let mut start: Option<usize> = None;
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;

    for (idx, ch) in input.char_indices() {
        if in_string {
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            // Track strings only inside an object; prose quotes must not hide the object.
            '"' if depth > 0 => in_string = true,
            '{' => {
                if depth == 0 {
                    start = Some(idx);
                }
                depth += 1;
            }
            '}' if depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    let begin = start?;
                    return input.get(begin..=idx).map(str::to_string);
                }
            }
            _ => {}
        }
    }
    None
}
4114
4115fn extract_todo_candidates_from_text(input: &str) -> Vec<Value> {
4116 let mut seen = HashSet::<String>::new();
4117 let mut todos = Vec::new();
4118
4119 for raw_line in input.lines() {
4120 let mut line = raw_line.trim();
4121 let mut structured_line = false;
4122 if line.is_empty() {
4123 continue;
4124 }
4125 if line.starts_with("```") {
4126 continue;
4127 }
4128 if line.ends_with(':') {
4129 continue;
4130 }
4131 if let Some(rest) = line
4132 .strip_prefix("- [ ]")
4133 .or_else(|| line.strip_prefix("* [ ]"))
4134 .or_else(|| line.strip_prefix("- [x]"))
4135 .or_else(|| line.strip_prefix("* [x]"))
4136 {
4137 line = rest.trim();
4138 structured_line = true;
4139 } else if let Some(rest) = line.strip_prefix("- ").or_else(|| line.strip_prefix("* ")) {
4140 line = rest.trim();
4141 structured_line = true;
4142 } else {
4143 let bytes = line.as_bytes();
4144 let mut i = 0usize;
4145 while i < bytes.len() && bytes[i].is_ascii_digit() {
4146 i += 1;
4147 }
4148 if i > 0 && i + 1 < bytes.len() && (bytes[i] == b'.' || bytes[i] == b')') {
4149 line = line[i + 1..].trim();
4150 structured_line = true;
4151 }
4152 }
4153 if !structured_line {
4154 continue;
4155 }
4156
4157 let content = line.trim_matches(|c: char| c.is_whitespace() || c == '-' || c == '*');
4158 if content.len() < 5 || content.len() > 180 {
4159 continue;
4160 }
4161 let key = content.to_lowercase();
4162 if seen.contains(&key) {
4163 continue;
4164 }
4165 seen.insert(key);
4166 todos.push(json!({ "content": content }));
4167 if todos.len() >= 25 {
4168 break;
4169 }
4170 }
4171
4172 todos
4173}
4174
4175async fn emit_plan_todo_fallback(
4176 storage: std::sync::Arc<Storage>,
4177 bus: &EventBus,
4178 session_id: &str,
4179 message_id: &str,
4180 completion: &str,
4181) {
4182 let todos = extract_todo_candidates_from_text(completion);
4183 if todos.is_empty() {
4184 return;
4185 }
4186
4187 let invoke_part = WireMessagePart::tool_invocation(
4188 session_id,
4189 message_id,
4190 "todo_write",
4191 json!({"todos": todos.clone()}),
4192 );
4193 let call_id = invoke_part.id.clone();
4194 bus.publish(EngineEvent::new(
4195 "message.part.updated",
4196 json!({"part": invoke_part}),
4197 ));
4198
4199 if storage.set_todos(session_id, todos).await.is_err() {
4200 let mut failed_part =
4201 WireMessagePart::tool_result(session_id, message_id, "todo_write", json!(null));
4202 failed_part.id = call_id;
4203 failed_part.state = Some("failed".to_string());
4204 failed_part.error = Some("failed to persist plan todos".to_string());
4205 bus.publish(EngineEvent::new(
4206 "message.part.updated",
4207 json!({"part": failed_part}),
4208 ));
4209 return;
4210 }
4211
4212 let normalized = storage.get_todos(session_id).await;
4213 let mut result_part = WireMessagePart::tool_result(
4214 session_id,
4215 message_id,
4216 "todo_write",
4217 json!({ "todos": normalized }),
4218 );
4219 result_part.id = call_id;
4220 bus.publish(EngineEvent::new(
4221 "message.part.updated",
4222 json!({"part": result_part}),
4223 ));
4224 bus.publish(EngineEvent::new(
4225 "todo.updated",
4226 json!({
4227 "sessionID": session_id,
4228 "todos": normalized
4229 }),
4230 ));
4231}
4232
4233async fn emit_plan_question_fallback(
4234 storage: std::sync::Arc<Storage>,
4235 bus: &EventBus,
4236 session_id: &str,
4237 message_id: &str,
4238 completion: &str,
4239) {
4240 let trimmed = completion.trim();
4241 if trimmed.is_empty() {
4242 return;
4243 }
4244
4245 let hints = extract_todo_candidates_from_text(trimmed)
4246 .into_iter()
4247 .take(6)
4248 .filter_map(|v| {
4249 v.get("content")
4250 .and_then(|c| c.as_str())
4251 .map(ToString::to_string)
4252 })
4253 .collect::<Vec<_>>();
4254
4255 let mut options = hints
4256 .iter()
4257 .map(|label| json!({"label": label, "description": "Use this as a starting task"}))
4258 .collect::<Vec<_>>();
4259 if options.is_empty() {
4260 options = vec![
4261 json!({"label":"Define scope", "description":"Clarify the intended outcome"}),
4262 json!({"label":"Provide constraints", "description":"Budget, timeline, and constraints"}),
4263 json!({"label":"Draft a starter list", "description":"Generate a first-pass task list"}),
4264 ];
4265 }
4266
4267 let question_payload = vec![json!({
4268 "header":"Planning Input",
4269 "question":"I couldn't produce a concrete task list yet. Which tasks should I include first?",
4270 "options": options,
4271 "multiple": true,
4272 "custom": true
4273 })];
4274
4275 let request = storage
4276 .add_question_request(session_id, message_id, question_payload.clone())
4277 .await
4278 .ok();
4279 bus.publish(EngineEvent::new(
4280 "question.asked",
4281 json!({
4282 "id": request
4283 .as_ref()
4284 .map(|req| req.id.clone())
4285 .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
4286 "sessionID": session_id,
4287 "messageID": message_id,
4288 "questions": question_payload,
4289 "tool": request.and_then(|req| {
4290 req.tool.map(|tool| {
4291 json!({
4292 "callID": tool.call_id,
4293 "messageID": tool.message_id
4294 })
4295 })
4296 })
4297 }),
4298 ));
4299}
4300
/// History-compaction profile that `load_chat_history` forwards to `compact_chat_history`.
/// The exact trimming each variant applies lives in `compact_chat_history`, which is outside
/// this view.
#[derive(Debug, Clone, Copy)]
enum ChatHistoryProfile {
    /// Presumably keeps the most history; confirm against `compact_chat_history`.
    Full,
    /// Presumably the default middle ground; confirm against `compact_chat_history`.
    Standard,
    /// Presumably trims history most aggressively; confirm against `compact_chat_history`.
    Compact,
}
4307
4308async fn load_chat_history(
4309 storage: std::sync::Arc<Storage>,
4310 session_id: &str,
4311 profile: ChatHistoryProfile,
4312) -> Vec<ChatMessage> {
4313 let Some(session) = storage.get_session(session_id).await else {
4314 return Vec::new();
4315 };
4316 let messages = session
4317 .messages
4318 .into_iter()
4319 .map(|m| {
4320 let role = format!("{:?}", m.role).to_lowercase();
4321 let content = m
4322 .parts
4323 .into_iter()
4324 .map(|part| match part {
4325 MessagePart::Text { text } => text,
4326 MessagePart::Reasoning { text } => text,
4327 MessagePart::ToolInvocation { tool, result, .. } => {
4328 format!("Tool {tool} => {}", result.unwrap_or_else(|| json!({})))
4329 }
4330 })
4331 .collect::<Vec<_>>()
4332 .join("\n");
4333 ChatMessage {
4334 role,
4335 content,
4336 attachments: Vec::new(),
4337 }
4338 })
4339 .collect::<Vec<_>>();
4340 compact_chat_history(messages, profile)
4341}
4342
4343fn attach_to_last_user_message(messages: &mut [ChatMessage], attachments: &[ChatAttachment]) {
4344 if attachments.is_empty() {
4345 return;
4346 }
4347 if let Some(message) = messages.iter_mut().rev().find(|m| m.role == "user") {
4348 message.attachments = attachments.to_vec();
4349 }
4350}
4351
4352async fn build_runtime_attachments(
4353 provider_id: &str,
4354 parts: &[MessagePartInput],
4355) -> Vec<ChatAttachment> {
4356 if !supports_image_attachments(provider_id) {
4357 return Vec::new();
4358 }
4359
4360 let mut attachments = Vec::new();
4361 for part in parts {
4362 let MessagePartInput::File { mime, url, .. } = part else {
4363 continue;
4364 };
4365 if !mime.to_ascii_lowercase().starts_with("image/") {
4366 continue;
4367 }
4368 if let Some(source_url) = normalize_attachment_source_url(url, mime).await {
4369 attachments.push(ChatAttachment::ImageUrl { url: source_url });
4370 }
4371 }
4372
4373 attachments
4374}
4375
/// Reports whether image attachments are forwarded for `provider_id`. The match is exact
/// and case-sensitive against a fixed allow-list.
fn supports_image_attachments(provider_id: &str) -> bool {
    const IMAGE_CAPABLE_PROVIDERS: &[&str] = &[
        "openai",
        "openrouter",
        "ollama",
        "groq",
        "mistral",
        "together",
        "azure",
        "bedrock",
        "vertex",
        "copilot",
    ];
    IMAGE_CAPABLE_PROVIDERS.contains(&provider_id)
}
4391
4392async fn normalize_attachment_source_url(url: &str, mime: &str) -> Option<String> {
4393 let trimmed = url.trim();
4394 if trimmed.is_empty() {
4395 return None;
4396 }
4397 if trimmed.starts_with("http://")
4398 || trimmed.starts_with("https://")
4399 || trimmed.starts_with("data:")
4400 {
4401 return Some(trimmed.to_string());
4402 }
4403
4404 let file_path = trimmed
4405 .strip_prefix("file://")
4406 .map(PathBuf::from)
4407 .unwrap_or_else(|| PathBuf::from(trimmed));
4408 if !file_path.exists() {
4409 return None;
4410 }
4411
4412 let max_bytes = std::env::var("TANDEM_CHANNEL_MAX_ATTACHMENT_BYTES")
4413 .ok()
4414 .and_then(|v| v.parse::<usize>().ok())
4415 .unwrap_or(20 * 1024 * 1024);
4416
4417 let bytes = match tokio::fs::read(&file_path).await {
4418 Ok(bytes) => bytes,
4419 Err(err) => {
4420 tracing::warn!(
4421 "failed reading local attachment '{}': {}",
4422 file_path.to_string_lossy(),
4423 err
4424 );
4425 return None;
4426 }
4427 };
4428 if bytes.len() > max_bytes {
4429 tracing::warn!(
4430 "local attachment '{}' exceeds max bytes ({} > {})",
4431 file_path.to_string_lossy(),
4432 bytes.len(),
4433 max_bytes
4434 );
4435 return None;
4436 }
4437
4438 use base64::Engine as _;
4439 let b64 = base64::engine::general_purpose::STANDARD.encode(bytes);
4440 Some(format!("data:{mime};base64,{b64}"))
4441}
4442
4443struct ToolSideEventContext<'a> {
4444 session_id: &'a str,
4445 message_id: &'a str,
4446 tool: &'a str,
4447 args: &'a serde_json::Value,
4448 metadata: &'a serde_json::Value,
4449 workspace_root: Option<&'a str>,
4450 effective_cwd: Option<&'a str>,
4451}
4452
4453async fn emit_tool_side_events(
4454 storage: std::sync::Arc<Storage>,
4455 bus: &EventBus,
4456 ctx: ToolSideEventContext<'_>,
4457) {
4458 let ToolSideEventContext {
4459 session_id,
4460 message_id,
4461 tool,
4462 args,
4463 metadata,
4464 workspace_root,
4465 effective_cwd,
4466 } = ctx;
4467 if tool == "todo_write" {
4468 let todos_from_metadata = metadata
4469 .get("todos")
4470 .and_then(|v| v.as_array())
4471 .cloned()
4472 .unwrap_or_default();
4473
4474 if !todos_from_metadata.is_empty() {
4475 let _ = storage.set_todos(session_id, todos_from_metadata).await;
4476 } else {
4477 let current = storage.get_todos(session_id).await;
4478 if let Some(updated) = apply_todo_updates_from_args(current, args) {
4479 let _ = storage.set_todos(session_id, updated).await;
4480 }
4481 }
4482
4483 let normalized = storage.get_todos(session_id).await;
4484 bus.publish(EngineEvent::new(
4485 "todo.updated",
4486 json!({
4487 "sessionID": session_id,
4488 "todos": normalized,
4489 "workspaceRoot": workspace_root,
4490 "effectiveCwd": effective_cwd
4491 }),
4492 ));
4493 }
4494 if tool == "question" {
4495 let questions = metadata
4496 .get("questions")
4497 .and_then(|v| v.as_array())
4498 .cloned()
4499 .unwrap_or_default();
4500 if questions.is_empty() {
4501 tracing::warn!(
4502 "question tool produced empty questions payload; skipping question.asked event session_id={} message_id={}",
4503 session_id,
4504 message_id
4505 );
4506 } else {
4507 let request = storage
4508 .add_question_request(session_id, message_id, questions.clone())
4509 .await
4510 .ok();
4511 bus.publish(EngineEvent::new(
4512 "question.asked",
4513 json!({
4514 "id": request
4515 .as_ref()
4516 .map(|req| req.id.clone())
4517 .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
4518 "sessionID": session_id,
4519 "messageID": message_id,
4520 "questions": questions,
4521 "tool": request.and_then(|req| {
4522 req.tool.map(|tool| {
4523 json!({
4524 "callID": tool.call_id,
4525 "messageID": tool.message_id
4526 })
4527 })
4528 }),
4529 "workspaceRoot": workspace_root,
4530 "effectiveCwd": effective_cwd
4531 }),
4532 ));
4533 }
4534 }
4535 if let Some(events) = metadata.get("events").and_then(|v| v.as_array()) {
4536 for event in events {
4537 let Some(event_type) = event.get("type").and_then(|v| v.as_str()) else {
4538 continue;
4539 };
4540 if !event_type.starts_with("agent_team.") {
4541 continue;
4542 }
4543 let mut properties = event
4544 .get("properties")
4545 .and_then(|v| v.as_object())
4546 .cloned()
4547 .unwrap_or_default();
4548 properties
4549 .entry("sessionID".to_string())
4550 .or_insert(json!(session_id));
4551 properties
4552 .entry("messageID".to_string())
4553 .or_insert(json!(message_id));
4554 properties
4555 .entry("workspaceRoot".to_string())
4556 .or_insert(json!(workspace_root));
4557 properties
4558 .entry("effectiveCwd".to_string())
4559 .or_insert(json!(effective_cwd));
4560 bus.publish(EngineEvent::new(event_type, Value::Object(properties)));
4561 }
4562 }
4563}
4564
4565fn apply_todo_updates_from_args(current: Vec<Value>, args: &Value) -> Option<Vec<Value>> {
4566 let obj = args.as_object()?;
4567 let mut todos = current;
4568 let mut changed = false;
4569
4570 if let Some(items) = obj.get("todos").and_then(|v| v.as_array()) {
4571 for item in items {
4572 let Some(item_obj) = item.as_object() else {
4573 continue;
4574 };
4575 let status = item_obj
4576 .get("status")
4577 .and_then(|v| v.as_str())
4578 .map(normalize_todo_status);
4579 let target = item_obj
4580 .get("task_id")
4581 .or_else(|| item_obj.get("todo_id"))
4582 .or_else(|| item_obj.get("id"));
4583
4584 if let (Some(status), Some(target)) = (status, target) {
4585 changed |= apply_single_todo_status_update(&mut todos, target, &status);
4586 }
4587 }
4588 }
4589
4590 let status = obj
4591 .get("status")
4592 .and_then(|v| v.as_str())
4593 .map(normalize_todo_status);
4594 let target = obj
4595 .get("task_id")
4596 .or_else(|| obj.get("todo_id"))
4597 .or_else(|| obj.get("id"));
4598 if let (Some(status), Some(target)) = (status, target) {
4599 changed |= apply_single_todo_status_update(&mut todos, target, &status);
4600 }
4601
4602 if changed {
4603 Some(todos)
4604 } else {
4605 None
4606 }
4607}
4608
4609fn apply_single_todo_status_update(todos: &mut [Value], target: &Value, status: &str) -> bool {
4610 let idx_from_value = match target {
4611 Value::Number(n) => n.as_u64().map(|v| v.saturating_sub(1) as usize),
4612 Value::String(s) => {
4613 let trimmed = s.trim();
4614 trimmed
4615 .parse::<usize>()
4616 .ok()
4617 .map(|v| v.saturating_sub(1))
4618 .or_else(|| {
4619 let digits = trimmed
4620 .chars()
4621 .rev()
4622 .take_while(|c| c.is_ascii_digit())
4623 .collect::<String>()
4624 .chars()
4625 .rev()
4626 .collect::<String>();
4627 digits.parse::<usize>().ok().map(|v| v.saturating_sub(1))
4628 })
4629 }
4630 _ => None,
4631 };
4632
4633 if let Some(idx) = idx_from_value {
4634 if idx < todos.len() {
4635 if let Some(obj) = todos[idx].as_object_mut() {
4636 obj.insert("status".to_string(), Value::String(status.to_string()));
4637 return true;
4638 }
4639 }
4640 }
4641
4642 let id_target = target.as_str().map(|s| s.trim()).filter(|s| !s.is_empty());
4643 if let Some(id_target) = id_target {
4644 for todo in todos.iter_mut() {
4645 if let Some(obj) = todo.as_object_mut() {
4646 if obj.get("id").and_then(|v| v.as_str()) == Some(id_target) {
4647 obj.insert("status".to_string(), Value::String(status.to_string()));
4648 return true;
4649 }
4650 }
4651 }
4652 }
4653
4654 false
4655}
4656
/// Maps free-form todo status strings onto the canonical set
/// `pending` / `in_progress` / `completed` / `cancelled`.
///
/// Matching rules:
/// * case-insensitive, with surrounding whitespace ignored;
/// * `-` and ` ` are treated like `_`, so `In-Progress` and `in progress` both map to
///   `in_progress`.
///
/// Unrecognized values are returned trimmed and lowercased, with their original
/// separators kept.
fn normalize_todo_status(raw: &str) -> String {
    let lowered = raw.trim().to_lowercase();
    // Separator-insensitive key used only for matching; passthrough keeps `lowered`.
    let key: String = lowered
        .chars()
        .map(|c| if c == '-' || c == ' ' { '_' } else { c })
        .collect();
    let canonical = match key.as_str() {
        "in_progress" | "inprogress" | "running" | "working" => "in_progress",
        "done" | "complete" | "completed" => "completed",
        "cancelled" | "canceled" | "aborted" | "skipped" => "cancelled",
        "open" | "todo" | "to_do" | "pending" | "not_started" => "pending",
        _ => return lowered,
    };
    canonical.to_string()
}
4666
4667fn compact_chat_history(
4668 messages: Vec<ChatMessage>,
4669 profile: ChatHistoryProfile,
4670) -> Vec<ChatMessage> {
4671 let (max_context_chars, keep_recent_messages) = match profile {
4672 ChatHistoryProfile::Full => (usize::MAX, usize::MAX),
4673 ChatHistoryProfile::Standard => (80_000usize, 40usize),
4674 ChatHistoryProfile::Compact => (12_000usize, 12usize),
4675 };
4676
4677 if messages.len() <= keep_recent_messages {
4678 let total_chars = messages.iter().map(|m| m.content.len()).sum::<usize>();
4679 if total_chars <= max_context_chars {
4680 return messages;
4681 }
4682 }
4683
4684 let mut kept = messages;
4685 let mut dropped_count = 0usize;
4686 let mut total_chars = kept.iter().map(|m| m.content.len()).sum::<usize>();
4687
4688 while kept.len() > keep_recent_messages || total_chars > max_context_chars {
4689 if kept.is_empty() {
4690 break;
4691 }
4692 let removed = kept.remove(0);
4693 total_chars = total_chars.saturating_sub(removed.content.len());
4694 dropped_count += 1;
4695 }
4696
4697 if dropped_count > 0 {
4698 kept.insert(
4699 0,
4700 ChatMessage {
4701 role: "system".to_string(),
4702 content: format!(
4703 "[history compacted: omitted {} older messages to fit context window]",
4704 dropped_count
4705 ),
4706 attachments: Vec::new(),
4707 },
4708 );
4709 }
4710 kept
4711}
4712
4713#[cfg(test)]
4714mod tests {
4715 use super::*;
4716 use crate::{EventBus, Storage};
4717 use uuid::Uuid;
4718
4719 #[tokio::test]
4720 async fn todo_updated_event_is_normalized() {
4721 let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
4722 let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
4723 let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
4724 let session_id = session.id.clone();
4725 storage.save_session(session).await.expect("save session");
4726
4727 let bus = EventBus::new();
4728 let mut rx = bus.subscribe();
4729 emit_tool_side_events(
4730 storage.clone(),
4731 &bus,
4732 ToolSideEventContext {
4733 session_id: &session_id,
4734 message_id: "m1",
4735 tool: "todo_write",
4736 args: &json!({"todos":[{"content":"ship parity"}]}),
4737 metadata: &json!({"todos":[{"content":"ship parity"}]}),
4738 workspace_root: Some("."),
4739 effective_cwd: Some("."),
4740 },
4741 )
4742 .await;
4743
4744 let event = rx.recv().await.expect("event");
4745 assert_eq!(event.event_type, "todo.updated");
4746 let todos = event
4747 .properties
4748 .get("todos")
4749 .and_then(|v| v.as_array())
4750 .cloned()
4751 .unwrap_or_default();
4752 assert_eq!(todos.len(), 1);
4753 assert!(todos[0].get("id").and_then(|v| v.as_str()).is_some());
4754 assert_eq!(
4755 todos[0].get("content").and_then(|v| v.as_str()),
4756 Some("ship parity")
4757 );
4758 assert!(todos[0].get("status").and_then(|v| v.as_str()).is_some());
4759 }
4760
4761 #[tokio::test]
4762 async fn question_asked_event_contains_tool_reference() {
4763 let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
4764 let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
4765 let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
4766 let session_id = session.id.clone();
4767 storage.save_session(session).await.expect("save session");
4768
4769 let bus = EventBus::new();
4770 let mut rx = bus.subscribe();
4771 emit_tool_side_events(
4772 storage,
4773 &bus,
4774 ToolSideEventContext {
4775 session_id: &session_id,
4776 message_id: "msg-1",
4777 tool: "question",
4778 args: &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
4779 metadata: &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
4780 workspace_root: Some("."),
4781 effective_cwd: Some("."),
4782 },
4783 )
4784 .await;
4785
4786 let event = rx.recv().await.expect("event");
4787 assert_eq!(event.event_type, "question.asked");
4788 assert_eq!(
4789 event
4790 .properties
4791 .get("sessionID")
4792 .and_then(|v| v.as_str())
4793 .unwrap_or(""),
4794 session_id
4795 );
4796 let tool = event
4797 .properties
4798 .get("tool")
4799 .cloned()
4800 .unwrap_or_else(|| json!({}));
4801 assert!(tool.get("callID").and_then(|v| v.as_str()).is_some());
4802 assert_eq!(
4803 tool.get("messageID").and_then(|v| v.as_str()),
4804 Some("msg-1")
4805 );
4806 }
4807
4808 #[test]
4809 fn compact_chat_history_keeps_recent_and_inserts_summary() {
4810 let mut messages = Vec::new();
4811 for i in 0..60 {
4812 messages.push(ChatMessage {
4813 role: "user".to_string(),
4814 content: format!("message-{i}"),
4815 attachments: Vec::new(),
4816 });
4817 }
4818 let compacted = compact_chat_history(messages, ChatHistoryProfile::Standard);
4819 assert!(compacted.len() <= 41);
4820 assert_eq!(compacted[0].role, "system");
4821 assert!(compacted[0].content.contains("history compacted"));
4822 assert!(compacted.iter().any(|m| m.content.contains("message-59")));
4823 }
4824
4825 #[test]
4826 fn extracts_todos_from_checklist_and_numbered_lines() {
4827 let input = r#"
4828Plan:
4829- [ ] Audit current implementation
4830- [ ] Add planner fallback
48311. Add regression test coverage
4832"#;
4833 let todos = extract_todo_candidates_from_text(input);
4834 assert_eq!(todos.len(), 3);
4835 assert_eq!(
4836 todos[0].get("content").and_then(|v| v.as_str()),
4837 Some("Audit current implementation")
4838 );
4839 }
4840
4841 #[test]
4842 fn does_not_extract_todos_from_plain_prose_lines() {
4843 let input = r#"
4844I need more information to proceed.
4845Can you tell me the event size and budget?
4846Once I have that, I can provide a detailed plan.
4847"#;
4848 let todos = extract_todo_candidates_from_text(input);
4849 assert!(todos.is_empty());
4850 }
4851
4852 #[test]
4853 fn parses_wrapped_tool_call_from_markdown_response() {
4854 let input = r#"
4855Here is the tool call:
4856```json
4857{"tool_call":{"name":"todo_write","arguments":{"todos":[{"content":"a"}]}}}
4858```
4859"#;
4860 let parsed = parse_tool_invocation_from_response(input).expect("tool call");
4861 assert_eq!(parsed.0, "todo_write");
4862 assert!(parsed.1.get("todos").is_some());
4863 }
4864
4865 #[test]
4866 fn parses_top_level_name_args_tool_call() {
4867 let input = r#"{"name":"bash","args":{"command":"echo hi"}}"#;
4868 let parsed = parse_tool_invocation_from_response(input).expect("top-level tool call");
4869 assert_eq!(parsed.0, "bash");
4870 assert_eq!(
4871 parsed.1.get("command").and_then(|v| v.as_str()),
4872 Some("echo hi")
4873 );
4874 }
4875
4876 #[test]
4877 fn parses_function_style_todowrite_call() {
4878 let input = r#"Status: Completed
4879Call: todowrite(task_id=2, status="completed")"#;
4880 let parsed = parse_tool_invocation_from_response(input).expect("function-style tool call");
4881 assert_eq!(parsed.0, "todo_write");
4882 assert_eq!(parsed.1.get("task_id").and_then(|v| v.as_i64()), Some(2));
4883 assert_eq!(
4884 parsed.1.get("status").and_then(|v| v.as_str()),
4885 Some("completed")
4886 );
4887 }
4888
4889 #[test]
4890 fn parses_multiple_function_style_todowrite_calls() {
4891 let input = r#"
4892Call: todowrite(task_id=2, status="completed")
4893Call: todowrite(task_id=3, status="in_progress")
4894"#;
4895 let parsed = parse_tool_invocations_from_response(input);
4896 assert_eq!(parsed.len(), 2);
4897 assert_eq!(parsed[0].0, "todo_write");
4898 assert_eq!(parsed[0].1.get("task_id").and_then(|v| v.as_i64()), Some(2));
4899 assert_eq!(
4900 parsed[0].1.get("status").and_then(|v| v.as_str()),
4901 Some("completed")
4902 );
4903 assert_eq!(parsed[1].1.get("task_id").and_then(|v| v.as_i64()), Some(3));
4904 assert_eq!(
4905 parsed[1].1.get("status").and_then(|v| v.as_str()),
4906 Some("in_progress")
4907 );
4908 }
4909
4910 #[test]
4911 fn applies_todo_status_update_from_task_id_args() {
4912 let current = vec![
4913 json!({"id":"todo-1","content":"a","status":"pending"}),
4914 json!({"id":"todo-2","content":"b","status":"pending"}),
4915 json!({"id":"todo-3","content":"c","status":"pending"}),
4916 ];
4917 let updated =
4918 apply_todo_updates_from_args(current, &json!({"task_id":2, "status":"completed"}))
4919 .expect("status update");
4920 assert_eq!(
4921 updated[1].get("status").and_then(|v| v.as_str()),
4922 Some("completed")
4923 );
4924 }
4925
4926 #[test]
4927 fn normalizes_todo_write_tasks_alias() {
4928 let normalized = normalize_todo_write_args(
4929 json!({"tasks":[{"title":"Book venue"},{"name":"Send invites"}]}),
4930 "",
4931 );
4932 let todos = normalized
4933 .get("todos")
4934 .and_then(|v| v.as_array())
4935 .cloned()
4936 .unwrap_or_default();
4937 assert_eq!(todos.len(), 2);
4938 assert_eq!(
4939 todos[0].get("content").and_then(|v| v.as_str()),
4940 Some("Book venue")
4941 );
4942 assert_eq!(
4943 todos[1].get("content").and_then(|v| v.as_str()),
4944 Some("Send invites")
4945 );
4946 }
4947
4948 #[test]
4949 fn normalizes_todo_write_from_completion_when_args_empty() {
4950 let completion = "Plan:\n1. Secure venue\n2. Create playlist\n3. Send invites";
4951 let normalized = normalize_todo_write_args(json!({}), completion);
4952 let todos = normalized
4953 .get("todos")
4954 .and_then(|v| v.as_array())
4955 .cloned()
4956 .unwrap_or_default();
4957 assert_eq!(todos.len(), 3);
4958 assert!(!is_empty_todo_write_args(&normalized));
4959 }
4960
4961 #[test]
4962 fn empty_todo_write_args_allows_status_updates() {
4963 let args = json!({"task_id": 2, "status":"completed"});
4964 assert!(!is_empty_todo_write_args(&args));
4965 }
4966
4967 #[test]
4968 fn streamed_websearch_args_fallback_to_query_string() {
4969 let parsed = parse_streamed_tool_args("websearch", "meaning of life");
4970 assert_eq!(
4971 parsed.get("query").and_then(|v| v.as_str()),
4972 Some("meaning of life")
4973 );
4974 }
4975
4976 #[test]
4977 fn streamed_websearch_stringified_json_args_are_unwrapped() {
4978 let parsed = parse_streamed_tool_args("websearch", r#""donkey gestation period""#);
4979 assert_eq!(
4980 parsed.get("query").and_then(|v| v.as_str()),
4981 Some("donkey gestation period")
4982 );
4983 }
4984
4985 #[test]
4986 fn streamed_websearch_args_strip_arg_key_value_wrappers() {
4987 let parsed = parse_streamed_tool_args(
4988 "websearch",
4989 "query</arg_key><arg_value>taj card what is it benefits how to apply</arg_value>",
4990 );
4991 assert_eq!(
4992 parsed.get("query").and_then(|v| v.as_str()),
4993 Some("taj card what is it benefits how to apply")
4994 );
4995 }
4996
4997 #[test]
4998 fn normalize_tool_args_websearch_infers_from_user_text() {
4999 let normalized =
5000 normalize_tool_args("websearch", json!({}), "web search meaning of life", "");
5001 assert_eq!(
5002 normalized.args.get("query").and_then(|v| v.as_str()),
5003 Some("meaning of life")
5004 );
5005 assert_eq!(normalized.args_source, "inferred_from_user");
5006 assert_eq!(normalized.args_integrity, "recovered");
5007 }
5008
5009 #[test]
5010 fn normalize_tool_args_websearch_keeps_existing_query() {
5011 let normalized = normalize_tool_args(
5012 "websearch",
5013 json!({"query":"already set"}),
5014 "web search should not override",
5015 "",
5016 );
5017 assert_eq!(
5018 normalized.args.get("query").and_then(|v| v.as_str()),
5019 Some("already set")
5020 );
5021 assert_eq!(normalized.args_source, "provider_json");
5022 assert_eq!(normalized.args_integrity, "ok");
5023 }
5024
5025 #[test]
5026 fn normalize_tool_args_websearch_fails_when_unrecoverable() {
5027 let normalized = normalize_tool_args("websearch", json!({}), "search", "");
5028 assert!(normalized.query.is_none());
5029 assert!(normalized.missing_terminal);
5030 assert_eq!(normalized.args_source, "missing");
5031 assert_eq!(normalized.args_integrity, "empty");
5032 }
5033
5034 #[test]
5035 fn normalize_tool_args_webfetch_infers_url_from_user_prompt() {
5036 let normalized = normalize_tool_args(
5037 "webfetch",
5038 json!({}),
5039 "Please fetch `https://tandem.frumu.ai/docs/` in markdown mode",
5040 "",
5041 );
5042 assert!(!normalized.missing_terminal);
5043 assert_eq!(
5044 normalized.args.get("url").and_then(|v| v.as_str()),
5045 Some("https://tandem.frumu.ai/docs/")
5046 );
5047 assert_eq!(normalized.args_source, "inferred_from_user");
5048 assert_eq!(normalized.args_integrity, "recovered");
5049 }
5050
5051 #[test]
5052 fn normalize_tool_args_webfetch_recovers_nested_url_alias() {
5053 let normalized = normalize_tool_args(
5054 "webfetch",
5055 json!({"args":{"uri":"https://example.com/page"}}),
5056 "",
5057 "",
5058 );
5059 assert!(!normalized.missing_terminal);
5060 assert_eq!(
5061 normalized.args.get("url").and_then(|v| v.as_str()),
5062 Some("https://example.com/page")
5063 );
5064 assert_eq!(normalized.args_source, "provider_json");
5065 }
5066
5067 #[test]
5068 fn normalize_tool_args_webfetch_fails_when_url_unrecoverable() {
5069 let normalized = normalize_tool_args("webfetch", json!({}), "fetch the site", "");
5070 assert!(normalized.missing_terminal);
5071 assert_eq!(
5072 normalized.missing_terminal_reason.as_deref(),
5073 Some("WEBFETCH_URL_MISSING")
5074 );
5075 }
5076
5077 #[test]
5078 fn normalize_tool_args_write_requires_path() {
5079 let normalized = normalize_tool_args("write", json!({}), "", "");
5080 assert!(normalized.missing_terminal);
5081 assert_eq!(
5082 normalized.missing_terminal_reason.as_deref(),
5083 Some("FILE_PATH_MISSING")
5084 );
5085 }
5086
5087 #[test]
5088 fn normalize_tool_args_write_recovers_alias_path_key() {
5089 let normalized = normalize_tool_args(
5090 "write",
5091 json!({"filePath":"docs/CONCEPT.md","content":"hello"}),
5092 "",
5093 "",
5094 );
5095 assert!(!normalized.missing_terminal);
5096 assert_eq!(
5097 normalized.args.get("path").and_then(|v| v.as_str()),
5098 Some("docs/CONCEPT.md")
5099 );
5100 assert_eq!(
5101 normalized.args.get("content").and_then(|v| v.as_str()),
5102 Some("hello")
5103 );
5104 }
5105
5106 #[test]
5107 fn normalize_tool_args_read_infers_path_from_user_prompt() {
5108 let normalized = normalize_tool_args(
5109 "read",
5110 json!({}),
5111 "Please inspect `FEATURE_LIST.md` and summarize key sections.",
5112 "",
5113 );
5114 assert!(!normalized.missing_terminal);
5115 assert_eq!(
5116 normalized.args.get("path").and_then(|v| v.as_str()),
5117 Some("FEATURE_LIST.md")
5118 );
5119 assert_eq!(normalized.args_source, "inferred_from_user");
5120 assert_eq!(normalized.args_integrity, "recovered");
5121 }
5122
5123 #[test]
5124 fn normalize_tool_args_read_does_not_infer_path_from_assistant_context() {
5125 let normalized = normalize_tool_args(
5126 "read",
5127 json!({}),
5128 "generic instruction",
5129 "I will read src-tauri/src/orchestrator/engine.rs first.",
5130 );
5131 assert!(normalized.missing_terminal);
5132 assert_eq!(
5133 normalized.missing_terminal_reason.as_deref(),
5134 Some("FILE_PATH_MISSING")
5135 );
5136 }
5137
5138 #[test]
5139 fn normalize_tool_args_write_recovers_path_from_nested_array_payload() {
5140 let normalized = normalize_tool_args(
5141 "write",
5142 json!({"args":[{"file_path":"docs/CONCEPT.md"}],"content":"hello"}),
5143 "",
5144 "",
5145 );
5146 assert!(!normalized.missing_terminal);
5147 assert_eq!(
5148 normalized.args.get("path").and_then(|v| v.as_str()),
5149 Some("docs/CONCEPT.md")
5150 );
5151 }
5152
5153 #[test]
5154 fn normalize_tool_args_write_recovers_content_alias() {
5155 let normalized = normalize_tool_args(
5156 "write",
5157 json!({"path":"docs/FEATURES.md","body":"feature notes"}),
5158 "",
5159 "",
5160 );
5161 assert!(!normalized.missing_terminal);
5162 assert_eq!(
5163 normalized.args.get("content").and_then(|v| v.as_str()),
5164 Some("feature notes")
5165 );
5166 }
5167
5168 #[test]
5169 fn normalize_tool_args_write_fails_when_content_missing() {
5170 let normalized = normalize_tool_args("write", json!({"path":"docs/FEATURES.md"}), "", "");
5171 assert!(normalized.missing_terminal);
5172 assert_eq!(
5173 normalized.missing_terminal_reason.as_deref(),
5174 Some("WRITE_CONTENT_MISSING")
5175 );
5176 }
5177
5178 #[test]
5179 fn normalize_tool_args_write_recovers_raw_nested_string_content() {
5180 let normalized = normalize_tool_args(
5181 "write",
5182 json!({"path":"docs/FEATURES.md","args":"Line 1\nLine 2"}),
5183 "",
5184 "",
5185 );
5186 assert!(!normalized.missing_terminal);
5187 assert_eq!(
5188 normalized.args.get("path").and_then(|v| v.as_str()),
5189 Some("docs/FEATURES.md")
5190 );
5191 assert_eq!(
5192 normalized.args.get("content").and_then(|v| v.as_str()),
5193 Some("Line 1\nLine 2")
5194 );
5195 }
5196
5197 #[test]
5198 fn normalize_tool_args_write_does_not_treat_path_as_content() {
5199 let normalized = normalize_tool_args("write", json!("docs/FEATURES.md"), "", "");
5200 assert!(normalized.missing_terminal);
5201 assert_eq!(
5202 normalized.missing_terminal_reason.as_deref(),
5203 Some("WRITE_CONTENT_MISSING")
5204 );
5205 }
5206
5207 #[test]
5208 fn normalize_tool_args_read_infers_path_from_bold_markdown() {
5209 let normalized = normalize_tool_args(
5210 "read",
5211 json!({}),
5212 "Please read **FEATURE_LIST.md** and summarize.",
5213 "",
5214 );
5215 assert!(!normalized.missing_terminal);
5216 assert_eq!(
5217 normalized.args.get("path").and_then(|v| v.as_str()),
5218 Some("FEATURE_LIST.md")
5219 );
5220 }
5221
5222 #[test]
5223 fn normalize_tool_args_shell_infers_command_from_user_prompt() {
5224 let normalized = normalize_tool_args("bash", json!({}), "Run `rg -n \"TODO\" .`", "");
5225 assert!(!normalized.missing_terminal);
5226 assert_eq!(
5227 normalized.args.get("command").and_then(|v| v.as_str()),
5228 Some("rg -n \"TODO\" .")
5229 );
5230 assert_eq!(normalized.args_source, "inferred_from_user");
5231 assert_eq!(normalized.args_integrity, "recovered");
5232 }
5233
5234 #[test]
5235 fn normalize_tool_args_read_rejects_root_only_path() {
5236 let normalized = normalize_tool_args("read", json!({"path":"/"}), "", "");
5237 assert!(normalized.missing_terminal);
5238 assert_eq!(
5239 normalized.missing_terminal_reason.as_deref(),
5240 Some("FILE_PATH_MISSING")
5241 );
5242 }
5243
5244 #[test]
5245 fn normalize_tool_args_read_recovers_when_provider_path_is_root_only() {
5246 let normalized =
5247 normalize_tool_args("read", json!({"path":"/"}), "Please open `CONCEPT.md`", "");
5248 assert!(!normalized.missing_terminal);
5249 assert_eq!(
5250 normalized.args.get("path").and_then(|v| v.as_str()),
5251 Some("CONCEPT.md")
5252 );
5253 assert_eq!(normalized.args_source, "inferred_from_user");
5254 assert_eq!(normalized.args_integrity, "recovered");
5255 }
5256
5257 #[test]
5258 fn normalize_tool_args_read_rejects_tool_call_markup_path() {
5259 let normalized = normalize_tool_args(
5260 "read",
5261 json!({
5262 "path":"<tool_call>\n<function=glob>\n<parameter=pattern>**/*</parameter>\n</function>\n</tool_call>"
5263 }),
5264 "",
5265 "",
5266 );
5267 assert!(normalized.missing_terminal);
5268 assert_eq!(
5269 normalized.missing_terminal_reason.as_deref(),
5270 Some("FILE_PATH_MISSING")
5271 );
5272 }
5273
5274 #[test]
5275 fn normalize_tool_args_read_rejects_glob_pattern_path() {
5276 let normalized = normalize_tool_args("read", json!({"path":"**/*"}), "", "");
5277 assert!(normalized.missing_terminal);
5278 assert_eq!(
5279 normalized.missing_terminal_reason.as_deref(),
5280 Some("FILE_PATH_MISSING")
5281 );
5282 }
5283
5284 #[test]
5285 fn normalize_tool_args_read_rejects_placeholder_path() {
5286 let normalized = normalize_tool_args("read", json!({"path":"files/directories"}), "", "");
5287 assert!(normalized.missing_terminal);
5288 assert_eq!(
5289 normalized.missing_terminal_reason.as_deref(),
5290 Some("FILE_PATH_MISSING")
5291 );
5292 }
5293
5294 #[test]
5295 fn normalize_tool_args_read_rejects_tool_policy_placeholder_path() {
5296 let normalized = normalize_tool_args("read", json!({"path":"tool/policy"}), "", "");
5297 assert!(normalized.missing_terminal);
5298 assert_eq!(
5299 normalized.missing_terminal_reason.as_deref(),
5300 Some("FILE_PATH_MISSING")
5301 );
5302 }
5303
5304 #[test]
5305 fn normalize_tool_args_read_recovers_pdf_path_from_user_text() {
5306 let normalized = normalize_tool_args(
5307 "read",
5308 json!({"path":"tool/policy"}),
5309 "Read `T1011U kitöltési útmutató.pdf` and summarize.",
5310 "",
5311 );
5312 assert!(!normalized.missing_terminal);
5313 assert_eq!(
5314 normalized.args.get("path").and_then(|v| v.as_str()),
5315 Some("T1011U kitöltési útmutató.pdf")
5316 );
5317 assert_eq!(normalized.args_source, "inferred_from_user");
5318 assert_eq!(normalized.args_integrity, "recovered");
5319 }
5320
5321 #[test]
5322 fn normalize_tool_name_strips_default_api_namespace() {
5323 assert_eq!(normalize_tool_name("default_api:read"), "read");
5324 assert_eq!(normalize_tool_name("functions.shell"), "bash");
5325 }
5326
5327 #[test]
5328 fn mcp_server_from_tool_name_parses_server_segment() {
5329 assert_eq!(
5330 mcp_server_from_tool_name("mcp.arcade.jira_getboards"),
5331 Some("arcade")
5332 );
5333 assert_eq!(mcp_server_from_tool_name("read"), None);
5334 assert_eq!(mcp_server_from_tool_name("mcp"), None);
5335 }
5336
5337 #[test]
5338 fn batch_helpers_use_name_when_tool_is_wrapper() {
5339 let args = json!({
5340 "tool_calls":[
5341 {"tool":"default_api","name":"read","args":{"path":"CONCEPT.md"}},
5342 {"tool":"default_api:glob","args":{"pattern":"*.md"}}
5343 ]
5344 });
5345 let calls = extract_batch_calls(&args);
5346 assert_eq!(calls.len(), 2);
5347 assert_eq!(calls[0].0, "read");
5348 assert_eq!(calls[1].0, "glob");
5349 assert!(is_read_only_batch_call(&args));
5350 let sig = batch_tool_signature(&args).unwrap_or_default();
5351 assert!(sig.contains("read:"));
5352 assert!(sig.contains("glob:"));
5353 }
5354
5355 #[test]
5356 fn batch_helpers_resolve_nested_function_name() {
5357 let args = json!({
5358 "tool_calls":[
5359 {"tool":"default_api","function":{"name":"read"},"args":{"path":"CONCEPT.md"}}
5360 ]
5361 });
5362 let calls = extract_batch_calls(&args);
5363 assert_eq!(calls.len(), 1);
5364 assert_eq!(calls[0].0, "read");
5365 assert!(is_read_only_batch_call(&args));
5366 }
5367
5368 #[test]
5369 fn batch_output_classifier_detects_non_productive_unknown_results() {
5370 let output = r#"
5371[
5372 {"tool":"default_api","output":"Unknown tool: default_api","metadata":{}},
5373 {"tool":"default_api","output":"Unknown tool: default_api","metadata":{}}
5374]
5375"#;
5376 assert!(is_non_productive_batch_output(output));
5377 }
5378
5379 #[test]
5380 fn runtime_prompt_includes_execution_environment_block() {
5381 let prompt = tandem_runtime_system_prompt(&HostRuntimeContext {
5382 os: HostOs::Windows,
5383 arch: "x86_64".to_string(),
5384 shell_family: ShellFamily::Powershell,
5385 path_style: PathStyle::Windows,
5386 });
5387 assert!(prompt.contains("[Execution Environment]"));
5388 assert!(prompt.contains("Host OS: windows"));
5389 assert!(prompt.contains("Shell: powershell"));
5390 assert!(prompt.contains("Path style: windows"));
5391 }
5392
5393 #[test]
5394 fn extract_mcp_auth_required_metadata_parses_expected_shape() {
5395 let metadata = json!({
5396 "server": "arcade",
5397 "mcpAuth": {
5398 "required": true,
5399 "challengeId": "abc123",
5400 "authorizationUrl": "https://example.com/oauth",
5401 "message": "Authorize first",
5402 "pending": true,
5403 "blocked": true,
5404 "retryAfterMs": 8000
5405 }
5406 });
5407 let parsed = extract_mcp_auth_required_metadata(&metadata).expect("expected metadata");
5408 assert_eq!(parsed.challenge_id, "abc123");
5409 assert_eq!(parsed.authorization_url, "https://example.com/oauth");
5410 assert_eq!(parsed.message, "Authorize first");
5411 assert_eq!(parsed.server.as_deref(), Some("arcade"));
5412 assert!(parsed.pending);
5413 assert!(parsed.blocked);
5414 assert_eq!(parsed.retry_after_ms, Some(8000));
5415 }
5416
5417 #[test]
5418 fn auth_required_output_detector_matches_auth_text() {
5419 assert!(is_auth_required_tool_output(
5420 "Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com"
5421 ));
5422 assert!(is_auth_required_tool_output(
5423 "Authorization pending for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com\nRetry after 8s."
5424 ));
5425 assert!(!is_auth_required_tool_output("Tool `read` result: ok"));
5426 }
5427
5428 #[test]
5429 fn guard_budget_output_detector_matches_expected_text() {
5430 assert!(is_guard_budget_tool_output(
5431 "Tool `mcp.arcade.gmail_sendemail` call skipped: per-run guard budget exceeded (10)."
5432 ));
5433 assert!(!is_guard_budget_tool_output("Tool `read` result: ok"));
5434 }
5435
5436 #[test]
5437 fn summarize_guard_budget_outputs_returns_run_scoped_message() {
5438 let outputs = vec![
5439 "Tool `mcp.arcade.gmail_sendemail` call skipped: per-run guard budget exceeded (10)."
5440 .to_string(),
5441 "Tool `mcp.arcade.jira_getboards` call skipped: per-run guard budget exceeded (10)."
5442 .to_string(),
5443 ];
5444 let summary = summarize_guard_budget_outputs(&outputs).expect("expected summary");
5445 assert!(summary.contains("per-run tool guard budget"));
5446 assert!(summary.contains("fresh run"));
5447 }
5448
5449 #[test]
5450 fn duplicate_signature_output_detector_matches_expected_text() {
5451 assert!(is_duplicate_signature_limit_output(
5452 "Tool `bash` call skipped: duplicate call signature retry limit reached (2)."
5453 ));
5454 assert!(!is_duplicate_signature_limit_output(
5455 "Tool `read` result: ok"
5456 ));
5457 }
5458
/// Repeated duplicate-signature skip notices collapse into a summary that
/// explains the repetition and asks for a clearer command target.
#[test]
fn summarize_duplicate_signature_outputs_returns_run_scoped_message() {
    let skipped = "Tool `bash` call skipped: duplicate call signature retry limit reached (2).";
    // Two identical skip notices.
    let outputs = vec![skipped.to_string(); 2];

    let summary =
        summarize_duplicate_signature_outputs(&outputs).expect("expected duplicate summary");
    for fragment in ["same tool call kept repeating", "clearer command target"] {
        assert!(summary.contains(fragment));
    }
}
5472
/// When every output is an authorization prompt (pending or required), a summary
/// is produced that names each affected tool.
#[test]
fn summarize_auth_pending_outputs_returns_summary_when_all_are_auth_related() {
    let outputs: Vec<String> = vec![
        String::from("Authorization pending for `mcp.arcade.gmail_sendemail`.\nAuthorize here: https://example.com/a"),
        String::from("Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com/b"),
    ];

    let summary = summarize_auth_pending_outputs(&outputs).expect("summary expected");
    let expected_fragments = [
        "Authorization is required before I can continue",
        "gmail_sendemail",
        "gmail_whoami",
    ];
    for fragment in expected_fragments {
        assert!(summary.contains(fragment));
    }
}
5484
/// A batch that mixes an authorization prompt with a regular tool result is
/// expected to yield no auth-pending summary.
#[test]
fn summarize_auth_pending_outputs_returns_none_for_mixed_outputs() {
    let auth_prompt =
        "Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com";
    let regular_result = "Tool `read` result:\nok";
    let outputs = vec![auth_prompt.to_owned(), regular_result.to_owned()];

    let summary = summarize_auth_pending_outputs(&outputs);
    assert!(summary.is_none());
}
5493
/// An override value of `0` is parsed as "unlimited", i.e. `Some(usize::MAX)`.
///
/// The environment variable is set through a drop guard so that it is restored
/// to its previous value (or removed) even when the assertion panics. Otherwise a
/// failing run would leak `TANDEM_TOOL_BUDGET_DEFAULT=0` into later tests.
#[test]
fn parse_budget_override_zero_disables_budget() {
    /// Sets an environment variable and restores its prior state on drop.
    struct EnvVarGuard {
        key: &'static str,
        previous: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        fn set(key: &'static str, value: &str) -> Self {
            let previous = std::env::var_os(key);
            // SAFETY: `set_var` is only sound while no other thread reads or writes
            // the process environment. The test harness runs tests on parallel
            // threads, so this assumes no concurrently running test touches the
            // environment. NOTE(review): consider serialising env-mutating tests
            // behind a shared lock.
            unsafe {
                std::env::set_var(key, value);
            }
            Self { key, previous }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                // SAFETY: same single-threaded-environment assumption as in `set`.
                Some(value) => unsafe { std::env::set_var(self.key, value) },
                // SAFETY: same single-threaded-environment assumption as in `set`.
                None => unsafe { std::env::remove_var(self.key) },
            }
        }
    }

    let _guard = EnvVarGuard::set("TANDEM_TOOL_BUDGET_DEFAULT", "0");
    assert_eq!(
        parse_budget_override("TANDEM_TOOL_BUDGET_DEFAULT"),
        Some(usize::MAX)
    );
}
5507
/// With `TANDEM_DISABLE_TOOL_GUARD_BUDGETS=1`, `tool_budget_for` returns
/// `usize::MAX` for both `mcp.arcade.gmail_sendemail` and `websearch`.
///
/// The variable is set through a drop guard so that it is restored to its
/// previous value (or removed) even if either assertion panics. Otherwise a
/// failing run would leave guard budgets disabled for later tests.
#[test]
fn disable_tool_guard_budgets_env_overrides_all_budgets() {
    /// Sets an environment variable and restores its prior state on drop.
    struct EnvVarGuard {
        key: &'static str,
        previous: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        fn set(key: &'static str, value: &str) -> Self {
            let previous = std::env::var_os(key);
            // SAFETY: `set_var` is only sound while no other thread reads or writes
            // the process environment. The test harness runs tests on parallel
            // threads, so this assumes no concurrently running test touches the
            // environment. NOTE(review): consider serialising env-mutating tests
            // behind a shared lock.
            unsafe {
                std::env::set_var(key, value);
            }
            Self { key, previous }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                // SAFETY: same single-threaded-environment assumption as in `set`.
                Some(value) => unsafe { std::env::set_var(self.key, value) },
                // SAFETY: same single-threaded-environment assumption as in `set`.
                None => unsafe { std::env::remove_var(self.key) },
            }
        }
    }

    let _guard = EnvVarGuard::set("TANDEM_DISABLE_TOOL_GUARD_BUDGETS", "1");
    assert_eq!(tool_budget_for("mcp.arcade.gmail_sendemail"), usize::MAX);
    assert_eq!(tool_budget_for("websearch"), usize::MAX);
}
5519}