1use chrono::Utc;
2use futures::future::BoxFuture;
3use futures::StreamExt;
4use serde_json::{json, Map, Number, Value};
5use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
6use std::hash::{Hash, Hasher};
7use std::path::{Path, PathBuf};
8use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
9use tandem_providers::{ChatAttachment, ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
10use tandem_tools::{validate_tool_schemas, ToolRegistry};
11use tandem_types::{
12 EngineEvent, HostOs, HostRuntimeContext, Message, MessagePart, MessagePartInput, MessageRole,
13 ModelSpec, PathStyle, SendMessageRequest, ShellFamily,
14};
15use tandem_wire::WireMessagePart;
16use tokio_util::sync::CancellationToken;
17use tracing::Level;
18
19use crate::{
20 derive_session_title_from_prompt, title_needs_repair, AgentDefinition, AgentRegistry,
21 CancellationRegistry, EventBus, PermissionAction, PermissionManager, PluginRegistry, Storage,
22};
23use tokio::sync::RwLock;
24
/// Accumulator for one tool call that arrives piecewise over a provider stream.
///
/// The engine keeps one of these per streamed tool-call id and fills it in as
/// `ToolCallStart` / `ToolCallDelta` chunks arrive.
#[derive(Default)]
struct StreamedToolCall {
    /// Tool name as reported by the provider; empty until a start chunk supplies it.
    name: String,
    /// Concatenation of every argument fragment received so far (raw, unparsed text).
    args: String,
}
30
31#[derive(Debug, Clone)]
32pub struct SpawnAgentToolContext {
33 pub session_id: String,
34 pub message_id: String,
35 pub tool_call_id: Option<String>,
36 pub args: Value,
37}
38
39#[derive(Debug, Clone)]
40pub struct SpawnAgentToolResult {
41 pub output: String,
42 pub metadata: Value,
43}
44
45#[derive(Debug, Clone)]
46pub struct ToolPolicyContext {
47 pub session_id: String,
48 pub message_id: String,
49 pub tool: String,
50 pub args: Value,
51}
52
/// Verdict produced by a [`ToolPolicyHook`].
#[derive(Clone, Debug)]
pub struct ToolPolicyDecision {
    /// `true` lets the tool call proceed; `false` blocks it.
    pub allowed: bool,
    /// Explanation reported for a blocked call. When absent the engine falls
    /// back to a generic "denied by runtime policy" message.
    pub reason: Option<String>,
}
58
59pub trait SpawnAgentHook: Send + Sync {
60 fn spawn_agent(
61 &self,
62 ctx: SpawnAgentToolContext,
63 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>>;
64}
65
66pub trait ToolPolicyHook: Send + Sync {
67 fn evaluate_tool(
68 &self,
69 ctx: ToolPolicyContext,
70 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>>;
71}
72
/// Identifies the provider call for which a [`PromptContextHook`] is being consulted.
#[derive(Clone, Debug)]
pub struct PromptContextHookContext {
    /// Session the prompt belongs to.
    pub session_id: String,
    /// Id of the user message that started the current turn.
    pub message_id: String,
    /// Provider selected for this turn.
    pub provider_id: String,
    /// Model selected for this turn.
    pub model_id: String,
    /// 1-based index of the provider round-trip within the turn.
    pub iteration: usize,
}
81
82pub trait PromptContextHook: Send + Sync {
83 fn augment_provider_messages(
84 &self,
85 ctx: PromptContextHookContext,
86 messages: Vec<ChatMessage>,
87 ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>>;
88}
89
90#[derive(Clone)]
91pub struct EngineLoop {
92 storage: std::sync::Arc<Storage>,
93 event_bus: EventBus,
94 providers: ProviderRegistry,
95 plugins: PluginRegistry,
96 agents: AgentRegistry,
97 permissions: PermissionManager,
98 tools: ToolRegistry,
99 cancellations: CancellationRegistry,
100 host_runtime_context: HostRuntimeContext,
101 workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
102 session_allowed_tools: std::sync::Arc<RwLock<HashMap<String, Vec<String>>>>,
103 spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
104 tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
105 prompt_context_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn PromptContextHook>>>>,
106}
107
108impl EngineLoop {
109 #[allow(clippy::too_many_arguments)]
110 pub fn new(
111 storage: std::sync::Arc<Storage>,
112 event_bus: EventBus,
113 providers: ProviderRegistry,
114 plugins: PluginRegistry,
115 agents: AgentRegistry,
116 permissions: PermissionManager,
117 tools: ToolRegistry,
118 cancellations: CancellationRegistry,
119 host_runtime_context: HostRuntimeContext,
120 ) -> Self {
121 Self {
122 storage,
123 event_bus,
124 providers,
125 plugins,
126 agents,
127 permissions,
128 tools,
129 cancellations,
130 host_runtime_context,
131 workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
132 session_allowed_tools: std::sync::Arc::new(RwLock::new(HashMap::new())),
133 spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
134 tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
135 prompt_context_hook: std::sync::Arc::new(RwLock::new(None)),
136 }
137 }
138
139 pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
140 *self.spawn_agent_hook.write().await = Some(hook);
141 }
142
143 pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
144 *self.tool_policy_hook.write().await = Some(hook);
145 }
146
147 pub async fn set_prompt_context_hook(&self, hook: std::sync::Arc<dyn PromptContextHook>) {
148 *self.prompt_context_hook.write().await = Some(hook);
149 }
150
151 pub async fn set_session_allowed_tools(&self, session_id: &str, allowed_tools: Vec<String>) {
152 let normalized = allowed_tools
153 .into_iter()
154 .map(|tool| normalize_tool_name(&tool))
155 .filter(|tool| !tool.trim().is_empty())
156 .collect::<Vec<_>>();
157 self.session_allowed_tools
158 .write()
159 .await
160 .insert(session_id.to_string(), normalized);
161 }
162
163 pub async fn clear_session_allowed_tools(&self, session_id: &str) {
164 self.session_allowed_tools.write().await.remove(session_id);
165 }
166
167 pub async fn grant_workspace_override_for_session(
168 &self,
169 session_id: &str,
170 ttl_seconds: u64,
171 ) -> u64 {
172 let expires_at = chrono::Utc::now()
173 .timestamp_millis()
174 .max(0)
175 .saturating_add((ttl_seconds as i64).saturating_mul(1000))
176 as u64;
177 self.workspace_overrides
178 .write()
179 .await
180 .insert(session_id.to_string(), expires_at);
181 expires_at
182 }
183
184 pub async fn run_prompt_async(
185 &self,
186 session_id: String,
187 req: SendMessageRequest,
188 ) -> anyhow::Result<()> {
189 self.run_prompt_async_with_context(session_id, req, None)
190 .await
191 }
192
193 pub async fn run_prompt_async_with_context(
194 &self,
195 session_id: String,
196 req: SendMessageRequest,
197 correlation_id: Option<String>,
198 ) -> anyhow::Result<()> {
199 let session_model = self
200 .storage
201 .get_session(&session_id)
202 .await
203 .and_then(|s| s.model);
204 let (provider_id, model_id_value) =
205 resolve_model_route(req.model.as_ref(), session_model.as_ref()).ok_or_else(|| {
206 anyhow::anyhow!(
207 "MODEL_SELECTION_REQUIRED: explicit provider/model is required for this request."
208 )
209 })?;
210 let correlation_ref = correlation_id.as_deref();
211 let model_id = Some(model_id_value.as_str());
212 let cancel = self.cancellations.create(&session_id).await;
213 emit_event(
214 Level::INFO,
215 ProcessKind::Engine,
216 ObservabilityEvent {
217 event: "provider.call.start",
218 component: "engine.loop",
219 correlation_id: correlation_ref,
220 session_id: Some(&session_id),
221 run_id: None,
222 message_id: None,
223 provider_id: Some(provider_id.as_str()),
224 model_id,
225 status: Some("start"),
226 error_code: None,
227 detail: Some("run_prompt_async dispatch"),
228 },
229 );
230 self.event_bus.publish(EngineEvent::new(
231 "session.status",
232 json!({"sessionID": session_id, "status":"running"}),
233 ));
234 let request_parts = req.parts.clone();
235 let text = req
236 .parts
237 .iter()
238 .map(|p| match p {
239 MessagePartInput::Text { text } => text.clone(),
240 MessagePartInput::File {
241 mime,
242 filename,
243 url,
244 } => format!(
245 "[file mime={} name={} url={}]",
246 mime,
247 filename.clone().unwrap_or_else(|| "unknown".to_string()),
248 url
249 ),
250 })
251 .collect::<Vec<_>>()
252 .join("\n");
253 let runtime_attachments = build_runtime_attachments(&provider_id, &request_parts).await;
254 self.auto_rename_session_from_user_text(&session_id, &text)
255 .await;
256 let active_agent = self.agents.get(req.agent.as_deref()).await;
257 let mut user_message_id = self
258 .find_recent_matching_user_message_id(&session_id, &text)
259 .await;
260 if user_message_id.is_none() {
261 let user_message = Message::new(
262 MessageRole::User,
263 vec![MessagePart::Text { text: text.clone() }],
264 );
265 let created_message_id = user_message.id.clone();
266 self.storage
267 .append_message(&session_id, user_message)
268 .await?;
269
270 let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
271 self.event_bus.publish(EngineEvent::new(
272 "message.part.updated",
273 json!({
274 "part": user_part,
275 "delta": text,
276 "agent": active_agent.name
277 }),
278 ));
279 user_message_id = Some(created_message_id);
280 }
281 let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
282
283 if cancel.is_cancelled() {
284 self.event_bus.publish(EngineEvent::new(
285 "session.status",
286 json!({"sessionID": session_id, "status":"cancelled"}),
287 ));
288 self.cancellations.remove(&session_id).await;
289 return Ok(());
290 }
291
292 let mut question_tool_used = false;
293 let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
294 if normalize_tool_name(&tool) == "question" {
295 question_tool_used = true;
296 }
297 if !agent_can_use_tool(&active_agent, &tool) {
298 format!(
299 "Tool `{tool}` is not enabled for agent `{}`.",
300 active_agent.name
301 )
302 } else {
303 self.execute_tool_with_permission(
304 &session_id,
305 &user_message_id,
306 tool.clone(),
307 args,
308 active_agent.skills.as_deref(),
309 &text,
310 None,
311 cancel.clone(),
312 )
313 .await?
314 .unwrap_or_default()
315 }
316 } else {
317 let mut completion = String::new();
318 let mut max_iterations = 25usize;
319 let mut followup_context: Option<String> = None;
320 let mut last_tool_outputs: Vec<String> = Vec::new();
321 let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
322 let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
323 let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
324 let mut shell_mismatch_signatures: HashSet<String> = HashSet::new();
325 let mut blocked_mcp_servers: HashSet<String> = HashSet::new();
326 let mut websearch_query_blocked = false;
327 let mut auto_workspace_probe_attempted = false;
328
329 while max_iterations > 0 && !cancel.is_cancelled() {
330 let iteration = 26usize.saturating_sub(max_iterations);
331 max_iterations -= 1;
332 let mut messages = load_chat_history(self.storage.clone(), &session_id).await;
333 if iteration == 1 && !runtime_attachments.is_empty() {
334 attach_to_last_user_message(&mut messages, &runtime_attachments);
335 }
336 let mut system_parts =
337 vec![tandem_runtime_system_prompt(&self.host_runtime_context)];
338 if let Some(system) = active_agent.system_prompt.as_ref() {
339 system_parts.push(system.clone());
340 }
341 messages.insert(
342 0,
343 ChatMessage {
344 role: "system".to_string(),
345 content: system_parts.join("\n\n"),
346 attachments: Vec::new(),
347 },
348 );
349 if let Some(extra) = followup_context.take() {
350 messages.push(ChatMessage {
351 role: "user".to_string(),
352 content: extra,
353 attachments: Vec::new(),
354 });
355 }
356 if let Some(hook) = self.prompt_context_hook.read().await.clone() {
357 let ctx = PromptContextHookContext {
358 session_id: session_id.clone(),
359 message_id: user_message_id.clone(),
360 provider_id: provider_id.clone(),
361 model_id: model_id_value.clone(),
362 iteration,
363 };
364 if let Ok(augmented) =
365 hook.augment_provider_messages(ctx, messages.clone()).await
366 {
367 messages = augmented;
368 }
369 }
370 let mut tool_schemas = self.tools.list().await;
371 if active_agent.tools.is_some() {
372 tool_schemas.retain(|schema| agent_can_use_tool(&active_agent, &schema.name));
373 }
374 tool_schemas.retain(|schema| {
375 let normalized = normalize_tool_name(&schema.name);
376 if let Some(server) = mcp_server_from_tool_name(&normalized) {
377 !blocked_mcp_servers.contains(server)
378 } else {
379 true
380 }
381 });
382 if let Some(allowed_tools) = self
383 .session_allowed_tools
384 .read()
385 .await
386 .get(&session_id)
387 .cloned()
388 {
389 if !allowed_tools.is_empty() {
390 tool_schemas.retain(|schema| {
391 let normalized = normalize_tool_name(&schema.name);
392 allowed_tools.iter().any(|tool| tool == &normalized)
393 });
394 }
395 }
396 if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
397 let detail = validation_err.to_string();
398 emit_event(
399 Level::ERROR,
400 ProcessKind::Engine,
401 ObservabilityEvent {
402 event: "provider.call.error",
403 component: "engine.loop",
404 correlation_id: correlation_ref,
405 session_id: Some(&session_id),
406 run_id: None,
407 message_id: Some(&user_message_id),
408 provider_id: Some(provider_id.as_str()),
409 model_id,
410 status: Some("failed"),
411 error_code: Some("TOOL_SCHEMA_INVALID"),
412 detail: Some(&detail),
413 },
414 );
415 anyhow::bail!("{detail}");
416 }
417 let stream = self
418 .providers
419 .stream_for_provider(
420 Some(provider_id.as_str()),
421 Some(model_id_value.as_str()),
422 messages,
423 Some(tool_schemas),
424 cancel.clone(),
425 )
426 .await
427 .inspect_err(|err| {
428 let error_text = err.to_string();
429 let error_code = provider_error_code(&error_text);
430 let detail = truncate_text(&error_text, 500);
431 emit_event(
432 Level::ERROR,
433 ProcessKind::Engine,
434 ObservabilityEvent {
435 event: "provider.call.error",
436 component: "engine.loop",
437 correlation_id: correlation_ref,
438 session_id: Some(&session_id),
439 run_id: None,
440 message_id: Some(&user_message_id),
441 provider_id: Some(provider_id.as_str()),
442 model_id,
443 status: Some("failed"),
444 error_code: Some(error_code),
445 detail: Some(&detail),
446 },
447 );
448 })?;
449 tokio::pin!(stream);
450 completion.clear();
451 let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
452 let mut provider_usage: Option<TokenUsage> = None;
453 while let Some(chunk) = stream.next().await {
454 let chunk = match chunk {
455 Ok(chunk) => chunk,
456 Err(err) => {
457 let error_text = err.to_string();
458 let error_code = provider_error_code(&error_text);
459 let detail = truncate_text(&error_text, 500);
460 emit_event(
461 Level::ERROR,
462 ProcessKind::Engine,
463 ObservabilityEvent {
464 event: "provider.call.error",
465 component: "engine.loop",
466 correlation_id: correlation_ref,
467 session_id: Some(&session_id),
468 run_id: None,
469 message_id: Some(&user_message_id),
470 provider_id: Some(provider_id.as_str()),
471 model_id,
472 status: Some("failed"),
473 error_code: Some(error_code),
474 detail: Some(&detail),
475 },
476 );
477 return Err(anyhow::anyhow!(
478 "provider stream chunk error: {error_text}"
479 ));
480 }
481 };
482 match chunk {
483 StreamChunk::TextDelta(delta) => {
484 let delta = strip_model_control_markers(&delta);
485 if delta.trim().is_empty() {
486 continue;
487 }
488 if completion.is_empty() {
489 emit_event(
490 Level::INFO,
491 ProcessKind::Engine,
492 ObservabilityEvent {
493 event: "provider.call.first_byte",
494 component: "engine.loop",
495 correlation_id: correlation_ref,
496 session_id: Some(&session_id),
497 run_id: None,
498 message_id: Some(&user_message_id),
499 provider_id: Some(provider_id.as_str()),
500 model_id,
501 status: Some("streaming"),
502 error_code: None,
503 detail: Some("first text delta"),
504 },
505 );
506 }
507 completion.push_str(&delta);
508 let delta = truncate_text(&delta, 4_000);
509 let delta_part =
510 WireMessagePart::text(&session_id, &user_message_id, delta.clone());
511 self.event_bus.publish(EngineEvent::new(
512 "message.part.updated",
513 json!({"part": delta_part, "delta": delta}),
514 ));
515 }
516 StreamChunk::ReasoningDelta(_reasoning) => {}
517 StreamChunk::Done {
518 finish_reason: _,
519 usage,
520 } => {
521 if usage.is_some() {
522 provider_usage = usage;
523 }
524 break;
525 }
526 StreamChunk::ToolCallStart { id, name } => {
527 let entry = streamed_tool_calls.entry(id).or_default();
528 if entry.name.is_empty() {
529 entry.name = name;
530 }
531 }
532 StreamChunk::ToolCallDelta { id, args_delta } => {
533 let entry = streamed_tool_calls.entry(id.clone()).or_default();
534 entry.args.push_str(&args_delta);
535 let tool_name = if entry.name.trim().is_empty() {
536 "tool".to_string()
537 } else {
538 normalize_tool_name(&entry.name)
539 };
540 let parsed_preview = if entry.name.trim().is_empty() {
541 Value::String(truncate_text(&entry.args, 1_000))
542 } else {
543 parse_streamed_tool_args(&tool_name, &entry.args)
544 };
545 let mut tool_part = WireMessagePart::tool_invocation(
546 &session_id,
547 &user_message_id,
548 tool_name.clone(),
549 json!({}),
550 );
551 tool_part.id = Some(id.clone());
552 self.event_bus.publish(EngineEvent::new(
553 "message.part.updated",
554 json!({
555 "part": tool_part,
556 "toolCallDelta": {
557 "id": id,
558 "tool": tool_name,
559 "argsDelta": truncate_text(&args_delta, 1_000),
560 "parsedArgsPreview": parsed_preview
561 }
562 }),
563 ));
564 }
565 StreamChunk::ToolCallEnd { id: _ } => {}
566 }
567 if cancel.is_cancelled() {
568 break;
569 }
570 }
571
572 let mut tool_calls = streamed_tool_calls
573 .into_values()
574 .filter_map(|call| {
575 if call.name.trim().is_empty() {
576 return None;
577 }
578 let tool_name = normalize_tool_name(&call.name);
579 let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
580 Some((tool_name, parsed_args))
581 })
582 .collect::<Vec<_>>();
583 if tool_calls.is_empty() {
584 tool_calls = parse_tool_invocations_from_response(&completion);
585 }
586 if tool_calls.is_empty()
587 && !auto_workspace_probe_attempted
588 && should_force_workspace_probe(&text, &completion)
589 {
590 auto_workspace_probe_attempted = true;
591 tool_calls = vec![("glob".to_string(), json!({ "pattern": "*" }))];
592 }
593 if !tool_calls.is_empty() {
594 let mut outputs = Vec::new();
595 let mut executed_productive_tool = false;
596 let mut auth_required_hit_in_cycle = false;
597 let mut guard_budget_hit_in_cycle = false;
598 for (tool, args) in tool_calls {
599 if !agent_can_use_tool(&active_agent, &tool) {
600 continue;
601 }
602 let tool_key = normalize_tool_name(&tool);
603 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
604 if blocked_mcp_servers.contains(server) {
605 outputs.push(format!(
606 "Tool `{}` call skipped: authorization is still pending for MCP server `{}`.",
607 tool_key, server
608 ));
609 continue;
610 }
611 }
612 if tool_key == "question" {
613 question_tool_used = true;
614 }
615 if websearch_query_blocked && tool_key == "websearch" {
616 outputs.push(
617 "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
618 .to_string(),
619 );
620 continue;
621 }
622 let mut effective_args = args.clone();
623 if tool_key == "todo_write" {
624 effective_args = normalize_todo_write_args(effective_args, &completion);
625 if is_empty_todo_write_args(&effective_args) {
626 outputs.push(
627 "Tool `todo_write` call skipped: empty todo payload."
628 .to_string(),
629 );
630 continue;
631 }
632 }
633 let signature = if tool_key == "batch" {
634 batch_tool_signature(&args)
635 .unwrap_or_else(|| tool_signature(&tool_key, &args))
636 } else {
637 tool_signature(&tool_key, &args)
638 };
639 if is_shell_tool_name(&tool_key)
640 && shell_mismatch_signatures.contains(&signature)
641 {
642 outputs.push(
643 "Tool `bash` call skipped: previous invocation hit an OS/path mismatch. Use `read`, `glob`, or `grep`."
644 .to_string(),
645 );
646 continue;
647 }
648 let mut signature_count = 1usize;
649 if is_read_only_tool(&tool_key)
650 || (tool_key == "batch" && is_read_only_batch_call(&args))
651 {
652 let count = readonly_signature_counts
653 .entry(signature.clone())
654 .and_modify(|v| *v = v.saturating_add(1))
655 .or_insert(1);
656 signature_count = *count;
657 if tool_key == "websearch" && *count > 2 {
658 self.event_bus.publish(EngineEvent::new(
659 "tool.loop_guard.triggered",
660 json!({
661 "sessionID": session_id,
662 "messageID": user_message_id,
663 "tool": tool_key,
664 "reason": "duplicate_signature_retry_exhausted",
665 "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
666 "loop_guard_triggered": true
667 }),
668 ));
669 outputs.push(
670 "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
671 .to_string(),
672 );
673 continue;
674 }
675 if tool_key != "websearch" && *count > 1 {
676 if let Some(cached) = readonly_tool_cache.get(&signature) {
677 outputs.push(cached.clone());
678 } else {
679 outputs.push(format!(
680 "Tool `{}` call skipped: duplicate call signature detected.",
681 tool_key
682 ));
683 }
684 continue;
685 }
686 }
687 let budget = tool_budget_for(&tool_key);
688 let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
689 if *entry >= budget {
690 outputs.push(format!(
691 "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
692 tool_key, budget
693 ));
694 guard_budget_hit_in_cycle = true;
695 continue;
696 }
697 *entry += 1;
698 if let Some(output) = self
699 .execute_tool_with_permission(
700 &session_id,
701 &user_message_id,
702 tool,
703 effective_args,
704 active_agent.skills.as_deref(),
705 &text,
706 Some(&completion),
707 cancel.clone(),
708 )
709 .await?
710 {
711 let productive = !(tool_key == "batch"
712 && is_non_productive_batch_output(&output))
713 && !is_auth_required_tool_output(&output);
714 if output.contains("WEBSEARCH_QUERY_MISSING") {
715 websearch_query_blocked = true;
716 }
717 if is_shell_tool_name(&tool_key) && is_os_mismatch_tool_output(&output)
718 {
719 shell_mismatch_signatures.insert(signature.clone());
720 }
721 if is_read_only_tool(&tool_key)
722 && tool_key != "websearch"
723 && signature_count == 1
724 {
725 readonly_tool_cache.insert(signature, output.clone());
726 }
727 if productive {
728 executed_productive_tool = true;
729 }
730 if is_auth_required_tool_output(&output) {
731 if let Some(server) = mcp_server_from_tool_name(&tool_key) {
732 blocked_mcp_servers.insert(server.to_string());
733 }
734 auth_required_hit_in_cycle = true;
735 }
736 outputs.push(output);
737 if auth_required_hit_in_cycle {
738 break;
739 }
740 if guard_budget_hit_in_cycle {
741 break;
742 }
743 }
744 }
745 if !outputs.is_empty() {
746 last_tool_outputs = outputs.clone();
747 let guard_budget_hit =
748 outputs.iter().any(|o| is_guard_budget_tool_output(o));
749 if executed_productive_tool {
750 followup_context = Some(format!(
751 "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
752 summarize_tool_outputs(&outputs)
753 ));
754 continue;
755 }
756 if guard_budget_hit {
757 completion = summarize_guard_budget_outputs(&outputs)
758 .unwrap_or_else(|| {
759 "This run hit the per-run tool guard budget, so tool execution was paused to avoid retries. Send a new message to start a fresh run.".to_string()
760 });
761 } else if let Some(summary) = summarize_auth_pending_outputs(&outputs) {
762 completion = summary;
763 } else {
764 completion.clear();
765 }
766 break;
767 }
768 }
769
770 if let Some(usage) = provider_usage {
771 self.event_bus.publish(EngineEvent::new(
772 "provider.usage",
773 json!({
774 "sessionID": session_id,
775 "messageID": user_message_id,
776 "promptTokens": usage.prompt_tokens,
777 "completionTokens": usage.completion_tokens,
778 "totalTokens": usage.total_tokens,
779 }),
780 ));
781 }
782
783 break;
784 }
785 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
786 if let Some(narrative) = self
787 .generate_final_narrative_without_tools(
788 &session_id,
789 &active_agent,
790 Some(provider_id.as_str()),
791 Some(model_id_value.as_str()),
792 cancel.clone(),
793 &last_tool_outputs,
794 )
795 .await
796 {
797 completion = narrative;
798 }
799 }
800 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
801 let preview = last_tool_outputs
802 .iter()
803 .take(3)
804 .map(|o| truncate_text(o, 240))
805 .collect::<Vec<_>>()
806 .join("\n");
807 completion = format!(
808 "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
809 preview
810 );
811 }
812 completion = strip_model_control_markers(&completion);
813 truncate_text(&completion, 16_000)
814 };
815 emit_event(
816 Level::INFO,
817 ProcessKind::Engine,
818 ObservabilityEvent {
819 event: "provider.call.finish",
820 component: "engine.loop",
821 correlation_id: correlation_ref,
822 session_id: Some(&session_id),
823 run_id: None,
824 message_id: Some(&user_message_id),
825 provider_id: Some(provider_id.as_str()),
826 model_id,
827 status: Some("ok"),
828 error_code: None,
829 detail: Some("provider stream complete"),
830 },
831 );
832 if active_agent.name.eq_ignore_ascii_case("plan") {
833 emit_plan_todo_fallback(
834 self.storage.clone(),
835 &self.event_bus,
836 &session_id,
837 &user_message_id,
838 &completion,
839 )
840 .await;
841 let todos_after_fallback = self.storage.get_todos(&session_id).await;
842 if todos_after_fallback.is_empty() && !question_tool_used {
843 emit_plan_question_fallback(
844 self.storage.clone(),
845 &self.event_bus,
846 &session_id,
847 &user_message_id,
848 &completion,
849 )
850 .await;
851 }
852 }
853 if cancel.is_cancelled() {
854 self.event_bus.publish(EngineEvent::new(
855 "session.status",
856 json!({"sessionID": session_id, "status":"cancelled"}),
857 ));
858 self.cancellations.remove(&session_id).await;
859 return Ok(());
860 }
861 let assistant = Message::new(
862 MessageRole::Assistant,
863 vec![MessagePart::Text {
864 text: completion.clone(),
865 }],
866 );
867 let assistant_message_id = assistant.id.clone();
868 self.storage.append_message(&session_id, assistant).await?;
869 let final_part = WireMessagePart::text(
870 &session_id,
871 &assistant_message_id,
872 truncate_text(&completion, 16_000),
873 );
874 self.event_bus.publish(EngineEvent::new(
875 "message.part.updated",
876 json!({"part": final_part}),
877 ));
878 self.event_bus.publish(EngineEvent::new(
879 "session.updated",
880 json!({"sessionID": session_id, "status":"idle"}),
881 ));
882 self.event_bus.publish(EngineEvent::new(
883 "session.status",
884 json!({"sessionID": session_id, "status":"idle"}),
885 ));
886 self.cancellations.remove(&session_id).await;
887 Ok(())
888 }
889
890 pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
891 self.providers.default_complete(&prompt).await
892 }
893
894 pub async fn run_oneshot_for_provider(
895 &self,
896 prompt: String,
897 provider_id: Option<&str>,
898 ) -> anyhow::Result<String> {
899 self.providers
900 .complete_for_provider(provider_id, &prompt, None)
901 .await
902 }
903
904 #[allow(clippy::too_many_arguments)]
905 async fn execute_tool_with_permission(
906 &self,
907 session_id: &str,
908 message_id: &str,
909 tool: String,
910 args: Value,
911 equipped_skills: Option<&[String]>,
912 latest_user_text: &str,
913 latest_assistant_context: Option<&str>,
914 cancel: CancellationToken,
915 ) -> anyhow::Result<Option<String>> {
916 let tool = normalize_tool_name(&tool);
917 let normalized = normalize_tool_args(
918 &tool,
919 args,
920 latest_user_text,
921 latest_assistant_context.unwrap_or_default(),
922 );
923 self.event_bus.publish(EngineEvent::new(
924 "tool.args.normalized",
925 json!({
926 "sessionID": session_id,
927 "messageID": message_id,
928 "tool": tool,
929 "argsSource": normalized.args_source,
930 "argsIntegrity": normalized.args_integrity,
931 "query": normalized.query,
932 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
933 "requestID": Value::Null
934 }),
935 ));
936 if normalized.args_integrity == "recovered" {
937 self.event_bus.publish(EngineEvent::new(
938 "tool.args.recovered",
939 json!({
940 "sessionID": session_id,
941 "messageID": message_id,
942 "tool": tool,
943 "argsSource": normalized.args_source,
944 "query": normalized.query,
945 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
946 "requestID": Value::Null
947 }),
948 ));
949 }
950 if normalized.missing_terminal {
951 let missing_reason = normalized
952 .missing_terminal_reason
953 .clone()
954 .unwrap_or_else(|| "TOOL_ARGUMENTS_MISSING".to_string());
955 self.event_bus.publish(EngineEvent::new(
956 "tool.args.missing_terminal",
957 json!({
958 "sessionID": session_id,
959 "messageID": message_id,
960 "tool": tool,
961 "argsSource": normalized.args_source,
962 "argsIntegrity": normalized.args_integrity,
963 "requestID": Value::Null,
964 "error": missing_reason
965 }),
966 ));
967 let mut failed_part =
968 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
969 failed_part.state = Some("failed".to_string());
970 failed_part.error = Some(missing_reason.clone());
971 self.event_bus.publish(EngineEvent::new(
972 "message.part.updated",
973 json!({"part": failed_part}),
974 ));
975 return Ok(Some(missing_reason));
976 }
977
978 let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
979 Ok(args) => args,
980 Err(message) => return Ok(Some(message)),
981 };
982 if let Some(allowed_tools) = self
983 .session_allowed_tools
984 .read()
985 .await
986 .get(session_id)
987 .cloned()
988 {
989 if !allowed_tools.is_empty() && !allowed_tools.iter().any(|name| name == &tool) {
990 return Ok(Some(format!("Tool `{tool}` is not allowed for this run.")));
991 }
992 }
993 if let Some(hook) = self.tool_policy_hook.read().await.clone() {
994 let decision = hook
995 .evaluate_tool(ToolPolicyContext {
996 session_id: session_id.to_string(),
997 message_id: message_id.to_string(),
998 tool: tool.clone(),
999 args: args.clone(),
1000 })
1001 .await?;
1002 if !decision.allowed {
1003 let reason = decision
1004 .reason
1005 .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
1006 let mut blocked_part =
1007 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1008 blocked_part.state = Some("failed".to_string());
1009 blocked_part.error = Some(reason.clone());
1010 self.event_bus.publish(EngineEvent::new(
1011 "message.part.updated",
1012 json!({"part": blocked_part}),
1013 ));
1014 return Ok(Some(reason));
1015 }
1016 }
1017 let mut tool_call_id: Option<String> = None;
1018 if let Some(violation) = self
1019 .workspace_sandbox_violation(session_id, &tool, &args)
1020 .await
1021 {
1022 let mut blocked_part =
1023 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1024 blocked_part.state = Some("failed".to_string());
1025 blocked_part.error = Some(violation.clone());
1026 self.event_bus.publish(EngineEvent::new(
1027 "message.part.updated",
1028 json!({"part": blocked_part}),
1029 ));
1030 return Ok(Some(violation));
1031 }
1032 let rule = self
1033 .plugins
1034 .permission_override(&tool)
1035 .await
1036 .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
1037 if matches!(rule, PermissionAction::Deny) {
1038 return Ok(Some(format!(
1039 "Permission denied for tool `{tool}` by policy."
1040 )));
1041 }
1042
1043 let mut effective_args = args.clone();
1044 if matches!(rule, PermissionAction::Ask) {
1045 let pending = self
1046 .permissions
1047 .ask_for_session_with_context(
1048 Some(session_id),
1049 &tool,
1050 args.clone(),
1051 Some(crate::PermissionArgsContext {
1052 args_source: normalized.args_source.clone(),
1053 args_integrity: normalized.args_integrity.clone(),
1054 query: normalized.query.clone(),
1055 }),
1056 )
1057 .await;
1058 let mut pending_part = WireMessagePart::tool_invocation(
1059 session_id,
1060 message_id,
1061 tool.clone(),
1062 args.clone(),
1063 );
1064 pending_part.id = Some(pending.id.clone());
1065 tool_call_id = Some(pending.id.clone());
1066 pending_part.state = Some("pending".to_string());
1067 self.event_bus.publish(EngineEvent::new(
1068 "message.part.updated",
1069 json!({"part": pending_part}),
1070 ));
1071 let reply = self
1072 .permissions
1073 .wait_for_reply(&pending.id, cancel.clone())
1074 .await;
1075 if cancel.is_cancelled() {
1076 return Ok(None);
1077 }
1078 let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
1079 if !approved {
1080 let mut denied_part =
1081 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1082 denied_part.id = Some(pending.id);
1083 denied_part.state = Some("denied".to_string());
1084 denied_part.error = Some("Permission denied by user".to_string());
1085 self.event_bus.publish(EngineEvent::new(
1086 "message.part.updated",
1087 json!({"part": denied_part}),
1088 ));
1089 return Ok(Some(format!(
1090 "Permission denied for tool `{tool}` by user."
1091 )));
1092 }
1093 effective_args = args;
1094 }
1095
1096 let mut args = self.plugins.inject_tool_args(&tool, effective_args).await;
1097 let tool_context = self.resolve_tool_execution_context(session_id).await;
1098 if let Some((workspace_root, effective_cwd)) = tool_context.as_ref() {
1099 if let Some(obj) = args.as_object_mut() {
1100 obj.insert(
1101 "__workspace_root".to_string(),
1102 Value::String(workspace_root.clone()),
1103 );
1104 obj.insert(
1105 "__effective_cwd".to_string(),
1106 Value::String(effective_cwd.clone()),
1107 );
1108 obj.insert(
1109 "__session_id".to_string(),
1110 Value::String(session_id.to_string()),
1111 );
1112 }
1113 tracing::info!(
1114 "tool execution context session_id={} tool={} workspace_root={} effective_cwd={}",
1115 session_id,
1116 tool,
1117 workspace_root,
1118 effective_cwd
1119 );
1120 }
1121 let mut invoke_part =
1122 WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
1123 if let Some(call_id) = tool_call_id.clone() {
1124 invoke_part.id = Some(call_id);
1125 }
1126 let invoke_part_id = invoke_part.id.clone();
1127 self.event_bus.publish(EngineEvent::new(
1128 "message.part.updated",
1129 json!({"part": invoke_part}),
1130 ));
1131 let args_for_side_events = args.clone();
1132 if tool == "spawn_agent" {
1133 let hook = self.spawn_agent_hook.read().await.clone();
1134 if let Some(hook) = hook {
1135 let spawned = hook
1136 .spawn_agent(SpawnAgentToolContext {
1137 session_id: session_id.to_string(),
1138 message_id: message_id.to_string(),
1139 tool_call_id: invoke_part_id.clone(),
1140 args: args_for_side_events.clone(),
1141 })
1142 .await?;
1143 let output = self.plugins.transform_tool_output(spawned.output).await;
1144 let output = truncate_text(&output, 16_000);
1145 emit_tool_side_events(
1146 self.storage.clone(),
1147 &self.event_bus,
1148 ToolSideEventContext {
1149 session_id,
1150 message_id,
1151 tool: &tool,
1152 args: &args_for_side_events,
1153 metadata: &spawned.metadata,
1154 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1155 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1156 },
1157 )
1158 .await;
1159 let mut result_part = WireMessagePart::tool_result(
1160 session_id,
1161 message_id,
1162 tool.clone(),
1163 json!(output.clone()),
1164 );
1165 result_part.id = invoke_part_id;
1166 self.event_bus.publish(EngineEvent::new(
1167 "message.part.updated",
1168 json!({"part": result_part}),
1169 ));
1170 return Ok(Some(truncate_text(
1171 &format!("Tool `{tool}` result:\n{output}"),
1172 16_000,
1173 )));
1174 }
1175 let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
1176 let mut failed_part =
1177 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1178 failed_part.id = invoke_part_id.clone();
1179 failed_part.state = Some("failed".to_string());
1180 failed_part.error = Some(output.to_string());
1181 self.event_bus.publish(EngineEvent::new(
1182 "message.part.updated",
1183 json!({"part": failed_part}),
1184 ));
1185 return Ok(Some(output.to_string()));
1186 }
1187 let result = match self
1188 .tools
1189 .execute_with_cancel(&tool, args, cancel.clone())
1190 .await
1191 {
1192 Ok(result) => result,
1193 Err(err) => {
1194 let err_text = err.to_string();
1195 if let Some(auth) = extract_mcp_auth_required_from_error_text(&tool, &err_text) {
1196 self.event_bus.publish(EngineEvent::new(
1197 "mcp.auth.required",
1198 json!({
1199 "sessionID": session_id,
1200 "messageID": message_id,
1201 "tool": tool.clone(),
1202 "server": auth.server,
1203 "authorizationUrl": auth.authorization_url,
1204 "message": auth.message,
1205 "challengeId": auth.challenge_id
1206 }),
1207 ));
1208 let auth_output = format!(
1209 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1210 tool, auth.message, auth.authorization_url
1211 );
1212 let mut result_part = WireMessagePart::tool_result(
1213 session_id,
1214 message_id,
1215 tool.clone(),
1216 json!(auth_output.clone()),
1217 );
1218 result_part.id = invoke_part_id.clone();
1219 self.event_bus.publish(EngineEvent::new(
1220 "message.part.updated",
1221 json!({"part": result_part}),
1222 ));
1223 return Ok(Some(truncate_text(
1224 &format!("Tool `{tool}` result:\n{auth_output}"),
1225 16_000,
1226 )));
1227 }
1228 let mut failed_part =
1229 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
1230 failed_part.id = invoke_part_id.clone();
1231 failed_part.state = Some("failed".to_string());
1232 failed_part.error = Some(err_text.clone());
1233 self.event_bus.publish(EngineEvent::new(
1234 "message.part.updated",
1235 json!({"part": failed_part}),
1236 ));
1237 return Err(err);
1238 }
1239 };
1240 if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1241 let event_name = if auth.pending && auth.blocked {
1242 "mcp.auth.pending"
1243 } else {
1244 "mcp.auth.required"
1245 };
1246 self.event_bus.publish(EngineEvent::new(
1247 event_name,
1248 json!({
1249 "sessionID": session_id,
1250 "messageID": message_id,
1251 "tool": tool.clone(),
1252 "server": auth.server,
1253 "authorizationUrl": auth.authorization_url,
1254 "message": auth.message,
1255 "challengeId": auth.challenge_id,
1256 "pending": auth.pending,
1257 "blocked": auth.blocked,
1258 "retryAfterMs": auth.retry_after_ms
1259 }),
1260 ));
1261 }
1262 emit_tool_side_events(
1263 self.storage.clone(),
1264 &self.event_bus,
1265 ToolSideEventContext {
1266 session_id,
1267 message_id,
1268 tool: &tool,
1269 args: &args_for_side_events,
1270 metadata: &result.metadata,
1271 workspace_root: tool_context.as_ref().map(|ctx| ctx.0.as_str()),
1272 effective_cwd: tool_context.as_ref().map(|ctx| ctx.1.as_str()),
1273 },
1274 )
1275 .await;
1276 let output = if let Some(auth) = extract_mcp_auth_required_metadata(&result.metadata) {
1277 if auth.pending && auth.blocked {
1278 let retry_after_secs = auth.retry_after_ms.unwrap_or(0).div_ceil(1000);
1279 format!(
1280 "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1281 tool, auth.message, auth.authorization_url, retry_after_secs
1282 )
1283 } else {
1284 format!(
1285 "Authorization required for `{}`.\n{}\n\nAuthorize here: {}",
1286 tool, auth.message, auth.authorization_url
1287 )
1288 }
1289 } else {
1290 self.plugins.transform_tool_output(result.output).await
1291 };
1292 let output = truncate_text(&output, 16_000);
1293 let mut result_part = WireMessagePart::tool_result(
1294 session_id,
1295 message_id,
1296 tool.clone(),
1297 json!(output.clone()),
1298 );
1299 result_part.id = invoke_part_id;
1300 self.event_bus.publish(EngineEvent::new(
1301 "message.part.updated",
1302 json!({"part": result_part}),
1303 ));
1304 Ok(Some(truncate_text(
1305 &format!("Tool `{tool}` result:\n{output}"),
1306 16_000,
1307 )))
1308 }
1309
1310 async fn find_recent_matching_user_message_id(
1311 &self,
1312 session_id: &str,
1313 text: &str,
1314 ) -> Option<String> {
1315 let session = self.storage.get_session(session_id).await?;
1316 let last = session.messages.last()?;
1317 if !matches!(last.role, MessageRole::User) {
1318 return None;
1319 }
1320 let age_ms = (Utc::now() - last.created_at).num_milliseconds().max(0) as u64;
1321 if age_ms > 10_000 {
1322 return None;
1323 }
1324 let last_text = last
1325 .parts
1326 .iter()
1327 .filter_map(|part| match part {
1328 MessagePart::Text { text } => Some(text.clone()),
1329 _ => None,
1330 })
1331 .collect::<Vec<_>>()
1332 .join("\n");
1333 if last_text == text {
1334 return Some(last.id.clone());
1335 }
1336 None
1337 }
1338
1339 async fn auto_rename_session_from_user_text(&self, session_id: &str, fallback_text: &str) {
1340 let Some(mut session) = self.storage.get_session(session_id).await else {
1341 return;
1342 };
1343 if !title_needs_repair(&session.title) {
1344 return;
1345 }
1346
1347 let first_user_text = session.messages.iter().find_map(|message| {
1348 if !matches!(message.role, MessageRole::User) {
1349 return None;
1350 }
1351 message.parts.iter().find_map(|part| match part {
1352 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.clone()),
1353 _ => None,
1354 })
1355 });
1356
1357 let source = first_user_text.unwrap_or_else(|| fallback_text.to_string());
1358 let Some(title) = derive_session_title_from_prompt(&source, 60) else {
1359 return;
1360 };
1361
1362 session.title = title;
1363 session.time.updated = Utc::now();
1364 let _ = self.storage.save_session(session).await;
1365 }
1366
1367 async fn workspace_sandbox_violation(
1368 &self,
1369 session_id: &str,
1370 tool: &str,
1371 args: &Value,
1372 ) -> Option<String> {
1373 if self.workspace_override_active(session_id).await {
1374 return None;
1375 }
1376 let session = self.storage.get_session(session_id).await?;
1377 let workspace = session
1378 .workspace_root
1379 .or_else(|| crate::normalize_workspace_path(&session.directory))?;
1380 let workspace_path = PathBuf::from(&workspace);
1381 let candidate_paths = extract_tool_candidate_paths(tool, args);
1382 if candidate_paths.is_empty() {
1383 if is_shell_tool_name(tool) {
1384 if let Some(command) = extract_shell_command(args) {
1385 if shell_command_targets_sensitive_path(&command) {
1386 return Some(format!(
1387 "Sandbox blocked `{tool}` command targeting sensitive paths."
1388 ));
1389 }
1390 }
1391 }
1392 return None;
1393 }
1394 if let Some(sensitive) = candidate_paths.iter().find(|path| {
1395 let raw = Path::new(path);
1396 let resolved = if raw.is_absolute() {
1397 raw.to_path_buf()
1398 } else {
1399 workspace_path.join(raw)
1400 };
1401 is_sensitive_path_candidate(&resolved)
1402 }) {
1403 return Some(format!(
1404 "Sandbox blocked `{tool}` path `{sensitive}` (sensitive path policy)."
1405 ));
1406 }
1407
1408 let outside = candidate_paths.iter().find(|path| {
1409 let raw = Path::new(path);
1410 let resolved = if raw.is_absolute() {
1411 raw.to_path_buf()
1412 } else {
1413 workspace_path.join(raw)
1414 };
1415 !crate::is_within_workspace_root(&resolved, &workspace_path)
1416 })?;
1417 Some(format!(
1418 "Sandbox blocked `{tool}` path `{outside}` (workspace root: `{workspace}`)"
1419 ))
1420 }
1421
1422 async fn resolve_tool_execution_context(&self, session_id: &str) -> Option<(String, String)> {
1423 let session = self.storage.get_session(session_id).await?;
1424 let workspace_root = session
1425 .workspace_root
1426 .or_else(|| crate::normalize_workspace_path(&session.directory))?;
1427 let effective_cwd = if session.directory.trim().is_empty()
1428 || session.directory.trim() == "."
1429 {
1430 workspace_root.clone()
1431 } else {
1432 crate::normalize_workspace_path(&session.directory).unwrap_or(workspace_root.clone())
1433 };
1434 Some((workspace_root, effective_cwd))
1435 }
1436
1437 async fn workspace_override_active(&self, session_id: &str) -> bool {
1438 let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
1439 let mut overrides = self.workspace_overrides.write().await;
1440 overrides.retain(|_, expires_at| *expires_at > now);
1441 overrides
1442 .get(session_id)
1443 .map(|expires_at| *expires_at > now)
1444 .unwrap_or(false)
1445 }
1446
1447 async fn generate_final_narrative_without_tools(
1448 &self,
1449 session_id: &str,
1450 active_agent: &AgentDefinition,
1451 provider_hint: Option<&str>,
1452 model_id: Option<&str>,
1453 cancel: CancellationToken,
1454 tool_outputs: &[String],
1455 ) -> Option<String> {
1456 if cancel.is_cancelled() {
1457 return None;
1458 }
1459 let mut messages = load_chat_history(self.storage.clone(), session_id).await;
1460 let mut system_parts = vec![tandem_runtime_system_prompt(&self.host_runtime_context)];
1461 if let Some(system) = active_agent.system_prompt.as_ref() {
1462 system_parts.push(system.clone());
1463 }
1464 messages.insert(
1465 0,
1466 ChatMessage {
1467 role: "system".to_string(),
1468 content: system_parts.join("\n\n"),
1469 attachments: Vec::new(),
1470 },
1471 );
1472 messages.push(ChatMessage {
1473 role: "user".to_string(),
1474 content: format!(
1475 "Tool observations:\n{}\n\nProvide a direct final answer now. Do not call tools.",
1476 summarize_tool_outputs(tool_outputs)
1477 ),
1478 attachments: Vec::new(),
1479 });
1480 let stream = self
1481 .providers
1482 .stream_for_provider(provider_hint, model_id, messages, None, cancel.clone())
1483 .await
1484 .ok()?;
1485 tokio::pin!(stream);
1486 let mut completion = String::new();
1487 while let Some(chunk) = stream.next().await {
1488 if cancel.is_cancelled() {
1489 return None;
1490 }
1491 match chunk {
1492 Ok(StreamChunk::TextDelta(delta)) => {
1493 let delta = strip_model_control_markers(&delta);
1494 if !delta.trim().is_empty() {
1495 completion.push_str(&delta);
1496 }
1497 }
1498 Ok(StreamChunk::Done { .. }) => break,
1499 Ok(_) => {}
1500 Err(_) => return None,
1501 }
1502 }
1503 let completion = truncate_text(&strip_model_control_markers(&completion), 16_000);
1504 if completion.trim().is_empty() {
1505 None
1506 } else {
1507 Some(completion)
1508 }
1509 }
1510}
1511
1512fn resolve_model_route(
1513 request_model: Option<&ModelSpec>,
1514 session_model: Option<&ModelSpec>,
1515) -> Option<(String, String)> {
1516 fn normalize(spec: &ModelSpec) -> Option<(String, String)> {
1517 let provider_id = spec.provider_id.trim();
1518 let model_id = spec.model_id.trim();
1519 if provider_id.is_empty() || model_id.is_empty() {
1520 return None;
1521 }
1522 Some((provider_id.to_string(), model_id.to_string()))
1523 }
1524
1525 request_model
1526 .and_then(normalize)
1527 .or_else(|| session_model.and_then(normalize))
1528}
1529
/// Removes chat-template control markers (`<|eom|>`, `<|eot_id|>`,
/// `<|im_end|>`, `<|end|>`) from `input`.
///
/// Markers are removed one kind at a time, in the order listed, so a marker
/// that only appears after a later removal is not revisited.
fn strip_model_control_markers(input: &str) -> String {
    const CONTROL_MARKERS: [&str; 4] = ["<|eom|>", "<|eot_id|>", "<|im_end|>", "<|end|>"];

    CONTROL_MARKERS
        .iter()
        .fold(input.to_string(), |text, marker| {
            if text.contains(*marker) {
                text.replace(*marker, "")
            } else {
                text
            }
        })
}
1539
/// Caps `input` at `max_len` bytes, appending `...<truncated>` when text was
/// cut off.
///
/// Input that already fits is returned unchanged. The cut is moved back to the
/// nearest UTF-8 character boundary at or below `max_len`, so multi-byte
/// characters are never split (slicing mid-character would panic).
fn truncate_text(input: &str, max_len: usize) -> String {
    const SUFFIX: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // `max_len < input.len()` here, and index 0 is always a boundary, so this
    // loop terminates without underflow.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }

    let mut out = String::with_capacity(cut + SUFFIX.len());
    out.push_str(&input[..cut]);
    out.push_str(SUFFIX);
    out
}
1548
/// Maps free-form provider error text to a stable error code.
///
/// Matching is a case-insensitive substring search. Rules are tried from top
/// to bottom and the first rule with any matching needle wins; text matching
/// no rule yields `PROVIDER_REQUEST_FAILED`.
fn provider_error_code(error_text: &str) -> &'static str {
    const RULES: &[(&str, &[&str])] = &[
        (
            "TOOL_SCHEMA_INVALID",
            &[
                "invalid_function_parameters",
                "array schema missing items",
                "tool schema",
            ],
        ),
        (
            "RATE_LIMIT_EXCEEDED",
            &["rate limit", "too many requests", "429"],
        ),
        (
            "CONTEXT_LENGTH_EXCEEDED",
            &["context length", "max tokens", "token limit"],
        ),
        (
            "AUTHENTICATION_ERROR",
            &["unauthorized", "authentication", "401", "403"],
        ),
        ("TIMEOUT", &["timeout", "timed out"]),
        (
            "PROVIDER_SERVER_ERROR",
            &["server error", "500", "502", "503", "504"],
        ),
    ];

    let haystack = error_text.to_lowercase();
    RULES
        .iter()
        .find(|(_, needles)| needles.iter().any(|needle| haystack.contains(*needle)))
        .map(|(code, _)| *code)
        .unwrap_or("PROVIDER_REQUEST_FAILED")
}
1587
/// Canonicalizes a tool name as emitted by a model.
///
/// The name is trimmed, ASCII-lowercased, and has `-` replaced by `_`. At most
/// one namespace prefix (`functions.`, `default_api:`, `tool.`, ...) is then
/// removed: the first one in the list whose removal leaves a non-blank
/// remainder. Finally, known aliases are folded: todo-list variants become
/// `todo_write` and shell variants become `bash`.
fn normalize_tool_name(name: &str) -> String {
    const NAMESPACE_PREFIXES: [&str; 8] = [
        "default_api:",
        "default_api.",
        "functions.",
        "function.",
        "tools.",
        "tool.",
        "builtin:",
        "builtin.",
    ];

    let lowered = name.trim().to_ascii_lowercase().replace('-', "_");
    let bare = NAMESPACE_PREFIXES
        .iter()
        .find_map(|prefix| {
            lowered
                .strip_prefix(*prefix)
                .map(str::trim)
                .filter(|rest| !rest.is_empty())
        })
        .unwrap_or(lowered.as_str());

    let canonical = match bare {
        "todowrite" | "update_todo_list" | "update_todos" => "todo_write",
        "run_command" | "shell" | "powershell" | "cmd" => "bash",
        other => other,
    };
    canonical.to_string()
}
1614
/// Extracts the server segment from a dotted MCP tool name.
///
/// For a name of the form `mcp.<server>[.<rest>]` this returns `<server>`.
/// Returns `None` when the first segment is not exactly `mcp` or the server
/// segment is missing or empty.
fn mcp_server_from_tool_name(tool_name: &str) -> Option<&str> {
    let (namespace, remainder) = tool_name.split_once('.')?;
    if namespace != "mcp" {
        return None;
    }
    let server = match remainder.split_once('.') {
        Some((server, _)) => server,
        None => remainder,
    };
    (!server.is_empty()).then_some(server)
}
1623
1624fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1625 let Some(obj) = args.as_object() else {
1626 return Vec::new();
1627 };
1628 let keys: &[&str] = match tool {
1629 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1630 "glob" => &["pattern"],
1631 "lsp" => &["filePath", "path"],
1632 "bash" => &["cwd"],
1633 "apply_patch" => &[],
1634 _ => &["path", "cwd"],
1635 };
1636 keys.iter()
1637 .filter_map(|key| obj.get(*key))
1638 .filter_map(|value| value.as_str())
1639 .filter(|s| !s.trim().is_empty())
1640 .map(ToString::to_string)
1641 .collect()
1642}
1643
1644fn agent_can_use_tool(agent: &AgentDefinition, tool_name: &str) -> bool {
1645 let target = normalize_tool_name(tool_name);
1646 match agent.tools.as_ref() {
1647 None => true,
1648 Some(list) => list.iter().any(|t| normalize_tool_name(t) == target),
1649 }
1650}
1651
1652fn enforce_skill_scope(
1653 tool_name: &str,
1654 args: Value,
1655 equipped_skills: Option<&[String]>,
1656) -> Result<Value, String> {
1657 if normalize_tool_name(tool_name) != "skill" {
1658 return Ok(args);
1659 }
1660 let Some(configured) = equipped_skills else {
1661 return Ok(args);
1662 };
1663
1664 let mut allowed = configured
1665 .iter()
1666 .map(|s| s.trim().to_string())
1667 .filter(|s| !s.is_empty())
1668 .collect::<Vec<_>>();
1669 if allowed
1670 .iter()
1671 .any(|s| s == "*" || s.eq_ignore_ascii_case("all"))
1672 {
1673 return Ok(args);
1674 }
1675 allowed.sort();
1676 allowed.dedup();
1677 if allowed.is_empty() {
1678 return Err("No skills are equipped for this agent.".to_string());
1679 }
1680
1681 let requested = args
1682 .get("name")
1683 .and_then(|v| v.as_str())
1684 .map(|v| v.trim().to_string())
1685 .unwrap_or_default();
1686 if !requested.is_empty() && !allowed.iter().any(|s| s == &requested) {
1687 return Err(format!(
1688 "Skill '{}' is not equipped for this agent. Equipped skills: {}",
1689 requested,
1690 allowed.join(", ")
1691 ));
1692 }
1693
1694 let mut out = if let Some(obj) = args.as_object() {
1695 Value::Object(obj.clone())
1696 } else {
1697 json!({})
1698 };
1699 if let Some(obj) = out.as_object_mut() {
1700 obj.insert("allowed_skills".to_string(), json!(allowed));
1701 }
1702 Ok(out)
1703}
1704
1705fn is_read_only_tool(tool_name: &str) -> bool {
1706 matches!(
1707 normalize_tool_name(tool_name).as_str(),
1708 "glob"
1709 | "read"
1710 | "grep"
1711 | "search"
1712 | "codesearch"
1713 | "list"
1714 | "ls"
1715 | "lsp"
1716 | "websearch"
1717 | "webfetch"
1718 | "webfetch_html"
1719 )
1720}
1721
1722fn is_batch_wrapper_tool_name(name: &str) -> bool {
1723 matches!(
1724 normalize_tool_name(name).as_str(),
1725 "default_api" | "default" | "api" | "function" | "functions" | "tool" | "tools"
1726 )
1727}
1728
1729fn non_empty_string_at<'a>(obj: &'a Map<String, Value>, key: &str) -> Option<&'a str> {
1730 obj.get(key)
1731 .and_then(|v| v.as_str())
1732 .map(str::trim)
1733 .filter(|s| !s.is_empty())
1734}
1735
1736fn nested_non_empty_string_at<'a>(
1737 obj: &'a Map<String, Value>,
1738 parent: &str,
1739 key: &str,
1740) -> Option<&'a str> {
1741 obj.get(parent)
1742 .and_then(|v| v.as_object())
1743 .and_then(|nested| nested.get(key))
1744 .and_then(|v| v.as_str())
1745 .map(str::trim)
1746 .filter(|s| !s.is_empty())
1747}
1748
1749fn extract_batch_calls(args: &Value) -> Vec<(String, Value)> {
1750 let calls = args
1751 .get("tool_calls")
1752 .and_then(|v| v.as_array())
1753 .cloned()
1754 .unwrap_or_default();
1755 calls
1756 .into_iter()
1757 .filter_map(|call| {
1758 let obj = call.as_object()?;
1759 let tool_raw = non_empty_string_at(obj, "tool")
1760 .or_else(|| nested_non_empty_string_at(obj, "tool", "name"))
1761 .or_else(|| nested_non_empty_string_at(obj, "function", "tool"))
1762 .or_else(|| nested_non_empty_string_at(obj, "function_call", "tool"))
1763 .or_else(|| nested_non_empty_string_at(obj, "call", "tool"));
1764 let name_raw = non_empty_string_at(obj, "name")
1765 .or_else(|| nested_non_empty_string_at(obj, "function", "name"))
1766 .or_else(|| nested_non_empty_string_at(obj, "function_call", "name"))
1767 .or_else(|| nested_non_empty_string_at(obj, "call", "name"))
1768 .or_else(|| nested_non_empty_string_at(obj, "tool", "name"));
1769 let effective = match (tool_raw, name_raw) {
1770 (Some(t), Some(n)) if is_batch_wrapper_tool_name(t) => n,
1771 (Some(t), _) => t,
1772 (None, Some(n)) => n,
1773 (None, None) => return None,
1774 };
1775 let normalized = normalize_tool_name(effective);
1776 let call_args = obj.get("args").cloned().unwrap_or_else(|| json!({}));
1777 Some((normalized, call_args))
1778 })
1779 .collect()
1780}
1781
1782fn is_read_only_batch_call(args: &Value) -> bool {
1783 let calls = extract_batch_calls(args);
1784 !calls.is_empty() && calls.iter().all(|(tool, _)| is_read_only_tool(tool))
1785}
1786
1787fn batch_tool_signature(args: &Value) -> Option<String> {
1788 let calls = extract_batch_calls(args);
1789 if calls.is_empty() {
1790 return None;
1791 }
1792 let parts = calls
1793 .into_iter()
1794 .map(|(tool, call_args)| tool_signature(&tool, &call_args))
1795 .collect::<Vec<_>>();
1796 Some(format!("batch:{}", parts.join("|")))
1797}
1798
1799fn is_non_productive_batch_output(output: &str) -> bool {
1800 let Ok(value) = serde_json::from_str::<Value>(output.trim()) else {
1801 return false;
1802 };
1803 let Some(items) = value.as_array() else {
1804 return false;
1805 };
1806 if items.is_empty() {
1807 return true;
1808 }
1809 items.iter().all(|item| {
1810 let text = item
1811 .get("output")
1812 .and_then(|v| v.as_str())
1813 .map(str::trim)
1814 .unwrap_or_default()
1815 .to_ascii_lowercase();
1816 text.is_empty()
1817 || text.starts_with("unknown tool:")
1818 || text.contains("call skipped")
1819 || text.contains("guard budget exceeded")
1820 })
1821}
1822
/// Reports whether tool output reads like an authorization prompt: it must
/// mention that authorization is required or pending AND contain either an
/// "authorize here" hint or something URL-like (`http`). Case-insensitive.
fn is_auth_required_tool_output(output: &str) -> bool {
    const AUTH_PHRASES: [&str; 3] = [
        "authorization required",
        "requires authorization",
        "authorization pending",
    ];
    const LINK_HINTS: [&str; 2] = ["authorize here", "http"];

    let haystack = output.to_ascii_lowercase();
    let mentions_auth = AUTH_PHRASES.iter().any(|phrase| haystack.contains(*phrase));
    let has_link_hint = LINK_HINTS.iter().any(|hint| haystack.contains(*hint));
    mentions_auth && has_link_hint
}
1830
/// Details of an MCP authorization challenge, built either from a tool
/// result's `mcpAuth` metadata or from a URL found in a tool error message.
#[derive(Debug, Clone)]
struct McpAuthRequiredMetadata {
    /// Identifier for this challenge: the metadata's `challengeId` (or
    /// `"unknown"` when absent), or a stable hash of tool name and URL when
    /// derived from error text.
    challenge_id: String,
    /// URL the user is told to open in order to authorize.
    authorization_url: String,
    /// Human-readable explanation included in events and tool output.
    message: String,
    /// Server name, when one could be determined.
    server: Option<String>,
    /// `pending` flag copied from the metadata; `false` when derived from error text.
    pending: bool,
    /// `blocked` flag copied from the metadata; `false` when derived from error text.
    blocked: bool,
    /// Value of the metadata's `retryAfterMs`, if present (milliseconds, per the key name).
    retry_after_ms: Option<u64>,
}
1841
1842fn extract_mcp_auth_required_metadata(metadata: &Value) -> Option<McpAuthRequiredMetadata> {
1843 let auth = metadata.get("mcpAuth")?;
1844 if !auth
1845 .get("required")
1846 .and_then(|v| v.as_bool())
1847 .unwrap_or(false)
1848 {
1849 return None;
1850 }
1851 let authorization_url = auth
1852 .get("authorizationUrl")
1853 .and_then(|v| v.as_str())
1854 .map(str::trim)
1855 .filter(|v| !v.is_empty())?
1856 .to_string();
1857 let message = auth
1858 .get("message")
1859 .and_then(|v| v.as_str())
1860 .map(str::trim)
1861 .filter(|v| !v.is_empty())
1862 .unwrap_or("This tool requires authorization before it can run.")
1863 .to_string();
1864 let challenge_id = auth
1865 .get("challengeId")
1866 .and_then(|v| v.as_str())
1867 .map(str::trim)
1868 .filter(|v| !v.is_empty())
1869 .unwrap_or("unknown")
1870 .to_string();
1871 let server = metadata
1872 .get("server")
1873 .and_then(|v| v.as_str())
1874 .map(str::trim)
1875 .filter(|v| !v.is_empty())
1876 .map(ToString::to_string);
1877 let pending = auth
1878 .get("pending")
1879 .and_then(|v| v.as_bool())
1880 .unwrap_or(false);
1881 let blocked = auth
1882 .get("blocked")
1883 .and_then(|v| v.as_bool())
1884 .unwrap_or(false);
1885 let retry_after_ms = auth.get("retryAfterMs").and_then(|v| v.as_u64());
1886 Some(McpAuthRequiredMetadata {
1887 challenge_id,
1888 authorization_url,
1889 message,
1890 server,
1891 pending,
1892 blocked,
1893 retry_after_ms,
1894 })
1895}
1896
1897fn extract_mcp_auth_required_from_error_text(
1898 tool_name: &str,
1899 error_text: &str,
1900) -> Option<McpAuthRequiredMetadata> {
1901 let lower = error_text.to_ascii_lowercase();
1902 let auth_hint = lower.contains("authorization")
1903 || lower.contains("oauth")
1904 || lower.contains("invalid oauth token")
1905 || lower.contains("requires authorization");
1906 if !auth_hint {
1907 return None;
1908 }
1909 let authorization_url = find_first_url(error_text)?;
1910 let challenge_id = stable_hash(&format!("{tool_name}:{authorization_url}"));
1911 let server = tool_name
1912 .strip_prefix("mcp.")
1913 .and_then(|rest| rest.split('.').next())
1914 .filter(|s| !s.is_empty())
1915 .map(ToString::to_string);
1916 Some(McpAuthRequiredMetadata {
1917 challenge_id,
1918 authorization_url,
1919 message: "This integration requires authorization before this action can run.".to_string(),
1920 server,
1921 pending: false,
1922 blocked: false,
1923 retry_after_ms: None,
1924 })
1925}
1926
1927fn summarize_auth_pending_outputs(outputs: &[String]) -> Option<String> {
1928 if outputs.is_empty()
1929 || !outputs
1930 .iter()
1931 .all(|output| is_auth_required_tool_output(output))
1932 {
1933 return None;
1934 }
1935 let mut auth_lines = outputs
1936 .iter()
1937 .filter_map(|output| {
1938 let trimmed = output.trim();
1939 if trimmed.is_empty() {
1940 None
1941 } else {
1942 Some(trimmed.to_string())
1943 }
1944 })
1945 .collect::<Vec<_>>();
1946 auth_lines.sort();
1947 auth_lines.dedup();
1948 if auth_lines.is_empty() {
1949 return None;
1950 }
1951 Some(format!(
1952 "Authorization is required before I can continue with this action.\n\n{}",
1953 auth_lines.join("\n\n")
1954 ))
1955}
1956
1957fn summarize_guard_budget_outputs(outputs: &[String]) -> Option<String> {
1958 if outputs.is_empty()
1959 || !outputs
1960 .iter()
1961 .all(|output| is_guard_budget_tool_output(output))
1962 {
1963 return None;
1964 }
1965 let mut lines = outputs
1966 .iter()
1967 .filter_map(|output| {
1968 let trimmed = output.trim();
1969 if trimmed.is_empty() {
1970 None
1971 } else {
1972 Some(trimmed.to_string())
1973 }
1974 })
1975 .collect::<Vec<_>>();
1976 lines.sort();
1977 lines.dedup();
1978 if lines.is_empty() {
1979 return None;
1980 }
1981 Some(format!(
1982 "This run hit the per-run tool guard budget, so I paused tool execution to avoid runaway retries.\n\n{}\n\nSend a new message to start a fresh run.",
1983 lines.join("\n")
1984 ))
1985}
1986
/// Returns the first whitespace-delimited token in `text` that starts with
/// `https://` or `http://`, with trailing punctuation (`)`, `]`, `}`, quotes,
/// `,`, `.`) removed.
///
/// Tokens that contain nothing after the scheme once trimmed are skipped. The
/// remainder is measured against the scheme that actually matched, so short
/// `http://` URLs are not rejected by the longer `https://` prefix length.
/// A URL preceded by other characters in the same token (for example an
/// opening parenthesis) is not detected.
fn find_first_url(text: &str) -> Option<String> {
    const SCHEMES: [&str; 2] = ["https://", "http://"];
    const TRAILING_PUNCTUATION: [char; 7] = [')', ']', '}', '"', '\'', ',', '.'];

    text.split_whitespace().find_map(|token| {
        let scheme = SCHEMES.iter().find(|scheme| token.starts_with(**scheme))?;
        let cleaned = token.trim_end_matches(&TRAILING_PUNCTUATION[..]);
        (cleaned.len() > scheme.len()).then(|| cleaned.to_string())
    })
}
1998
1999fn tool_budget_for(tool_name: &str) -> usize {
2000 let normalized = normalize_tool_name(tool_name);
2001 let (default_budget, env_key) = match normalized.as_str() {
2002 "glob" => (4usize, "TANDEM_TOOL_BUDGET_GLOB"),
2003 "read" => (8usize, "TANDEM_TOOL_BUDGET_READ"),
2004 "websearch" => (8usize, "TANDEM_TOOL_BUDGET_WEBSEARCH"),
2005 "batch" => (10usize, "TANDEM_TOOL_BUDGET_BATCH"),
2006 "grep" | "search" | "codesearch" => (6usize, "TANDEM_TOOL_BUDGET_SEARCH"),
2007 _ => (10usize, "TANDEM_TOOL_BUDGET_DEFAULT"),
2008 };
2009 std::env::var(env_key)
2010 .ok()
2011 .and_then(|v| v.trim().parse::<usize>().ok())
2012 .filter(|v| *v > 0)
2013 .unwrap_or(default_budget)
2014}
2015
/// Reports whether tool output contains the guard-budget marker phrase
/// (`per-run guard budget exceeded`), ignoring ASCII case.
fn is_guard_budget_tool_output(output: &str) -> bool {
    const MARKER: &str = "per-run guard budget exceeded";

    let lowered = output.to_ascii_lowercase();
    lowered.contains(MARKER)
}
2021
/// Reports whether `path` points at credential-like material that tools must
/// not touch.
///
/// Matching is ASCII case-insensitive and covers:
/// - anything inside (or equal to) a `.ssh` or `.gnupg` directory;
/// - `.aws/credentials`, `.npmrc`, `.netrc`, `.pypirc`;
/// - paths mentioning private-key names or key/certificate extensions
///   (`id_rsa`, `id_ed25519`, `id_ecdsa`, `.pem`, `.p12`, `.pfx`, `.key`);
/// - files named `.env` or `.env.*`.
///
/// Backslashes are treated as path separators so that Windows-style paths such
/// as `C:\Users\me\.ssh\config` are recognized by the directory-based rules.
fn is_sensitive_path_candidate(path: &Path) -> bool {
    // Normalize separators once; every directory rule below is written with `/`.
    let lowered = path
        .to_string_lossy()
        .to_ascii_lowercase()
        .replace('\\', "/");

    if lowered.contains("/.ssh/")
        || lowered.ends_with("/.ssh")
        || lowered.contains("/.gnupg/")
        || lowered.ends_with("/.gnupg")
    {
        return true;
    }
    if lowered.contains("/.aws/credentials")
        || lowered.ends_with("/.npmrc")
        || lowered.ends_with("/.netrc")
        || lowered.ends_with("/.pypirc")
    {
        return true;
    }
    if lowered.contains("id_rsa")
        || lowered.contains("id_ed25519")
        || lowered.contains("id_ecdsa")
        || lowered.contains(".pem")
        || lowered.contains(".p12")
        || lowered.contains(".pfx")
        || lowered.contains(".key")
    {
        return true;
    }

    let is_env_file = |name: &str| name == ".env" || name.starts_with(".env.");

    // Platform-native view of the final component.
    let native_name = path
        .file_name()
        .and_then(|name| name.to_str())
        .map(str::to_ascii_lowercase);
    if native_name.as_deref().is_some_and(is_env_file) {
        return true;
    }

    // Separator-normalized view, so a backslash-separated path is handled even
    // when the host does not treat `\` as a separator.
    lowered
        .trim_end_matches('/')
        .rsplit('/')
        .next()
        .is_some_and(is_env_file)
}
2056
/// Reports whether a shell command's text mentions any sensitive path
/// fragment (env files, SSH/GnuPG directories, AWS credentials, private keys
/// or certificate bundles). The check is a case-insensitive substring search.
fn shell_command_targets_sensitive_path(command: &str) -> bool {
    const SENSITIVE_FRAGMENTS: &[&str] = &[
        ".env",
        ".ssh",
        ".gnupg",
        ".aws/credentials",
        "id_rsa",
        "id_ed25519",
        ".pem",
        ".p12",
        ".pfx",
        ".key",
    ];

    let lowered = command.to_ascii_lowercase();
    for fragment in SENSITIVE_FRAGMENTS {
        if lowered.contains(*fragment) {
            return true;
        }
    }
    false
}
2073
/// Result of normalizing a tool call's arguments: the (possibly repaired)
/// arguments plus provenance information about how they were obtained.
#[derive(Debug, Clone)]
struct NormalizedToolArgs {
    /// The arguments to execute the tool with, after any repair.
    args: Value,
    /// Where the decisive argument came from, e.g. `provider_json`,
    /// `provider_string`, `inferred_from_user`, `inferred_from_context`,
    /// `recovered_from_context`, or `missing`.
    args_source: String,
    /// Integrity verdict for the arguments: `ok`, `recovered`, or `empty`.
    args_integrity: String,
    /// Search query resolved for `websearch` calls; `None` for other tools in
    /// the visible portion of `normalize_tool_args`.
    query: Option<String>,
    /// `true` when a required argument could neither be found nor recovered.
    missing_terminal: bool,
    /// Machine-readable reason accompanying `missing_terminal`, such as
    /// `WEBSEARCH_QUERY_MISSING` or `FILE_PATH_MISSING`.
    missing_terminal_reason: Option<String>,
}
2083
2084fn normalize_tool_args(
2085 tool_name: &str,
2086 raw_args: Value,
2087 latest_user_text: &str,
2088 latest_assistant_context: &str,
2089) -> NormalizedToolArgs {
2090 let normalized_tool = normalize_tool_name(tool_name);
2091 let mut args = raw_args;
2092 let mut args_source = if args.is_string() {
2093 "provider_string".to_string()
2094 } else {
2095 "provider_json".to_string()
2096 };
2097 let mut args_integrity = "ok".to_string();
2098 let mut query = None;
2099 let mut missing_terminal = false;
2100 let mut missing_terminal_reason = None;
2101
2102 if normalized_tool == "websearch" {
2103 if let Some(found) = extract_websearch_query(&args) {
2104 query = Some(found);
2105 args = set_websearch_query_and_source(args, query.clone(), "tool_args");
2106 } else if let Some(inferred) = infer_websearch_query_from_text(latest_user_text) {
2107 args_source = "inferred_from_user".to_string();
2108 args_integrity = "recovered".to_string();
2109 query = Some(inferred);
2110 args = set_websearch_query_and_source(args, query.clone(), "inferred_from_user");
2111 } else if let Some(recovered) = infer_websearch_query_from_text(latest_assistant_context) {
2112 args_source = "recovered_from_context".to_string();
2113 args_integrity = "recovered".to_string();
2114 query = Some(recovered);
2115 args = set_websearch_query_and_source(args, query.clone(), "recovered_from_context");
2116 } else {
2117 args_source = "missing".to_string();
2118 args_integrity = "empty".to_string();
2119 missing_terminal = true;
2120 missing_terminal_reason = Some("WEBSEARCH_QUERY_MISSING".to_string());
2121 }
2122 } else if is_shell_tool_name(&normalized_tool) {
2123 if let Some(command) = extract_shell_command(&args) {
2124 args = set_shell_command(args, command);
2125 } else if let Some(inferred) = infer_shell_command_from_text(latest_assistant_context) {
2126 args_source = "inferred_from_context".to_string();
2127 args_integrity = "recovered".to_string();
2128 args = set_shell_command(args, inferred);
2129 } else if let Some(inferred) = infer_shell_command_from_text(latest_user_text) {
2130 args_source = "inferred_from_user".to_string();
2131 args_integrity = "recovered".to_string();
2132 args = set_shell_command(args, inferred);
2133 } else {
2134 args_source = "missing".to_string();
2135 args_integrity = "empty".to_string();
2136 missing_terminal = true;
2137 missing_terminal_reason = Some("BASH_COMMAND_MISSING".to_string());
2138 }
2139 } else if matches!(normalized_tool.as_str(), "read" | "write" | "edit") {
2140 if let Some(path) = extract_file_path_arg(&args) {
2141 args = set_file_path_arg(args, path);
2142 } else if let Some(inferred) = infer_file_path_from_text(latest_user_text) {
2143 args_source = "inferred_from_user".to_string();
2144 args_integrity = "recovered".to_string();
2145 args = set_file_path_arg(args, inferred);
2146 } else {
2147 args_source = "missing".to_string();
2148 args_integrity = "empty".to_string();
2149 missing_terminal = true;
2150 missing_terminal_reason = Some("FILE_PATH_MISSING".to_string());
2151 }
2152
2153 if !missing_terminal && normalized_tool == "write" {
2154 if let Some(content) = extract_write_content_arg(&args) {
2155 args = set_write_content_arg(args, content);
2156 } else {
2157 args_source = "missing".to_string();
2158 args_integrity = "empty".to_string();
2159 missing_terminal = true;
2160 missing_terminal_reason = Some("WRITE_CONTENT_MISSING".to_string());
2161 }
2162 }
2163 } else if matches!(normalized_tool.as_str(), "webfetch" | "webfetch_html") {
2164 if let Some(url) = extract_webfetch_url_arg(&args) {
2165 args = set_webfetch_url_arg(args, url);
2166 } else if let Some(inferred) = infer_url_from_text(latest_assistant_context) {
2167 args_source = "inferred_from_context".to_string();
2168 args_integrity = "recovered".to_string();
2169 args = set_webfetch_url_arg(args, inferred);
2170 } else if let Some(inferred) = infer_url_from_text(latest_user_text) {
2171 args_source = "inferred_from_user".to_string();
2172 args_integrity = "recovered".to_string();
2173 args = set_webfetch_url_arg(args, inferred);
2174 } else {
2175 args_source = "missing".to_string();
2176 args_integrity = "empty".to_string();
2177 missing_terminal = true;
2178 missing_terminal_reason = Some("WEBFETCH_URL_MISSING".to_string());
2179 }
2180 }
2181
2182 NormalizedToolArgs {
2183 args,
2184 args_source,
2185 args_integrity,
2186 query,
2187 missing_terminal,
2188 missing_terminal_reason,
2189 }
2190}
2191
/// Reports whether `tool_name` refers to one of the shell-execution tools.
///
/// Surrounding whitespace is ignored and the comparison is ASCII case-insensitive.
fn is_shell_tool_name(tool_name: &str) -> bool {
    const SHELL_TOOLS: &[&str] = &["bash", "shell", "powershell", "cmd"];
    let candidate = tool_name.trim();
    SHELL_TOOLS
        .iter()
        .any(|known| candidate.eq_ignore_ascii_case(known))
}
2198
2199fn set_file_path_arg(args: Value, path: String) -> Value {
2200 let mut obj = args.as_object().cloned().unwrap_or_default();
2201 obj.insert("path".to_string(), Value::String(path));
2202 Value::Object(obj)
2203}
2204
2205fn set_write_content_arg(args: Value, content: String) -> Value {
2206 let mut obj = args.as_object().cloned().unwrap_or_default();
2207 obj.insert("content".to_string(), Value::String(content));
2208 Value::Object(obj)
2209}
2210
2211fn extract_file_path_arg(args: &Value) -> Option<String> {
2212 extract_file_path_arg_internal(args, 0)
2213}
2214
2215fn extract_write_content_arg(args: &Value) -> Option<String> {
2216 extract_write_content_arg_internal(args, 0)
2217}
2218
2219fn extract_file_path_arg_internal(args: &Value, depth: usize) -> Option<String> {
2220 if depth > 5 {
2221 return None;
2222 }
2223
2224 match args {
2225 Value::String(raw) => {
2226 let trimmed = raw.trim();
2227 if trimmed.is_empty() {
2228 return None;
2229 }
2230 if !(trimmed.starts_with('{') || trimmed.starts_with('[') || trimmed.starts_with('"')) {
2232 return sanitize_path_candidate(trimmed);
2233 }
2234 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
2235 return extract_file_path_arg_internal(&parsed, depth + 1);
2236 }
2237 sanitize_path_candidate(trimmed)
2238 }
2239 Value::Array(items) => items
2240 .iter()
2241 .find_map(|item| extract_file_path_arg_internal(item, depth + 1)),
2242 Value::Object(obj) => {
2243 for key in FILE_PATH_KEYS {
2244 if let Some(raw) = obj.get(key).and_then(|v| v.as_str()) {
2245 if let Some(path) = sanitize_path_candidate(raw) {
2246 return Some(path);
2247 }
2248 }
2249 }
2250 for container in NESTED_ARGS_KEYS {
2251 if let Some(nested) = obj.get(container) {
2252 if let Some(path) = extract_file_path_arg_internal(nested, depth + 1) {
2253 return Some(path);
2254 }
2255 }
2256 }
2257 None
2258 }
2259 _ => None,
2260 }
2261}
2262
2263fn extract_write_content_arg_internal(args: &Value, depth: usize) -> Option<String> {
2264 if depth > 5 {
2265 return None;
2266 }
2267
2268 match args {
2269 Value::String(raw) => {
2270 let trimmed = raw.trim();
2271 if trimmed.is_empty() {
2272 return None;
2273 }
2274 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
2275 return extract_write_content_arg_internal(&parsed, depth + 1);
2276 }
2277 if sanitize_path_candidate(trimmed).is_some()
2280 && !trimmed.contains('\n')
2281 && trimmed.split_whitespace().count() <= 3
2282 {
2283 return None;
2284 }
2285 Some(trimmed.to_string())
2286 }
2287 Value::Array(items) => items
2288 .iter()
2289 .find_map(|item| extract_write_content_arg_internal(item, depth + 1)),
2290 Value::Object(obj) => {
2291 for key in WRITE_CONTENT_KEYS {
2292 if let Some(value) = obj.get(key) {
2293 if let Some(raw) = value.as_str() {
2294 if !raw.is_empty() {
2295 return Some(raw.to_string());
2296 }
2297 } else if let Some(recovered) =
2298 extract_write_content_arg_internal(value, depth + 1)
2299 {
2300 return Some(recovered);
2301 }
2302 }
2303 }
2304 for container in NESTED_ARGS_KEYS {
2305 if let Some(nested) = obj.get(container) {
2306 if let Some(content) = extract_write_content_arg_internal(nested, depth + 1) {
2307 return Some(content);
2308 }
2309 }
2310 }
2311 None
2312 }
2313 _ => None,
2314 }
2315}
2316
2317fn set_shell_command(args: Value, command: String) -> Value {
2318 let mut obj = args.as_object().cloned().unwrap_or_default();
2319 obj.insert("command".to_string(), Value::String(command));
2320 Value::Object(obj)
2321}
2322
2323fn extract_shell_command(args: &Value) -> Option<String> {
2324 extract_shell_command_internal(args, 0)
2325}
2326
2327fn extract_shell_command_internal(args: &Value, depth: usize) -> Option<String> {
2328 if depth > 5 {
2329 return None;
2330 }
2331
2332 match args {
2333 Value::String(raw) => {
2334 let trimmed = raw.trim();
2335 if trimmed.is_empty() {
2336 return None;
2337 }
2338 if !(trimmed.starts_with('{') || trimmed.starts_with('[') || trimmed.starts_with('"')) {
2339 return sanitize_shell_command_candidate(trimmed);
2340 }
2341 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
2342 return extract_shell_command_internal(&parsed, depth + 1);
2343 }
2344 sanitize_shell_command_candidate(trimmed)
2345 }
2346 Value::Array(items) => items
2347 .iter()
2348 .find_map(|item| extract_shell_command_internal(item, depth + 1)),
2349 Value::Object(obj) => {
2350 for key in SHELL_COMMAND_KEYS {
2351 if let Some(raw) = obj.get(key).and_then(|v| v.as_str()) {
2352 if let Some(command) = sanitize_shell_command_candidate(raw) {
2353 return Some(command);
2354 }
2355 }
2356 }
2357 for container in NESTED_ARGS_KEYS {
2358 if let Some(nested) = obj.get(container) {
2359 if let Some(command) = extract_shell_command_internal(nested, depth + 1) {
2360 return Some(command);
2361 }
2362 }
2363 }
2364 None
2365 }
2366 _ => None,
2367 }
2368}
2369
2370fn infer_shell_command_from_text(text: &str) -> Option<String> {
2371 let trimmed = text.trim();
2372 if trimmed.is_empty() {
2373 return None;
2374 }
2375
2376 let mut in_tick = false;
2378 let mut tick_buf = String::new();
2379 for ch in trimmed.chars() {
2380 if ch == '`' {
2381 if in_tick {
2382 if let Some(candidate) = sanitize_shell_command_candidate(&tick_buf) {
2383 if looks_like_shell_command(&candidate) {
2384 return Some(candidate);
2385 }
2386 }
2387 tick_buf.clear();
2388 }
2389 in_tick = !in_tick;
2390 continue;
2391 }
2392 if in_tick {
2393 tick_buf.push(ch);
2394 }
2395 }
2396
2397 for line in trimmed.lines() {
2398 let line = line.trim();
2399 if line.is_empty() {
2400 continue;
2401 }
2402 let lower = line.to_ascii_lowercase();
2403 for prefix in [
2404 "run ",
2405 "execute ",
2406 "call ",
2407 "use bash ",
2408 "use shell ",
2409 "bash ",
2410 "shell ",
2411 "powershell ",
2412 "pwsh ",
2413 ] {
2414 if lower.starts_with(prefix) {
2415 let candidate = line[prefix.len()..].trim();
2416 if let Some(command) = sanitize_shell_command_candidate(candidate) {
2417 if looks_like_shell_command(&command) {
2418 return Some(command);
2419 }
2420 }
2421 }
2422 }
2423 }
2424
2425 None
2426}
2427
2428fn set_websearch_query_and_source(args: Value, query: Option<String>, query_source: &str) -> Value {
2429 let mut obj = args.as_object().cloned().unwrap_or_default();
2430 if let Some(q) = query {
2431 obj.insert("query".to_string(), Value::String(q));
2432 }
2433 obj.insert(
2434 "__query_source".to_string(),
2435 Value::String(query_source.to_string()),
2436 );
2437 Value::Object(obj)
2438}
2439
2440fn set_webfetch_url_arg(args: Value, url: String) -> Value {
2441 let mut obj = args.as_object().cloned().unwrap_or_default();
2442 obj.insert("url".to_string(), Value::String(url));
2443 Value::Object(obj)
2444}
2445
2446fn extract_webfetch_url_arg(args: &Value) -> Option<String> {
2447 const URL_KEYS: [&str; 5] = ["url", "uri", "link", "href", "target_url"];
2448 for key in URL_KEYS {
2449 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
2450 if let Some(url) = sanitize_url_candidate(value) {
2451 return Some(url);
2452 }
2453 }
2454 }
2455 for container in ["arguments", "args", "input", "params"] {
2456 if let Some(obj) = args.get(container) {
2457 for key in URL_KEYS {
2458 if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
2459 if let Some(url) = sanitize_url_candidate(value) {
2460 return Some(url);
2461 }
2462 }
2463 }
2464 }
2465 }
2466 args.as_str().and_then(sanitize_url_candidate)
2467}
2468
2469fn extract_websearch_query(args: &Value) -> Option<String> {
2470 const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
2471 for key in QUERY_KEYS {
2472 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
2473 if let Some(query) = sanitize_websearch_query_candidate(value) {
2474 return Some(query);
2475 }
2476 }
2477 }
2478 for container in ["arguments", "args", "input", "params"] {
2479 if let Some(obj) = args.get(container) {
2480 for key in QUERY_KEYS {
2481 if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
2482 if let Some(query) = sanitize_websearch_query_candidate(value) {
2483 return Some(query);
2484 }
2485 }
2486 }
2487 }
2488 }
2489 args.as_str().and_then(sanitize_websearch_query_candidate)
2490}
2491
/// Cleans a raw websearch query string, returning `None` when nothing usable remains.
///
/// Steps, in order:
/// 1. If the text contains an `<arg_value>` tag (ASCII case-insensitive), the trimmed text
///    between it and the next `</arg_value>` (or the end of the string) is returned when
///    non-empty.
/// 2. Otherwise the literal `<arg_key>` / `<arg_value>` tags are blanked out and runs of
///    whitespace are collapsed to single spaces.
/// 3. A leading `websearch query ` or `query ` label (ASCII case-insensitive) is dropped.
fn sanitize_websearch_query_candidate(raw: &str) -> Option<String> {
    const OPEN_TAG: &str = "<arg_value>";
    const CLOSE_TAG: &str = "</arg_value>";

    let text = raw.trim();
    if text.is_empty() {
        return None;
    }

    // ASCII lowercasing keeps byte offsets identical, so indices found in the lowered
    // copy can be used to slice the original text.
    let text_lower = text.to_ascii_lowercase();
    if let Some(open_at) = text_lower.find(OPEN_TAG) {
        let inner_start = open_at + OPEN_TAG.len();
        let after_open = &text[inner_start..];
        let inner = match text_lower[inner_start..].find(CLOSE_TAG) {
            Some(close_at) => &after_open[..close_at],
            None => after_open,
        };
        let inner = inner.trim();
        if !inner.is_empty() {
            return Some(inner.to_string());
        }
    }

    let mut untagged = text.to_string();
    for tag in ["<arg_key>", "</arg_key>", "<arg_value>", "</arg_value>"].iter() {
        untagged = untagged.replace(tag, " ");
    }
    let collapsed = untagged.split_whitespace().collect::<Vec<_>>().join(" ");
    if collapsed.is_empty() {
        return None;
    }

    let collapsed_lower = collapsed.to_ascii_lowercase();
    for label in ["websearch query ", "query "].iter() {
        if collapsed_lower.starts_with(label) {
            let remainder = collapsed[label.len()..].trim();
            if !remainder.is_empty() {
                return Some(remainder.to_string());
            }
        }
    }

    Some(collapsed)
}
2544
/// Derives a websearch query from free-form `text`.
///
/// A leading search verb (e.g. `search for`, `look up`, `find`) is removed together with
/// any separator characters (whitespace, `:` or `-`) that follow it. Surrounding quotes
/// and trailing sentence punctuation are then trimmed. Returns `None` when fewer than two
/// words remain.
///
/// A prefix only counts when it ends at a word boundary, so `finding nemo showtimes` is
/// kept intact rather than being cut down to `ing nemo showtimes`.
fn infer_websearch_query_from_text(text: &str) -> Option<String> {
    const PREFIXES: [&str; 11] = [
        "web search",
        "websearch",
        "search web for",
        "search web",
        "search for",
        "search",
        "look up",
        "lookup",
        "find",
        "web lookup",
        "query",
    ];

    let trimmed = text.trim();
    if trimmed.is_empty() {
        return None;
    }

    // ASCII-only lowercasing preserves byte offsets, so a prefix length measured on
    // `lower` is always a valid slice index into `trimmed`.
    let lower = trimmed.to_ascii_lowercase();

    let candidate = PREFIXES
        .iter()
        .filter(|prefix| lower.starts_with(**prefix))
        .map(|prefix| &trimmed[prefix.len()..])
        // Skip prefixes that stop in the middle of a word ("find" in "finding").
        .find(|rest| !rest.starts_with(|c: char| c.is_ascii_alphanumeric()))
        .map(|rest| {
            rest.trim_start_matches(|c: char| c.is_whitespace() || c == ':' || c == '-')
        })
        .unwrap_or(trimmed);

    let normalized = candidate
        .trim()
        .trim_matches(|c: char| c == '"' || c == '\'' || c.is_whitespace())
        .trim_matches(|c: char| matches!(c, '.' | ',' | '!' | '?'))
        .trim()
        .to_string();

    if normalized.split_whitespace().count() < 2 {
        return None;
    }
    Some(normalized)
}
2588
2589fn infer_file_path_from_text(text: &str) -> Option<String> {
2590 let trimmed = text.trim();
2591 if trimmed.is_empty() {
2592 return None;
2593 }
2594
2595 let mut candidates: Vec<String> = Vec::new();
2596
2597 let mut in_tick = false;
2599 let mut tick_buf = String::new();
2600 for ch in trimmed.chars() {
2601 if ch == '`' {
2602 if in_tick {
2603 let cand = sanitize_path_candidate(&tick_buf);
2604 if let Some(path) = cand {
2605 candidates.push(path);
2606 }
2607 tick_buf.clear();
2608 }
2609 in_tick = !in_tick;
2610 continue;
2611 }
2612 if in_tick {
2613 tick_buf.push(ch);
2614 }
2615 }
2616
2617 for raw in trimmed.split_whitespace() {
2619 if let Some(path) = sanitize_path_candidate(raw) {
2620 candidates.push(path);
2621 }
2622 }
2623
2624 let mut deduped = Vec::new();
2625 let mut seen = HashSet::new();
2626 for candidate in candidates {
2627 if seen.insert(candidate.clone()) {
2628 deduped.push(candidate);
2629 }
2630 }
2631
2632 deduped.into_iter().next()
2633}
2634
2635fn infer_url_from_text(text: &str) -> Option<String> {
2636 let trimmed = text.trim();
2637 if trimmed.is_empty() {
2638 return None;
2639 }
2640
2641 let mut candidates: Vec<String> = Vec::new();
2642
2643 let mut in_tick = false;
2645 let mut tick_buf = String::new();
2646 for ch in trimmed.chars() {
2647 if ch == '`' {
2648 if in_tick {
2649 if let Some(url) = sanitize_url_candidate(&tick_buf) {
2650 candidates.push(url);
2651 }
2652 tick_buf.clear();
2653 }
2654 in_tick = !in_tick;
2655 continue;
2656 }
2657 if in_tick {
2658 tick_buf.push(ch);
2659 }
2660 }
2661
2662 for raw in trimmed.split_whitespace() {
2664 if let Some(url) = sanitize_url_candidate(raw) {
2665 candidates.push(url);
2666 }
2667 }
2668
2669 let mut seen = HashSet::new();
2670 candidates
2671 .into_iter()
2672 .find(|candidate| seen.insert(candidate.clone()))
2673}
2674
/// Strips decoration from `raw` and returns it only if it is an http(s) URL.
///
/// Removed, in this order: surrounding whitespace; wrapping backticks, quotes, `*` and
/// `|`; leading opening brackets; trailing `, ; :` and closing brackets; trailing dots.
/// The scheme check is ASCII case-insensitive and the original casing is preserved.
fn sanitize_url_candidate(raw: &str) -> Option<String> {
    const OPENERS: [char; 4] = ['(', '[', '{', '<'];
    const CLOSERS: [char; 7] = [',', ';', ':', ')', ']', '}', '>'];

    let unwrapped = raw
        .trim()
        .trim_matches(|c: char| matches!(c, '`' | '"' | '\'' | '*' | '|'));
    let token = unwrapped
        .trim_start_matches(OPENERS)
        .trim_end_matches(CLOSERS)
        .trim_end_matches('.')
        .trim();

    if token.is_empty() {
        return None;
    }

    let lowered = token.to_ascii_lowercase();
    let is_http = ["http://", "https://"]
        .iter()
        .any(|scheme| lowered.starts_with(*scheme));
    if is_http {
        Some(token.to_string())
    } else {
        None
    }
}
2693
2694fn sanitize_path_candidate(raw: &str) -> Option<String> {
2695 let token = raw
2696 .trim()
2697 .trim_matches(|c: char| matches!(c, '`' | '"' | '\'' | '*' | '|'))
2698 .trim_start_matches(['(', '[', '{', '<'])
2699 .trim_end_matches([',', ';', ':', ')', ']', '}', '>'])
2700 .trim_end_matches('.')
2701 .trim();
2702
2703 if token.is_empty() {
2704 return None;
2705 }
2706 let lower = token.to_ascii_lowercase();
2707 if lower.starts_with("http://") || lower.starts_with("https://") {
2708 return None;
2709 }
2710 if is_malformed_tool_path_token(token) {
2711 return None;
2712 }
2713 if is_root_only_path_token(token) {
2714 return None;
2715 }
2716 if is_placeholder_path_token(token) {
2717 return None;
2718 }
2719
2720 let looks_like_path = token.contains('/') || token.contains('\\');
2721 let has_file_ext = [
2722 ".md", ".txt", ".json", ".yaml", ".yml", ".toml", ".rs", ".ts", ".tsx", ".js", ".jsx",
2723 ".py", ".go", ".java", ".cpp", ".c", ".h", ".pdf", ".docx", ".pptx", ".xlsx", ".rtf",
2724 ]
2725 .iter()
2726 .any(|ext| lower.ends_with(ext));
2727
2728 if !looks_like_path && !has_file_ext {
2729 return None;
2730 }
2731
2732 Some(token.to_string())
2733}
2734
/// Reports whether `token` is empty or one of the known documentation-style placeholder
/// paths (for example `path/to/file`).
///
/// Surrounding whitespace is ignored and the comparison is ASCII case-insensitive.
fn is_placeholder_path_token(token: &str) -> bool {
    const PLACEHOLDERS: &[&str] = &[
        "files/directories",
        "file/directory",
        "relative/or/absolute/path",
        "path/to/file",
        "path/to/your/file",
        "tool/policy",
        "tools/policy",
        "the expected artifact file",
        "workspace/file",
    ];

    let candidate = token.trim();
    candidate.is_empty()
        || PLACEHOLDERS
            .iter()
            .any(|placeholder| candidate.eq_ignore_ascii_case(placeholder))
}
2753
/// Reports whether `token` cannot be a real path because it carries leaked tool-call
/// markup, a line break, or a wildcard character (`*` or `?`).
///
/// The markup check is ASCII case-insensitive.
fn is_malformed_tool_path_token(token: &str) -> bool {
    const MARKUP_FRAGMENTS: &[&str] = &[
        "<tool_call",
        "</tool_call",
        "<function=",
        "<parameter=",
        "</function>",
        "</parameter>",
    ];

    let lowered = token.to_ascii_lowercase();
    let has_markup = MARKUP_FRAGMENTS
        .iter()
        .any(|fragment| lowered.contains(*fragment));

    has_markup || token.contains(['\n', '\r', '*', '?'])
}
2776
/// Reports whether `token` names only a root or navigation shorthand rather than a file.
///
/// True for an empty token, `/`, `\`, `.`, `..`, `~`, a bare drive letter such as `C:`,
/// and a drive root such as `C:\` or `C:/`. Surrounding whitespace is ignored.
fn is_root_only_path_token(token: &str) -> bool {
    let candidate = token.trim();
    if candidate.is_empty() || matches!(candidate, "/" | "\\" | "." | ".." | "~") {
        return true;
    }

    match candidate.as_bytes() {
        [drive, b':'] | [drive, b':', b'\\' | b'/'] => drive.is_ascii_alphabetic(),
        _ => false,
    }
}
2799
/// Trims whitespace and wrapping backticks, quotes, commas and semicolons from `raw`.
///
/// Returns `None` when nothing is left after trimming.
fn sanitize_shell_command_candidate(raw: &str) -> Option<String> {
    let is_wrapper = |c: char| matches!(c, '`' | '"' | '\'' | ',' | ';');
    let command = raw.trim().trim_matches(is_wrapper).trim();
    if command.is_empty() {
        None
    } else {
        Some(command.to_string())
    }
}
2810
/// Heuristically decides whether `candidate` reads like a shell command.
///
/// The check is ASCII case-insensitive and passes when the first word is a known program,
/// starts with `get-`, `./` or `.\`, or when the text contains a spaced `|`, `&&` or `;`
/// operator.
fn looks_like_shell_command(candidate: &str) -> bool {
    const KNOWN_PROGRAMS: &[&str] = &[
        "rg",
        "git",
        "cargo",
        "pnpm",
        "npm",
        "node",
        "python",
        "pytest",
        "pwsh",
        "powershell",
        "cmd",
        "dir",
        "ls",
        "cat",
        "type",
        "echo",
        "cd",
        "mkdir",
        "cp",
        "copy",
        "move",
        "del",
        "rm",
    ];
    const PROGRAM_PREFIXES: &[&str] = &["get-", "./", ".\\"];
    const OPERATORS: &[&str] = &[" | ", " && ", " ; "];

    let lowered = candidate.to_ascii_lowercase();
    // Empty or whitespace-only text has no first word and cannot contain an operator.
    let Some(program) = lowered.split_whitespace().next() else {
        return false;
    };

    KNOWN_PROGRAMS.contains(&program)
        || PROGRAM_PREFIXES
            .iter()
            .any(|prefix| program.starts_with(*prefix))
        || OPERATORS.iter().any(|op| lowered.contains(*op))
}
2850
/// Object keys probed, in this order, by `extract_file_path_arg_internal` when looking
/// for a file path in tool arguments.
const FILE_PATH_KEYS: [&str; 10] = [
    "path",
    "file_path",
    "filePath",
    "filepath",
    "filename",
    "file",
    "target",
    "targetFile",
    "absolutePath",
    "uri",
];
2863
/// Object keys probed, in this order, by `extract_shell_command_internal` when looking
/// for a shell command in tool arguments.
const SHELL_COMMAND_KEYS: [&str; 4] = ["command", "cmd", "script", "line"];
2865
/// Object keys probed, in this order, by `extract_write_content_arg_internal` when
/// looking for the content of a write in tool arguments.
const WRITE_CONTENT_KEYS: [&str; 8] = [
    "content",
    "text",
    "body",
    "value",
    "markdown",
    "document",
    "output",
    "file_content",
];
2876
/// Wrapper keys under which the real arguments may be nested. The file-path,
/// write-content and shell-command extractors recurse into these, in this order, after
/// their direct key lookups fail.
const NESTED_ARGS_KEYS: [&str; 10] = [
    "arguments",
    "args",
    "input",
    "params",
    "payload",
    "data",
    "tool_input",
    "toolInput",
    "tool_args",
    "toolArgs",
];
2889
2890fn tool_signature(tool_name: &str, args: &Value) -> String {
2891 let normalized = normalize_tool_name(tool_name);
2892 if normalized == "websearch" {
2893 let query = extract_websearch_query(args)
2894 .unwrap_or_default()
2895 .to_lowercase();
2896 let limit = args
2897 .get("limit")
2898 .or_else(|| args.get("numResults"))
2899 .or_else(|| args.get("num_results"))
2900 .and_then(|v| v.as_u64())
2901 .unwrap_or(8);
2902 let domains = args
2903 .get("domains")
2904 .or_else(|| args.get("domain"))
2905 .map(|v| v.to_string())
2906 .unwrap_or_default();
2907 let recency = args.get("recency").and_then(|v| v.as_u64()).unwrap_or(0);
2908 return format!("websearch:q={query}|limit={limit}|domains={domains}|recency={recency}");
2909 }
2910 format!("{}:{}", normalized, args)
2911}
2912
/// Hashes `input` with `DefaultHasher` and renders the 64-bit result as 16 lowercase,
/// zero-padded hex digits.
///
/// NOTE(review): the standard library does not guarantee `DefaultHasher`'s algorithm
/// across Rust releases, so these values should not be persisted or compared across
/// builds — confirm callers only rely on in-process stability.
fn stable_hash(input: &str) -> String {
    let digest = {
        let mut state = DefaultHasher::new();
        Hash::hash(input, &mut state);
        state.finish()
    };
    format!("{digest:016x}")
}
2918
2919fn summarize_tool_outputs(outputs: &[String]) -> String {
2920 outputs
2921 .iter()
2922 .take(6)
2923 .map(|output| truncate_text(output, 600))
2924 .collect::<Vec<_>>()
2925 .join("\n\n")
2926}
2927
/// Reports whether tool `output` contains one of the error phrases treated as an
/// OS/shell mismatch (missing path, unknown command, blocked shell command).
///
/// Matching is ASCII case-insensitive.
fn is_os_mismatch_tool_output(output: &str) -> bool {
    const MISMATCH_MARKERS: &[&str] = &[
        "os error 3",
        "system cannot find the path specified",
        "command not found",
        "is not recognized as an internal or external command",
        "shell command blocked on windows",
    ];

    let lowered = output.to_ascii_lowercase();
    MISMATCH_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker))
}
2936
2937fn tandem_runtime_system_prompt(host: &HostRuntimeContext) -> String {
2938 let mut sections = Vec::new();
2939 if os_aware_prompts_enabled() {
2940 sections.push(format!(
2941 "[Execution Environment]\nHost OS: {}\nShell: {}\nPath style: {}\nArchitecture: {}",
2942 host_os_label(host.os),
2943 shell_family_label(host.shell_family),
2944 path_style_label(host.path_style),
2945 host.arch
2946 ));
2947 }
2948 sections.push(
2949 "You are operating inside Tandem (Desktop/TUI) as an engine-backed coding assistant.
2950Use tool calls to inspect and modify the workspace when needed instead of asking the user
2951to manually run basic discovery steps. Permission prompts may occur for some tools; if
2952a tool is denied or blocked, explain what was blocked and suggest a concrete next step."
2953 .to_string(),
2954 );
2955 if host.os == HostOs::Windows {
2956 sections.push(
2957 "Windows guidance: prefer cross-platform tools (`glob`, `grep`, `read`, `write`, `edit`) and PowerShell-native commands.
2958Avoid Unix-only shell syntax (`ls -la`, `find ... -type f`, `cat` pipelines) unless translated.
2959If a shell command fails with a path/shell mismatch, immediately switch to cross-platform tools (`read`, `glob`, `grep`)."
2960 .to_string(),
2961 );
2962 } else {
2963 sections.push(
2964 "POSIX guidance: standard shell commands are available.
2965Use cross-platform tools (`glob`, `grep`, `read`) when they are simpler and safer for codebase exploration."
2966 .to_string(),
2967 );
2968 }
2969 sections.join("\n\n")
2970}
2971
/// Reports whether the execution-environment section should be added to system prompts.
///
/// Controlled by the `TANDEM_OS_AWARE_PROMPTS` environment variable: the feature is on
/// unless the variable is set to `0`, `false` or `off` (trimmed, ASCII case-insensitive).
/// An unset or unreadable variable leaves it on.
fn os_aware_prompts_enabled() -> bool {
    match std::env::var("TANDEM_OS_AWARE_PROMPTS") {
        Ok(raw) => !matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "0" | "false" | "off"
        ),
        Err(_) => true,
    }
}
2981
2982fn host_os_label(os: HostOs) -> &'static str {
2983 match os {
2984 HostOs::Windows => "windows",
2985 HostOs::Linux => "linux",
2986 HostOs::Macos => "macos",
2987 }
2988}
2989
2990fn shell_family_label(shell: ShellFamily) -> &'static str {
2991 match shell {
2992 ShellFamily::Powershell => "powershell",
2993 ShellFamily::Posix => "posix",
2994 }
2995}
2996
2997fn path_style_label(path_style: PathStyle) -> &'static str {
2998 match path_style {
2999 PathStyle::Windows => "windows",
3000 PathStyle::Posix => "posix",
3001 }
3002}
3003
/// Decides whether a workspace probe should be forced.
///
/// Returns true only when the user's message asks about the project or its files AND the
/// assistant's reply claims it cannot inspect or access them. Both checks are
/// case-insensitive substring matches against the phrase lists below.
fn should_force_workspace_probe(user_text: &str, completion: &str) -> bool {
    const PROJECT_CONTEXT_REQUESTS: &[&str] = &[
        "what is this project",
        "what's this project",
        "what project is this",
        "explain this project",
        "analyze this project",
        "inspect this project",
        "look at the project",
        "summarize this project",
        "show me this project",
        "what files are in",
        "show files",
        "list files",
        "read files",
        "browse files",
        "use glob",
        "run glob",
    ];
    const NO_ACCESS_CLAIMS: &[&str] = &[
        "can't inspect",
        "cannot inspect",
        "unable to inspect",
        "unable to directly inspect",
        "can't access",
        "cannot access",
        "unable to access",
        "can't read files",
        "cannot read files",
        "unable to read files",
        "tool restriction",
        "tool restrictions",
        "don't have visibility",
        "no visibility",
        "haven't been able to inspect",
        "i don't know what this project is",
        "need your help to",
        "sandbox",
        "restriction",
        "system restriction",
        "permissions restrictions",
    ];

    fn mentions_any(haystack: &str, needles: &[&str]) -> bool {
        needles.iter().any(|needle| haystack.contains(*needle))
    }

    let user = user_text.to_lowercase();
    if !mentions_any(&user, PROJECT_CONTEXT_REQUESTS) {
        return false;
    }

    let reply = completion.to_lowercase();
    mentions_any(&reply, NO_ACCESS_CLAIMS)
}
3063
3064fn parse_tool_invocation(input: &str) -> Option<(String, serde_json::Value)> {
3065 let raw = input.trim();
3066 if !raw.starts_with("/tool ") {
3067 return None;
3068 }
3069 let rest = raw.trim_start_matches("/tool ").trim();
3070 let mut split = rest.splitn(2, ' ');
3071 let tool = normalize_tool_name(split.next()?.trim());
3072 let args = split
3073 .next()
3074 .and_then(|v| serde_json::from_str::<serde_json::Value>(v).ok())
3075 .unwrap_or_else(|| json!({}));
3076 Some((tool, args))
3077}
3078
3079fn parse_tool_invocations_from_response(input: &str) -> Vec<(String, serde_json::Value)> {
3080 let trimmed = input.trim();
3081 if trimmed.is_empty() {
3082 return Vec::new();
3083 }
3084
3085 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
3086 if let Some(found) = extract_tool_call_from_value(&parsed) {
3087 return vec![found];
3088 }
3089 }
3090
3091 if let Some(block) = extract_first_json_object(trimmed) {
3092 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&block) {
3093 if let Some(found) = extract_tool_call_from_value(&parsed) {
3094 return vec![found];
3095 }
3096 }
3097 }
3098
3099 parse_function_style_tool_calls(trimmed)
3100}
3101
/// Test-only convenience: the first tool call found in `input`, if any.
#[cfg(test)]
fn parse_tool_invocation_from_response(input: &str) -> Option<(String, serde_json::Value)> {
    let mut calls = parse_tool_invocations_from_response(input);
    if calls.is_empty() {
        None
    } else {
        Some(calls.swap_remove(0))
    }
}
3108
3109fn parse_function_style_tool_calls(input: &str) -> Vec<(String, Value)> {
3110 let mut calls = Vec::new();
3111 let lower = input.to_lowercase();
3112 let names = [
3113 "todo_write",
3114 "todowrite",
3115 "update_todo_list",
3116 "update_todos",
3117 ];
3118 let mut cursor = 0usize;
3119
3120 while cursor < lower.len() {
3121 let mut best: Option<(usize, &str)> = None;
3122 for name in names {
3123 let needle = format!("{name}(");
3124 if let Some(rel_idx) = lower[cursor..].find(&needle) {
3125 let idx = cursor + rel_idx;
3126 if best.as_ref().is_none_or(|(best_idx, _)| idx < *best_idx) {
3127 best = Some((idx, name));
3128 }
3129 }
3130 }
3131
3132 let Some((tool_start, tool_name)) = best else {
3133 break;
3134 };
3135
3136 let open_paren = tool_start + tool_name.len();
3137 if let Some(close_paren) = find_matching_paren(input, open_paren) {
3138 if let Some(args_text) = input.get(open_paren + 1..close_paren) {
3139 let args = parse_function_style_args(args_text.trim());
3140 calls.push((normalize_tool_name(tool_name), Value::Object(args)));
3141 }
3142 cursor = close_paren.saturating_add(1);
3143 } else {
3144 cursor = tool_start.saturating_add(tool_name.len());
3145 }
3146 }
3147
3148 calls
3149}
3150
/// Finds the byte index of the `)` that closes the `(` located at byte `open_paren`.
///
/// Parentheses inside single- or double-quoted strings are ignored, and a backslash
/// inside a quoted string escapes the next character. Returns `None` when `open_paren`
/// is out of range, does not point at `(`, or no matching `)` exists.
fn find_matching_paren(input: &str, open_paren: usize) -> Option<usize> {
    if *input.as_bytes().get(open_paren)? != b'(' {
        return None;
    }
    let tail = input.get(open_paren..)?;

    let mut nesting = 0usize;
    // `Some(q)` while inside a string opened by quote character `q`.
    let mut quote: Option<char> = None;
    let mut skip_next = false;

    for (offset, ch) in tail.char_indices() {
        if skip_next {
            skip_next = false;
            continue;
        }
        match (quote, ch) {
            (Some(_), '\\') => skip_next = true,
            (Some(open), c) if c == open => quote = None,
            (Some(_), _) => {}
            (None, '\'' | '"') => quote = Some(ch),
            (None, '(') => nesting += 1,
            (None, ')') => {
                nesting = nesting.saturating_sub(1);
                if nesting == 0 {
                    return Some(open_paren + offset);
                }
            }
            (None, _) => {}
        }
    }

    None
}
3196
3197fn parse_function_style_args(input: &str) -> Map<String, Value> {
3198 let mut args = Map::new();
3199 if input.trim().is_empty() {
3200 return args;
3201 }
3202
3203 let mut parts = Vec::<String>::new();
3204 let mut current = String::new();
3205 let mut in_single = false;
3206 let mut in_double = false;
3207 let mut escaped = false;
3208 let mut depth_paren = 0usize;
3209 let mut depth_bracket = 0usize;
3210 let mut depth_brace = 0usize;
3211
3212 for ch in input.chars() {
3213 if escaped {
3214 current.push(ch);
3215 escaped = false;
3216 continue;
3217 }
3218 if ch == '\\' && (in_single || in_double) {
3219 current.push(ch);
3220 escaped = true;
3221 continue;
3222 }
3223 if ch == '\'' && !in_double {
3224 in_single = !in_single;
3225 current.push(ch);
3226 continue;
3227 }
3228 if ch == '"' && !in_single {
3229 in_double = !in_double;
3230 current.push(ch);
3231 continue;
3232 }
3233 if in_single || in_double {
3234 current.push(ch);
3235 continue;
3236 }
3237
3238 match ch {
3239 '(' => depth_paren += 1,
3240 ')' => depth_paren = depth_paren.saturating_sub(1),
3241 '[' => depth_bracket += 1,
3242 ']' => depth_bracket = depth_bracket.saturating_sub(1),
3243 '{' => depth_brace += 1,
3244 '}' => depth_brace = depth_brace.saturating_sub(1),
3245 ',' if depth_paren == 0 && depth_bracket == 0 && depth_brace == 0 => {
3246 let part = current.trim();
3247 if !part.is_empty() {
3248 parts.push(part.to_string());
3249 }
3250 current.clear();
3251 continue;
3252 }
3253 _ => {}
3254 }
3255 current.push(ch);
3256 }
3257 let tail = current.trim();
3258 if !tail.is_empty() {
3259 parts.push(tail.to_string());
3260 }
3261
3262 for part in parts {
3263 let Some((raw_key, raw_value)) = part
3264 .split_once('=')
3265 .or_else(|| part.split_once(':'))
3266 .map(|(k, v)| (k.trim(), v.trim()))
3267 else {
3268 continue;
3269 };
3270 let key = raw_key.trim_matches(|c| c == '"' || c == '\'' || c == '`');
3271 if key.is_empty() {
3272 continue;
3273 }
3274 let value = parse_scalar_like_value(raw_value);
3275 args.insert(key.to_string(), value);
3276 }
3277
3278 args
3279}
3280
3281fn parse_scalar_like_value(raw: &str) -> Value {
3282 let trimmed = raw.trim();
3283 if trimmed.is_empty() {
3284 return Value::Null;
3285 }
3286
3287 if (trimmed.starts_with('"') && trimmed.ends_with('"'))
3288 || (trimmed.starts_with('\'') && trimmed.ends_with('\''))
3289 {
3290 return Value::String(trimmed[1..trimmed.len().saturating_sub(1)].to_string());
3291 }
3292
3293 if trimmed.eq_ignore_ascii_case("true") {
3294 return Value::Bool(true);
3295 }
3296 if trimmed.eq_ignore_ascii_case("false") {
3297 return Value::Bool(false);
3298 }
3299 if trimmed.eq_ignore_ascii_case("null") {
3300 return Value::Null;
3301 }
3302
3303 if let Ok(v) = serde_json::from_str::<Value>(trimmed) {
3304 return v;
3305 }
3306 if let Ok(v) = trimmed.parse::<i64>() {
3307 return Value::Number(Number::from(v));
3308 }
3309 if let Ok(v) = trimmed.parse::<f64>() {
3310 if let Some(n) = Number::from_f64(v) {
3311 return Value::Number(n);
3312 }
3313 }
3314
3315 Value::String(trimmed.to_string())
3316}
3317
3318fn normalize_todo_write_args(args: Value, completion: &str) -> Value {
3319 if is_todo_status_update_args(&args) {
3320 return args;
3321 }
3322
3323 let mut obj = match args {
3324 Value::Object(map) => map,
3325 Value::Array(items) => {
3326 return json!({ "todos": normalize_todo_arg_items(items) });
3327 }
3328 Value::String(text) => {
3329 let derived = extract_todo_candidates_from_text(&text);
3330 if !derived.is_empty() {
3331 return json!({ "todos": derived });
3332 }
3333 return json!({});
3334 }
3335 _ => return json!({}),
3336 };
3337
3338 if obj
3339 .get("todos")
3340 .and_then(|v| v.as_array())
3341 .map(|arr| !arr.is_empty())
3342 .unwrap_or(false)
3343 {
3344 return Value::Object(obj);
3345 }
3346
3347 for alias in ["tasks", "items", "list", "checklist"] {
3348 if let Some(items) = obj.get(alias).and_then(|v| v.as_array()) {
3349 let normalized = normalize_todo_arg_items(items.clone());
3350 if !normalized.is_empty() {
3351 obj.insert("todos".to_string(), Value::Array(normalized));
3352 return Value::Object(obj);
3353 }
3354 }
3355 }
3356
3357 let derived = extract_todo_candidates_from_text(completion);
3358 if !derived.is_empty() {
3359 obj.insert("todos".to_string(), Value::Array(derived));
3360 }
3361 Value::Object(obj)
3362}
3363
3364fn normalize_todo_arg_items(items: Vec<Value>) -> Vec<Value> {
3365 items
3366 .into_iter()
3367 .filter_map(|item| match item {
3368 Value::String(text) => {
3369 let content = text.trim();
3370 if content.is_empty() {
3371 None
3372 } else {
3373 Some(json!({"content": content}))
3374 }
3375 }
3376 Value::Object(mut obj) => {
3377 if !obj.contains_key("content") {
3378 if let Some(text) = obj.get("text").cloned() {
3379 obj.insert("content".to_string(), text);
3380 } else if let Some(title) = obj.get("title").cloned() {
3381 obj.insert("content".to_string(), title);
3382 } else if let Some(name) = obj.get("name").cloned() {
3383 obj.insert("content".to_string(), name);
3384 }
3385 }
3386 let content = obj
3387 .get("content")
3388 .and_then(|v| v.as_str())
3389 .map(str::trim)
3390 .unwrap_or("");
3391 if content.is_empty() {
3392 None
3393 } else {
3394 Some(Value::Object(obj))
3395 }
3396 }
3397 _ => None,
3398 })
3399 .collect()
3400}
3401
3402fn is_todo_status_update_args(args: &Value) -> bool {
3403 let Some(obj) = args.as_object() else {
3404 return false;
3405 };
3406 let has_status = obj
3407 .get("status")
3408 .and_then(|v| v.as_str())
3409 .map(|s| !s.trim().is_empty())
3410 .unwrap_or(false);
3411 let has_target =
3412 obj.get("task_id").is_some() || obj.get("todo_id").is_some() || obj.get("id").is_some();
3413 has_status && has_target
3414}
3415
3416fn is_empty_todo_write_args(args: &Value) -> bool {
3417 if is_todo_status_update_args(args) {
3418 return false;
3419 }
3420 let Some(obj) = args.as_object() else {
3421 return true;
3422 };
3423 !obj.get("todos")
3424 .and_then(|v| v.as_array())
3425 .map(|arr| !arr.is_empty())
3426 .unwrap_or(false)
3427}
3428
3429fn parse_streamed_tool_args(tool_name: &str, raw_args: &str) -> Value {
3430 let trimmed = raw_args.trim();
3431 if trimmed.is_empty() {
3432 return json!({});
3433 }
3434
3435 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
3436 return normalize_streamed_tool_args(tool_name, parsed, trimmed);
3437 }
3438
3439 let kv_args = parse_function_style_args(trimmed);
3442 if !kv_args.is_empty() {
3443 return normalize_streamed_tool_args(tool_name, Value::Object(kv_args), trimmed);
3444 }
3445
3446 if normalize_tool_name(tool_name) == "websearch" {
3447 if let Some(query) = sanitize_websearch_query_candidate(trimmed) {
3448 return json!({ "query": query });
3449 }
3450 return json!({});
3451 }
3452
3453 json!({})
3454}
3455
3456fn normalize_streamed_tool_args(tool_name: &str, parsed: Value, raw: &str) -> Value {
3457 let normalized_tool = normalize_tool_name(tool_name);
3458 if normalized_tool != "websearch" {
3459 return parsed;
3460 }
3461
3462 match parsed {
3463 Value::Object(mut obj) => {
3464 if !has_websearch_query(&obj) && !raw.trim().is_empty() {
3465 if let Some(query) = sanitize_websearch_query_candidate(raw) {
3466 obj.insert("query".to_string(), Value::String(query));
3467 }
3468 }
3469 Value::Object(obj)
3470 }
3471 Value::String(s) => match sanitize_websearch_query_candidate(&s) {
3472 Some(query) => json!({ "query": query }),
3473 None => json!({}),
3474 },
3475 other => other,
3476 }
3477}
3478
3479fn has_websearch_query(obj: &Map<String, Value>) -> bool {
3480 const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
3481 QUERY_KEYS.iter().any(|key| {
3482 obj.get(*key)
3483 .and_then(|v| v.as_str())
3484 .map(|s| !s.trim().is_empty())
3485 .unwrap_or(false)
3486 })
3487}
3488
3489fn extract_tool_call_from_value(value: &Value) -> Option<(String, Value)> {
3490 if let Some(obj) = value.as_object() {
3491 if let Some(tool) = obj.get("tool").and_then(|v| v.as_str()) {
3492 return Some((
3493 normalize_tool_name(tool),
3494 obj.get("args").cloned().unwrap_or_else(|| json!({})),
3495 ));
3496 }
3497
3498 if let Some(tool) = obj.get("name").and_then(|v| v.as_str()) {
3499 let args = obj
3500 .get("args")
3501 .cloned()
3502 .or_else(|| obj.get("arguments").cloned())
3503 .unwrap_or_else(|| json!({}));
3504 let normalized_tool = normalize_tool_name(tool);
3505 let args = if let Some(raw) = args.as_str() {
3506 parse_streamed_tool_args(&normalized_tool, raw)
3507 } else {
3508 args
3509 };
3510 return Some((normalized_tool, args));
3511 }
3512
3513 for key in [
3514 "tool_call",
3515 "toolCall",
3516 "call",
3517 "function_call",
3518 "functionCall",
3519 ] {
3520 if let Some(nested) = obj.get(key) {
3521 if let Some(found) = extract_tool_call_from_value(nested) {
3522 return Some(found);
3523 }
3524 }
3525 }
3526 }
3527
3528 if let Some(items) = value.as_array() {
3529 for item in items {
3530 if let Some(found) = extract_tool_call_from_value(item) {
3531 return Some(found);
3532 }
3533 }
3534 }
3535
3536 None
3537}
3538
/// Returns the first balanced `{ ... }` block found in `input`, if any.
///
/// The scan starts at the first `{` and counts nested braces until the
/// matching `}`. Stray `}` characters before the first `{` are ignored.
/// Inside the block, double-quoted string literals (including backslash
/// escapes) are skipped so that braces appearing in string values, such as
/// `{"a": "}"}`, do not end the block early. Returns `None` when no block is
/// opened or the opened block is never closed.
fn extract_first_json_object(input: &str) -> Option<String> {
    let mut start = None;
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;

    for (idx, ch) in input.char_indices() {
        if in_string {
            if escaped {
                // The character after a backslash never terminates the string.
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            // Quotes only matter once a block is open; prose before the first
            // `{` may contain unbalanced quotes.
            '"' if depth > 0 => in_string = true,
            '{' => {
                if start.is_none() {
                    start = Some(idx);
                }
                depth += 1;
            }
            '}' if depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    let begin = start?;
                    return input.get(begin..=idx).map(str::to_string);
                }
            }
            _ => {}
        }
    }
    None
}
3562
3563fn extract_todo_candidates_from_text(input: &str) -> Vec<Value> {
3564 let mut seen = HashSet::<String>::new();
3565 let mut todos = Vec::new();
3566
3567 for raw_line in input.lines() {
3568 let mut line = raw_line.trim();
3569 let mut structured_line = false;
3570 if line.is_empty() {
3571 continue;
3572 }
3573 if line.starts_with("```") {
3574 continue;
3575 }
3576 if line.ends_with(':') {
3577 continue;
3578 }
3579 if let Some(rest) = line
3580 .strip_prefix("- [ ]")
3581 .or_else(|| line.strip_prefix("* [ ]"))
3582 .or_else(|| line.strip_prefix("- [x]"))
3583 .or_else(|| line.strip_prefix("* [x]"))
3584 {
3585 line = rest.trim();
3586 structured_line = true;
3587 } else if let Some(rest) = line.strip_prefix("- ").or_else(|| line.strip_prefix("* ")) {
3588 line = rest.trim();
3589 structured_line = true;
3590 } else {
3591 let bytes = line.as_bytes();
3592 let mut i = 0usize;
3593 while i < bytes.len() && bytes[i].is_ascii_digit() {
3594 i += 1;
3595 }
3596 if i > 0 && i + 1 < bytes.len() && (bytes[i] == b'.' || bytes[i] == b')') {
3597 line = line[i + 1..].trim();
3598 structured_line = true;
3599 }
3600 }
3601 if !structured_line {
3602 continue;
3603 }
3604
3605 let content = line.trim_matches(|c: char| c.is_whitespace() || c == '-' || c == '*');
3606 if content.len() < 5 || content.len() > 180 {
3607 continue;
3608 }
3609 let key = content.to_lowercase();
3610 if seen.contains(&key) {
3611 continue;
3612 }
3613 seen.insert(key);
3614 todos.push(json!({ "content": content }));
3615 if todos.len() >= 25 {
3616 break;
3617 }
3618 }
3619
3620 todos
3621}
3622
3623async fn emit_plan_todo_fallback(
3624 storage: std::sync::Arc<Storage>,
3625 bus: &EventBus,
3626 session_id: &str,
3627 message_id: &str,
3628 completion: &str,
3629) {
3630 let todos = extract_todo_candidates_from_text(completion);
3631 if todos.is_empty() {
3632 return;
3633 }
3634
3635 let invoke_part = WireMessagePart::tool_invocation(
3636 session_id,
3637 message_id,
3638 "todo_write",
3639 json!({"todos": todos.clone()}),
3640 );
3641 let call_id = invoke_part.id.clone();
3642 bus.publish(EngineEvent::new(
3643 "message.part.updated",
3644 json!({"part": invoke_part}),
3645 ));
3646
3647 if storage.set_todos(session_id, todos).await.is_err() {
3648 let mut failed_part =
3649 WireMessagePart::tool_result(session_id, message_id, "todo_write", json!(null));
3650 failed_part.id = call_id;
3651 failed_part.state = Some("failed".to_string());
3652 failed_part.error = Some("failed to persist plan todos".to_string());
3653 bus.publish(EngineEvent::new(
3654 "message.part.updated",
3655 json!({"part": failed_part}),
3656 ));
3657 return;
3658 }
3659
3660 let normalized = storage.get_todos(session_id).await;
3661 let mut result_part = WireMessagePart::tool_result(
3662 session_id,
3663 message_id,
3664 "todo_write",
3665 json!({ "todos": normalized }),
3666 );
3667 result_part.id = call_id;
3668 bus.publish(EngineEvent::new(
3669 "message.part.updated",
3670 json!({"part": result_part}),
3671 ));
3672 bus.publish(EngineEvent::new(
3673 "todo.updated",
3674 json!({
3675 "sessionID": session_id,
3676 "todos": normalized
3677 }),
3678 ));
3679}
3680
3681async fn emit_plan_question_fallback(
3682 storage: std::sync::Arc<Storage>,
3683 bus: &EventBus,
3684 session_id: &str,
3685 message_id: &str,
3686 completion: &str,
3687) {
3688 let trimmed = completion.trim();
3689 if trimmed.is_empty() {
3690 return;
3691 }
3692
3693 let hints = extract_todo_candidates_from_text(trimmed)
3694 .into_iter()
3695 .take(6)
3696 .filter_map(|v| {
3697 v.get("content")
3698 .and_then(|c| c.as_str())
3699 .map(ToString::to_string)
3700 })
3701 .collect::<Vec<_>>();
3702
3703 let mut options = hints
3704 .iter()
3705 .map(|label| json!({"label": label, "description": "Use this as a starting task"}))
3706 .collect::<Vec<_>>();
3707 if options.is_empty() {
3708 options = vec![
3709 json!({"label":"Define scope", "description":"Clarify the intended outcome"}),
3710 json!({"label":"Provide constraints", "description":"Budget, timeline, and constraints"}),
3711 json!({"label":"Draft a starter list", "description":"Generate a first-pass task list"}),
3712 ];
3713 }
3714
3715 let question_payload = vec![json!({
3716 "header":"Planning Input",
3717 "question":"I couldn't produce a concrete task list yet. Which tasks should I include first?",
3718 "options": options,
3719 "multiple": true,
3720 "custom": true
3721 })];
3722
3723 let request = storage
3724 .add_question_request(session_id, message_id, question_payload.clone())
3725 .await
3726 .ok();
3727 bus.publish(EngineEvent::new(
3728 "question.asked",
3729 json!({
3730 "id": request
3731 .as_ref()
3732 .map(|req| req.id.clone())
3733 .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
3734 "sessionID": session_id,
3735 "messageID": message_id,
3736 "questions": question_payload,
3737 "tool": request.and_then(|req| {
3738 req.tool.map(|tool| {
3739 json!({
3740 "callID": tool.call_id,
3741 "messageID": tool.message_id
3742 })
3743 })
3744 })
3745 }),
3746 ));
3747}
3748
3749async fn load_chat_history(storage: std::sync::Arc<Storage>, session_id: &str) -> Vec<ChatMessage> {
3750 let Some(session) = storage.get_session(session_id).await else {
3751 return Vec::new();
3752 };
3753 let messages = session
3754 .messages
3755 .into_iter()
3756 .map(|m| {
3757 let role = format!("{:?}", m.role).to_lowercase();
3758 let content = m
3759 .parts
3760 .into_iter()
3761 .map(|part| match part {
3762 MessagePart::Text { text } => text,
3763 MessagePart::Reasoning { text } => text,
3764 MessagePart::ToolInvocation { tool, result, .. } => {
3765 format!("Tool {tool} => {}", result.unwrap_or_else(|| json!({})))
3766 }
3767 })
3768 .collect::<Vec<_>>()
3769 .join("\n");
3770 ChatMessage {
3771 role,
3772 content,
3773 attachments: Vec::new(),
3774 }
3775 })
3776 .collect::<Vec<_>>();
3777 compact_chat_history(messages)
3778}
3779
3780fn attach_to_last_user_message(messages: &mut [ChatMessage], attachments: &[ChatAttachment]) {
3781 if attachments.is_empty() {
3782 return;
3783 }
3784 if let Some(message) = messages.iter_mut().rev().find(|m| m.role == "user") {
3785 message.attachments = attachments.to_vec();
3786 }
3787}
3788
3789async fn build_runtime_attachments(
3790 provider_id: &str,
3791 parts: &[MessagePartInput],
3792) -> Vec<ChatAttachment> {
3793 if !supports_image_attachments(provider_id) {
3794 return Vec::new();
3795 }
3796
3797 let mut attachments = Vec::new();
3798 for part in parts {
3799 let MessagePartInput::File { mime, url, .. } = part else {
3800 continue;
3801 };
3802 if !mime.to_ascii_lowercase().starts_with("image/") {
3803 continue;
3804 }
3805 if let Some(source_url) = normalize_attachment_source_url(url, mime).await {
3806 attachments.push(ChatAttachment::ImageUrl { url: source_url });
3807 }
3808 }
3809
3810 attachments
3811}
3812
/// Reports whether `provider_id` is one of the providers that image
/// attachments are sent to. The comparison is exact and case-sensitive.
fn supports_image_attachments(provider_id: &str) -> bool {
    const IMAGE_CAPABLE_PROVIDERS: [&str; 10] = [
        "openai",
        "openrouter",
        "ollama",
        "groq",
        "mistral",
        "together",
        "azure",
        "bedrock",
        "vertex",
        "copilot",
    ];
    IMAGE_CAPABLE_PROVIDERS
        .iter()
        .any(|known| *known == provider_id)
}
3828
3829async fn normalize_attachment_source_url(url: &str, mime: &str) -> Option<String> {
3830 let trimmed = url.trim();
3831 if trimmed.is_empty() {
3832 return None;
3833 }
3834 if trimmed.starts_with("http://")
3835 || trimmed.starts_with("https://")
3836 || trimmed.starts_with("data:")
3837 {
3838 return Some(trimmed.to_string());
3839 }
3840
3841 let file_path = trimmed
3842 .strip_prefix("file://")
3843 .map(PathBuf::from)
3844 .unwrap_or_else(|| PathBuf::from(trimmed));
3845 if !file_path.exists() {
3846 return None;
3847 }
3848
3849 let max_bytes = std::env::var("TANDEM_CHANNEL_MAX_ATTACHMENT_BYTES")
3850 .ok()
3851 .and_then(|v| v.parse::<usize>().ok())
3852 .unwrap_or(20 * 1024 * 1024);
3853
3854 let bytes = match tokio::fs::read(&file_path).await {
3855 Ok(bytes) => bytes,
3856 Err(err) => {
3857 tracing::warn!(
3858 "failed reading local attachment '{}': {}",
3859 file_path.to_string_lossy(),
3860 err
3861 );
3862 return None;
3863 }
3864 };
3865 if bytes.len() > max_bytes {
3866 tracing::warn!(
3867 "local attachment '{}' exceeds max bytes ({} > {})",
3868 file_path.to_string_lossy(),
3869 bytes.len(),
3870 max_bytes
3871 );
3872 return None;
3873 }
3874
3875 use base64::Engine as _;
3876 let b64 = base64::engine::general_purpose::STANDARD.encode(bytes);
3877 Some(format!("data:{mime};base64,{b64}"))
3878}
3879
/// Borrowed inputs for `emit_tool_side_events`, bundled into one argument.
struct ToolSideEventContext<'a> {
    /// Session id; used for todo/question storage calls and published as `sessionID`.
    session_id: &'a str,
    /// Message id; used when registering questions and published as `messageID`.
    message_id: &'a str,
    /// Tool name; `todo_write` and `question` trigger dedicated side events.
    tool: &'a str,
    /// Arguments of the tool call; consulted for todo status updates.
    args: &'a serde_json::Value,
    /// Tool result metadata; its `todos`, `questions` and `events` entries are read.
    metadata: &'a serde_json::Value,
    /// Published as `workspaceRoot` on emitted events.
    workspace_root: Option<&'a str>,
    /// Published as `effectiveCwd` on emitted events.
    effective_cwd: Option<&'a str>,
}
3889
3890async fn emit_tool_side_events(
3891 storage: std::sync::Arc<Storage>,
3892 bus: &EventBus,
3893 ctx: ToolSideEventContext<'_>,
3894) {
3895 let ToolSideEventContext {
3896 session_id,
3897 message_id,
3898 tool,
3899 args,
3900 metadata,
3901 workspace_root,
3902 effective_cwd,
3903 } = ctx;
3904 if tool == "todo_write" {
3905 let todos_from_metadata = metadata
3906 .get("todos")
3907 .and_then(|v| v.as_array())
3908 .cloned()
3909 .unwrap_or_default();
3910
3911 if !todos_from_metadata.is_empty() {
3912 let _ = storage.set_todos(session_id, todos_from_metadata).await;
3913 } else {
3914 let current = storage.get_todos(session_id).await;
3915 if let Some(updated) = apply_todo_updates_from_args(current, args) {
3916 let _ = storage.set_todos(session_id, updated).await;
3917 }
3918 }
3919
3920 let normalized = storage.get_todos(session_id).await;
3921 bus.publish(EngineEvent::new(
3922 "todo.updated",
3923 json!({
3924 "sessionID": session_id,
3925 "todos": normalized,
3926 "workspaceRoot": workspace_root,
3927 "effectiveCwd": effective_cwd
3928 }),
3929 ));
3930 }
3931 if tool == "question" {
3932 let questions = metadata
3933 .get("questions")
3934 .and_then(|v| v.as_array())
3935 .cloned()
3936 .unwrap_or_default();
3937 if questions.is_empty() {
3938 tracing::warn!(
3939 "question tool produced empty questions payload; skipping question.asked event session_id={} message_id={}",
3940 session_id,
3941 message_id
3942 );
3943 } else {
3944 let request = storage
3945 .add_question_request(session_id, message_id, questions.clone())
3946 .await
3947 .ok();
3948 bus.publish(EngineEvent::new(
3949 "question.asked",
3950 json!({
3951 "id": request
3952 .as_ref()
3953 .map(|req| req.id.clone())
3954 .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
3955 "sessionID": session_id,
3956 "messageID": message_id,
3957 "questions": questions,
3958 "tool": request.and_then(|req| {
3959 req.tool.map(|tool| {
3960 json!({
3961 "callID": tool.call_id,
3962 "messageID": tool.message_id
3963 })
3964 })
3965 }),
3966 "workspaceRoot": workspace_root,
3967 "effectiveCwd": effective_cwd
3968 }),
3969 ));
3970 }
3971 }
3972 if let Some(events) = metadata.get("events").and_then(|v| v.as_array()) {
3973 for event in events {
3974 let Some(event_type) = event.get("type").and_then(|v| v.as_str()) else {
3975 continue;
3976 };
3977 if !event_type.starts_with("agent_team.") {
3978 continue;
3979 }
3980 let mut properties = event
3981 .get("properties")
3982 .and_then(|v| v.as_object())
3983 .cloned()
3984 .unwrap_or_default();
3985 properties
3986 .entry("sessionID".to_string())
3987 .or_insert(json!(session_id));
3988 properties
3989 .entry("messageID".to_string())
3990 .or_insert(json!(message_id));
3991 properties
3992 .entry("workspaceRoot".to_string())
3993 .or_insert(json!(workspace_root));
3994 properties
3995 .entry("effectiveCwd".to_string())
3996 .or_insert(json!(effective_cwd));
3997 bus.publish(EngineEvent::new(event_type, Value::Object(properties)));
3998 }
3999 }
4000}
4001
4002fn apply_todo_updates_from_args(current: Vec<Value>, args: &Value) -> Option<Vec<Value>> {
4003 let obj = args.as_object()?;
4004 let mut todos = current;
4005 let mut changed = false;
4006
4007 if let Some(items) = obj.get("todos").and_then(|v| v.as_array()) {
4008 for item in items {
4009 let Some(item_obj) = item.as_object() else {
4010 continue;
4011 };
4012 let status = item_obj
4013 .get("status")
4014 .and_then(|v| v.as_str())
4015 .map(normalize_todo_status);
4016 let target = item_obj
4017 .get("task_id")
4018 .or_else(|| item_obj.get("todo_id"))
4019 .or_else(|| item_obj.get("id"));
4020
4021 if let (Some(status), Some(target)) = (status, target) {
4022 changed |= apply_single_todo_status_update(&mut todos, target, &status);
4023 }
4024 }
4025 }
4026
4027 let status = obj
4028 .get("status")
4029 .and_then(|v| v.as_str())
4030 .map(normalize_todo_status);
4031 let target = obj
4032 .get("task_id")
4033 .or_else(|| obj.get("todo_id"))
4034 .or_else(|| obj.get("id"));
4035 if let (Some(status), Some(target)) = (status, target) {
4036 changed |= apply_single_todo_status_update(&mut todos, target, &status);
4037 }
4038
4039 if changed {
4040 Some(todos)
4041 } else {
4042 None
4043 }
4044}
4045
4046fn apply_single_todo_status_update(todos: &mut [Value], target: &Value, status: &str) -> bool {
4047 let idx_from_value = match target {
4048 Value::Number(n) => n.as_u64().map(|v| v.saturating_sub(1) as usize),
4049 Value::String(s) => {
4050 let trimmed = s.trim();
4051 trimmed
4052 .parse::<usize>()
4053 .ok()
4054 .map(|v| v.saturating_sub(1))
4055 .or_else(|| {
4056 let digits = trimmed
4057 .chars()
4058 .rev()
4059 .take_while(|c| c.is_ascii_digit())
4060 .collect::<String>()
4061 .chars()
4062 .rev()
4063 .collect::<String>();
4064 digits.parse::<usize>().ok().map(|v| v.saturating_sub(1))
4065 })
4066 }
4067 _ => None,
4068 };
4069
4070 if let Some(idx) = idx_from_value {
4071 if idx < todos.len() {
4072 if let Some(obj) = todos[idx].as_object_mut() {
4073 obj.insert("status".to_string(), Value::String(status.to_string()));
4074 return true;
4075 }
4076 }
4077 }
4078
4079 let id_target = target.as_str().map(|s| s.trim()).filter(|s| !s.is_empty());
4080 if let Some(id_target) = id_target {
4081 for todo in todos.iter_mut() {
4082 if let Some(obj) = todo.as_object_mut() {
4083 if obj.get("id").and_then(|v| v.as_str()) == Some(id_target) {
4084 obj.insert("status".to_string(), Value::String(status.to_string()));
4085 return true;
4086 }
4087 }
4088 }
4089 }
4090
4091 false
4092}
4093
/// Maps a free-form status label onto the canonical todo statuses
/// `in_progress`, `completed`, `cancelled` and `pending`.
///
/// Matching ignores surrounding whitespace and letter case, and treats `-`
/// and whitespace inside the label like `_`, so `In Progress` and
/// `in-progress` both become `in_progress`. Labels that match no known alias
/// are returned trimmed and lowercased, with their separators left as written.
fn normalize_todo_status(raw: &str) -> String {
    let lowered = raw.trim().to_lowercase();
    // Separator-insensitive key used only for alias lookup.
    let key: String = lowered
        .chars()
        .map(|c| if c == '-' || c.is_whitespace() { '_' } else { c })
        .collect();

    let canonical = match key.as_str() {
        "in_progress" | "inprogress" | "running" | "working" => "in_progress",
        "done" | "complete" | "completed" => "completed",
        "cancelled" | "canceled" | "aborted" | "skipped" => "cancelled",
        "open" | "todo" | "pending" => "pending",
        _ => return lowered,
    };
    canonical.to_string()
}
4103
4104fn compact_chat_history(messages: Vec<ChatMessage>) -> Vec<ChatMessage> {
4105 const MAX_CONTEXT_CHARS: usize = 80_000;
4106 const KEEP_RECENT_MESSAGES: usize = 40;
4107
4108 if messages.len() <= KEEP_RECENT_MESSAGES {
4109 let total_chars = messages.iter().map(|m| m.content.len()).sum::<usize>();
4110 if total_chars <= MAX_CONTEXT_CHARS {
4111 return messages;
4112 }
4113 }
4114
4115 let mut kept = messages;
4116 let mut dropped_count = 0usize;
4117 let mut total_chars = kept.iter().map(|m| m.content.len()).sum::<usize>();
4118
4119 while kept.len() > KEEP_RECENT_MESSAGES || total_chars > MAX_CONTEXT_CHARS {
4120 if kept.is_empty() {
4121 break;
4122 }
4123 let removed = kept.remove(0);
4124 total_chars = total_chars.saturating_sub(removed.content.len());
4125 dropped_count += 1;
4126 }
4127
4128 if dropped_count > 0 {
4129 kept.insert(
4130 0,
4131 ChatMessage {
4132 role: "system".to_string(),
4133 content: format!(
4134 "[history compacted: omitted {} older messages to fit context window]",
4135 dropped_count
4136 ),
4137 attachments: Vec::new(),
4138 },
4139 );
4140 }
4141 kept
4142}
4143
4144#[cfg(test)]
4145mod tests {
4146 use super::*;
4147 use crate::{EventBus, Storage};
4148 use uuid::Uuid;
4149
4150 #[tokio::test]
4151 async fn todo_updated_event_is_normalized() {
4152 let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
4153 let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
4154 let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
4155 let session_id = session.id.clone();
4156 storage.save_session(session).await.expect("save session");
4157
4158 let bus = EventBus::new();
4159 let mut rx = bus.subscribe();
4160 emit_tool_side_events(
4161 storage.clone(),
4162 &bus,
4163 ToolSideEventContext {
4164 session_id: &session_id,
4165 message_id: "m1",
4166 tool: "todo_write",
4167 args: &json!({"todos":[{"content":"ship parity"}]}),
4168 metadata: &json!({"todos":[{"content":"ship parity"}]}),
4169 workspace_root: Some("."),
4170 effective_cwd: Some("."),
4171 },
4172 )
4173 .await;
4174
4175 let event = rx.recv().await.expect("event");
4176 assert_eq!(event.event_type, "todo.updated");
4177 let todos = event
4178 .properties
4179 .get("todos")
4180 .and_then(|v| v.as_array())
4181 .cloned()
4182 .unwrap_or_default();
4183 assert_eq!(todos.len(), 1);
4184 assert!(todos[0].get("id").and_then(|v| v.as_str()).is_some());
4185 assert_eq!(
4186 todos[0].get("content").and_then(|v| v.as_str()),
4187 Some("ship parity")
4188 );
4189 assert!(todos[0].get("status").and_then(|v| v.as_str()).is_some());
4190 }
4191
4192 #[tokio::test]
4193 async fn question_asked_event_contains_tool_reference() {
4194 let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
4195 let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
4196 let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
4197 let session_id = session.id.clone();
4198 storage.save_session(session).await.expect("save session");
4199
4200 let bus = EventBus::new();
4201 let mut rx = bus.subscribe();
4202 emit_tool_side_events(
4203 storage,
4204 &bus,
4205 ToolSideEventContext {
4206 session_id: &session_id,
4207 message_id: "msg-1",
4208 tool: "question",
4209 args: &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
4210 metadata: &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
4211 workspace_root: Some("."),
4212 effective_cwd: Some("."),
4213 },
4214 )
4215 .await;
4216
4217 let event = rx.recv().await.expect("event");
4218 assert_eq!(event.event_type, "question.asked");
4219 assert_eq!(
4220 event
4221 .properties
4222 .get("sessionID")
4223 .and_then(|v| v.as_str())
4224 .unwrap_or(""),
4225 session_id
4226 );
4227 let tool = event
4228 .properties
4229 .get("tool")
4230 .cloned()
4231 .unwrap_or_else(|| json!({}));
4232 assert!(tool.get("callID").and_then(|v| v.as_str()).is_some());
4233 assert_eq!(
4234 tool.get("messageID").and_then(|v| v.as_str()),
4235 Some("msg-1")
4236 );
4237 }
4238
4239 #[test]
4240 fn compact_chat_history_keeps_recent_and_inserts_summary() {
4241 let mut messages = Vec::new();
4242 for i in 0..60 {
4243 messages.push(ChatMessage {
4244 role: "user".to_string(),
4245 content: format!("message-{i}"),
4246 attachments: Vec::new(),
4247 });
4248 }
4249 let compacted = compact_chat_history(messages);
4250 assert!(compacted.len() <= 41);
4251 assert_eq!(compacted[0].role, "system");
4252 assert!(compacted[0].content.contains("history compacted"));
4253 assert!(compacted.iter().any(|m| m.content.contains("message-59")));
4254 }
4255
4256 #[test]
4257 fn extracts_todos_from_checklist_and_numbered_lines() {
4258 let input = r#"
4259Plan:
4260- [ ] Audit current implementation
4261- [ ] Add planner fallback
42621. Add regression test coverage
4263"#;
4264 let todos = extract_todo_candidates_from_text(input);
4265 assert_eq!(todos.len(), 3);
4266 assert_eq!(
4267 todos[0].get("content").and_then(|v| v.as_str()),
4268 Some("Audit current implementation")
4269 );
4270 }
4271
4272 #[test]
4273 fn does_not_extract_todos_from_plain_prose_lines() {
4274 let input = r#"
4275I need more information to proceed.
4276Can you tell me the event size and budget?
4277Once I have that, I can provide a detailed plan.
4278"#;
4279 let todos = extract_todo_candidates_from_text(input);
4280 assert!(todos.is_empty());
4281 }
4282
4283 #[test]
4284 fn parses_wrapped_tool_call_from_markdown_response() {
4285 let input = r#"
4286Here is the tool call:
4287```json
4288{"tool_call":{"name":"todo_write","arguments":{"todos":[{"content":"a"}]}}}
4289```
4290"#;
4291 let parsed = parse_tool_invocation_from_response(input).expect("tool call");
4292 assert_eq!(parsed.0, "todo_write");
4293 assert!(parsed.1.get("todos").is_some());
4294 }
4295
4296 #[test]
4297 fn parses_function_style_todowrite_call() {
4298 let input = r#"Status: Completed
4299Call: todowrite(task_id=2, status="completed")"#;
4300 let parsed = parse_tool_invocation_from_response(input).expect("function-style tool call");
4301 assert_eq!(parsed.0, "todo_write");
4302 assert_eq!(parsed.1.get("task_id").and_then(|v| v.as_i64()), Some(2));
4303 assert_eq!(
4304 parsed.1.get("status").and_then(|v| v.as_str()),
4305 Some("completed")
4306 );
4307 }
4308
4309 #[test]
4310 fn parses_multiple_function_style_todowrite_calls() {
4311 let input = r#"
4312Call: todowrite(task_id=2, status="completed")
4313Call: todowrite(task_id=3, status="in_progress")
4314"#;
4315 let parsed = parse_tool_invocations_from_response(input);
4316 assert_eq!(parsed.len(), 2);
4317 assert_eq!(parsed[0].0, "todo_write");
4318 assert_eq!(parsed[0].1.get("task_id").and_then(|v| v.as_i64()), Some(2));
4319 assert_eq!(
4320 parsed[0].1.get("status").and_then(|v| v.as_str()),
4321 Some("completed")
4322 );
4323 assert_eq!(parsed[1].1.get("task_id").and_then(|v| v.as_i64()), Some(3));
4324 assert_eq!(
4325 parsed[1].1.get("status").and_then(|v| v.as_str()),
4326 Some("in_progress")
4327 );
4328 }
4329
4330 #[test]
4331 fn applies_todo_status_update_from_task_id_args() {
4332 let current = vec![
4333 json!({"id":"todo-1","content":"a","status":"pending"}),
4334 json!({"id":"todo-2","content":"b","status":"pending"}),
4335 json!({"id":"todo-3","content":"c","status":"pending"}),
4336 ];
4337 let updated =
4338 apply_todo_updates_from_args(current, &json!({"task_id":2, "status":"completed"}))
4339 .expect("status update");
4340 assert_eq!(
4341 updated[1].get("status").and_then(|v| v.as_str()),
4342 Some("completed")
4343 );
4344 }
4345
4346 #[test]
4347 fn normalizes_todo_write_tasks_alias() {
4348 let normalized = normalize_todo_write_args(
4349 json!({"tasks":[{"title":"Book venue"},{"name":"Send invites"}]}),
4350 "",
4351 );
4352 let todos = normalized
4353 .get("todos")
4354 .and_then(|v| v.as_array())
4355 .cloned()
4356 .unwrap_or_default();
4357 assert_eq!(todos.len(), 2);
4358 assert_eq!(
4359 todos[0].get("content").and_then(|v| v.as_str()),
4360 Some("Book venue")
4361 );
4362 assert_eq!(
4363 todos[1].get("content").and_then(|v| v.as_str()),
4364 Some("Send invites")
4365 );
4366 }
4367
4368 #[test]
4369 fn normalizes_todo_write_from_completion_when_args_empty() {
4370 let completion = "Plan:\n1. Secure venue\n2. Create playlist\n3. Send invites";
4371 let normalized = normalize_todo_write_args(json!({}), completion);
4372 let todos = normalized
4373 .get("todos")
4374 .and_then(|v| v.as_array())
4375 .cloned()
4376 .unwrap_or_default();
4377 assert_eq!(todos.len(), 3);
4378 assert!(!is_empty_todo_write_args(&normalized));
4379 }
4380
4381 #[test]
4382 fn empty_todo_write_args_allows_status_updates() {
4383 let args = json!({"task_id": 2, "status":"completed"});
4384 assert!(!is_empty_todo_write_args(&args));
4385 }
4386
4387 #[test]
4388 fn streamed_websearch_args_fallback_to_query_string() {
4389 let parsed = parse_streamed_tool_args("websearch", "meaning of life");
4390 assert_eq!(
4391 parsed.get("query").and_then(|v| v.as_str()),
4392 Some("meaning of life")
4393 );
4394 }
4395
4396 #[test]
4397 fn streamed_websearch_stringified_json_args_are_unwrapped() {
4398 let parsed = parse_streamed_tool_args("websearch", r#""donkey gestation period""#);
4399 assert_eq!(
4400 parsed.get("query").and_then(|v| v.as_str()),
4401 Some("donkey gestation period")
4402 );
4403 }
4404
4405 #[test]
4406 fn streamed_websearch_args_strip_arg_key_value_wrappers() {
4407 let parsed = parse_streamed_tool_args(
4408 "websearch",
4409 "query</arg_key><arg_value>taj card what is it benefits how to apply</arg_value>",
4410 );
4411 assert_eq!(
4412 parsed.get("query").and_then(|v| v.as_str()),
4413 Some("taj card what is it benefits how to apply")
4414 );
4415 }
4416
4417 #[test]
4418 fn normalize_tool_args_websearch_infers_from_user_text() {
4419 let normalized =
4420 normalize_tool_args("websearch", json!({}), "web search meaning of life", "");
4421 assert_eq!(
4422 normalized.args.get("query").and_then(|v| v.as_str()),
4423 Some("meaning of life")
4424 );
4425 assert_eq!(normalized.args_source, "inferred_from_user");
4426 assert_eq!(normalized.args_integrity, "recovered");
4427 }
4428
/// A query already supplied in the provider args must be kept as-is, with the source
/// reported as `provider_json` and integrity as `ok`.
#[test]
fn normalize_tool_args_websearch_keeps_existing_query() {
    let provider_args = json!({"query":"already set"});
    let user_text = "web search should not override";
    let normalized = normalize_tool_args("websearch", provider_args, user_text, "");

    let query = normalized.args.get("query").and_then(|value| value.as_str());
    assert_eq!(query, Some("already set"));
    assert_eq!(normalized.args_source, "provider_json");
    assert_eq!(normalized.args_integrity, "ok");
}
4444
/// When neither the provider args nor the prompt yield a query, normalization must flag
/// the call as terminally missing with source `missing` and integrity `empty`.
#[test]
fn normalize_tool_args_websearch_fails_when_unrecoverable() {
    let empty_args = json!({});
    let normalized = normalize_tool_args("websearch", empty_args, "search", "");

    let query_absent = normalized.query.is_none();
    assert!(query_absent);
    assert!(normalized.missing_terminal);
    assert_eq!(normalized.args_source, "missing");
    assert_eq!(normalized.args_integrity, "empty");
}
4453
/// A backtick-quoted URL in the prompt is expected to be used as the `webfetch` `url`
/// when the provider args are empty.
#[test]
fn normalize_tool_args_webfetch_infers_url_from_user_prompt() {
    let user_text = "Please fetch `https://tandem.frumu.ai/docs/` in markdown mode";
    let normalized = normalize_tool_args("webfetch", json!({}), user_text, "");

    assert!(!normalized.missing_terminal);
    let url = normalized.args.get("url").and_then(|value| value.as_str());
    assert_eq!(url, Some("https://tandem.frumu.ai/docs/"));
    assert_eq!(normalized.args_source, "inferred_from_user");
    assert_eq!(normalized.args_integrity, "recovered");
}
4470
/// A URL supplied as `uri` inside a nested `args` object must be surfaced as `url`,
/// and the source must still be reported as `provider_json`.
#[test]
fn normalize_tool_args_webfetch_recovers_nested_url_alias() {
    let provider_args = json!({"args":{"uri":"https://example.com/page"}});
    let normalized = normalize_tool_args("webfetch", provider_args, "", "");

    assert!(!normalized.missing_terminal);
    let url = normalized.args.get("url").and_then(|value| value.as_str());
    assert_eq!(url, Some("https://example.com/page"));
    assert_eq!(normalized.args_source, "provider_json");
}
4486
/// With no URL in the provider args or the prompt, `webfetch` normalization must end in
/// the `WEBFETCH_URL_MISSING` terminal state.
#[test]
fn normalize_tool_args_webfetch_fails_when_url_unrecoverable() {
    let user_text = "fetch the site";
    let normalized = normalize_tool_args("webfetch", json!({}), user_text, "");

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("WEBFETCH_URL_MISSING"));
}
4496
/// A `write` call with no arguments and no prompt text must be rejected with
/// `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_write_requires_path() {
    let empty_args = json!({});
    let normalized = normalize_tool_args("write", empty_args, "", "");

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
4506
/// The `filePath` alias must be mapped onto `path`, with `content` carried through
/// unchanged.
#[test]
fn normalize_tool_args_write_recovers_alias_path_key() {
    let provider_args = json!({"filePath":"docs/CONCEPT.md","content":"hello"});
    let normalized = normalize_tool_args("write", provider_args, "", "");

    assert!(!normalized.missing_terminal);
    let path = normalized.args.get("path").and_then(|value| value.as_str());
    assert_eq!(path, Some("docs/CONCEPT.md"));
    let content = normalized.args.get("content").and_then(|value| value.as_str());
    assert_eq!(content, Some("hello"));
}
4525
/// A backtick-quoted file name in the prompt is expected to become the `read` path when
/// the provider args are empty.
#[test]
fn normalize_tool_args_read_infers_path_from_user_prompt() {
    let user_text = "Please inspect `FEATURE_LIST.md` and summarize key sections.";
    let normalized = normalize_tool_args("read", json!({}), user_text, "");

    assert!(!normalized.missing_terminal);
    let path = normalized.args.get("path").and_then(|value| value.as_str());
    assert_eq!(path, Some("FEATURE_LIST.md"));
    assert_eq!(normalized.args_source, "inferred_from_user");
    assert_eq!(normalized.args_integrity, "recovered");
}
4542
/// A path that appears only in the fourth argument (the assistant-side text, per this
/// test's name) must not be used: the call stays `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_read_does_not_infer_path_from_assistant_context() {
    let user_text = "generic instruction";
    let assistant_text = "I will read src-tauri/src/orchestrator/engine.rs first.";
    let normalized = normalize_tool_args("read", json!({}), user_text, assistant_text);

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
4557
/// A `file_path` found inside an array under `args` must be recovered as the `path`
/// for `write`.
#[test]
fn normalize_tool_args_write_recovers_path_from_nested_array_payload() {
    let provider_args = json!({"args":[{"file_path":"docs/CONCEPT.md"}],"content":"hello"});
    let normalized = normalize_tool_args("write", provider_args, "", "");

    assert!(!normalized.missing_terminal);
    let path = normalized.args.get("path").and_then(|value| value.as_str());
    assert_eq!(path, Some("docs/CONCEPT.md"));
}
4572
/// The `body` key must be accepted as an alias for `content` in `write` calls.
#[test]
fn normalize_tool_args_write_recovers_content_alias() {
    let provider_args = json!({"path":"docs/FEATURES.md","body":"feature notes"});
    let normalized = normalize_tool_args("write", provider_args, "", "");

    assert!(!normalized.missing_terminal);
    let content = normalized.args.get("content").and_then(|value| value.as_str());
    assert_eq!(content, Some("feature notes"));
}
4587
/// A `write` call that has a path but no content must be rejected with
/// `WRITE_CONTENT_MISSING`.
#[test]
fn normalize_tool_args_write_fails_when_content_missing() {
    let provider_args = json!({"path":"docs/FEATURES.md"});
    let normalized = normalize_tool_args("write", provider_args, "", "");

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("WRITE_CONTENT_MISSING"));
}
4597
/// When `args` is a plain string next to an explicit `path`, that string is expected to
/// be used as the file `content`.
#[test]
fn normalize_tool_args_write_recovers_raw_nested_string_content() {
    let provider_args = json!({"path":"docs/FEATURES.md","args":"Line 1\nLine 2"});
    let normalized = normalize_tool_args("write", provider_args, "", "");

    assert!(!normalized.missing_terminal);
    let path = normalized.args.get("path").and_then(|value| value.as_str());
    assert_eq!(path, Some("docs/FEATURES.md"));
    let content = normalized.args.get("content").and_then(|value| value.as_str());
    assert_eq!(content, Some("Line 1\nLine 2"));
}
4616
/// A bare JSON string that looks like a path must not be used as file content; the call
/// must end in `WRITE_CONTENT_MISSING`.
#[test]
fn normalize_tool_args_write_does_not_treat_path_as_content() {
    let bare_string_args = json!("docs/FEATURES.md");
    let normalized = normalize_tool_args("write", bare_string_args, "", "");

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("WRITE_CONTENT_MISSING"));
}
4626
/// A file name wrapped in markdown bold markers (`**…**`) in the prompt must be
/// recovered as the `read` path without the markers.
#[test]
fn normalize_tool_args_read_infers_path_from_bold_markdown() {
    let user_text = "Please read **FEATURE_LIST.md** and summarize.";
    let normalized = normalize_tool_args("read", json!({}), user_text, "");

    assert!(!normalized.missing_terminal);
    let path = normalized.args.get("path").and_then(|value| value.as_str());
    assert_eq!(path, Some("FEATURE_LIST.md"));
}
4641
/// A backtick-quoted command in the prompt is expected to become the `bash` `command`
/// when the provider args are empty.
#[test]
fn normalize_tool_args_shell_infers_command_from_user_prompt() {
    let user_text = "Run `rg -n \"TODO\" .`";
    let normalized = normalize_tool_args("bash", json!({}), user_text, "");

    assert!(!normalized.missing_terminal);
    let command = normalized.args.get("command").and_then(|value| value.as_str());
    assert_eq!(command, Some("rg -n \"TODO\" ."));
    assert_eq!(normalized.args_source, "inferred_from_user");
    assert_eq!(normalized.args_integrity, "recovered");
}
4653
/// A `read` path of just `/` must be treated as missing (`FILE_PATH_MISSING`).
#[test]
fn normalize_tool_args_read_rejects_root_only_path() {
    let provider_args = json!({"path":"/"});
    let normalized = normalize_tool_args("read", provider_args, "", "");

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
4663
/// When the provider supplies `/` as the path but the prompt names a file, the file from
/// the prompt must be used and the result reported as recovered from the user.
#[test]
fn normalize_tool_args_read_recovers_when_provider_path_is_root_only() {
    let provider_args = json!({"path":"/"});
    let user_text = "Please open `CONCEPT.md`";
    let normalized = normalize_tool_args("read", provider_args, user_text, "");

    assert!(!normalized.missing_terminal);
    let path = normalized.args.get("path").and_then(|value| value.as_str());
    assert_eq!(path, Some("CONCEPT.md"));
    assert_eq!(normalized.args_source, "inferred_from_user");
    assert_eq!(normalized.args_integrity, "recovered");
}
4676
/// A `path` value that is actually leaked `<tool_call>` markup must be rejected with
/// `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_read_rejects_tool_call_markup_path() {
    let provider_args = json!({
        "path":"<tool_call>\n<function=glob>\n<parameter=pattern>**/*</parameter>\n</function>\n</tool_call>"
    });
    let normalized = normalize_tool_args("read", provider_args, "", "");

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
4693
/// A glob pattern (`**/*`) supplied as a `read` path must be rejected with
/// `FILE_PATH_MISSING`.
#[test]
fn normalize_tool_args_read_rejects_glob_pattern_path() {
    let provider_args = json!({"path":"**/*"});
    let normalized = normalize_tool_args("read", provider_args, "", "");

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
4703
/// The placeholder-like value `files/directories` must not be accepted as a `read` path.
#[test]
fn normalize_tool_args_read_rejects_placeholder_path() {
    let provider_args = json!({"path":"files/directories"});
    let normalized = normalize_tool_args("read", provider_args, "", "");

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
4713
/// The placeholder-like value `tool/policy` must not be accepted as a `read` path.
#[test]
fn normalize_tool_args_read_rejects_tool_policy_placeholder_path() {
    let provider_args = json!({"path":"tool/policy"});
    let normalized = normalize_tool_args("read", provider_args, "", "");

    let reason = normalized.missing_terminal_reason.as_deref();
    assert!(normalized.missing_terminal);
    assert_eq!(reason, Some("FILE_PATH_MISSING"));
}
4723
/// When the provider path is the `tool/policy` placeholder, a backtick-quoted file name
/// containing spaces and non-ASCII characters must be recovered from the prompt intact.
#[test]
fn normalize_tool_args_read_recovers_pdf_path_from_user_text() {
    let provider_args = json!({"path":"tool/policy"});
    let user_text = "Read `T1011U kitöltési útmutató.pdf` and summarize.";
    let normalized = normalize_tool_args("read", provider_args, user_text, "");

    assert!(!normalized.missing_terminal);
    let path = normalized.args.get("path").and_then(|value| value.as_str());
    assert_eq!(path, Some("T1011U kitöltési útmutató.pdf"));
    assert_eq!(normalized.args_source, "inferred_from_user");
    assert_eq!(normalized.args_integrity, "recovered");
}
4740
/// `normalize_tool_name` must drop the `default_api:` prefix and map `functions.shell`
/// to `bash`.
#[test]
fn normalize_tool_name_strips_default_api_namespace() {
    let namespaced = normalize_tool_name("default_api:read");
    assert_eq!(namespaced, "read");

    let shell_alias = normalize_tool_name("functions.shell");
    assert_eq!(shell_alias, "bash");
}
4746
/// `mcp_server_from_tool_name` must return the segment after `mcp.` for MCP tool names
/// and `None` for a non-MCP name or a bare `mcp`.
#[test]
fn mcp_server_from_tool_name_parses_server_segment() {
    let qualified = mcp_server_from_tool_name("mcp.arcade.jira_getboards");
    assert_eq!(qualified, Some("arcade"));

    let builtin = mcp_server_from_tool_name("read");
    assert_eq!(builtin, None);

    let prefix_only = mcp_server_from_tool_name("mcp");
    assert_eq!(prefix_only, None);
}
4756
/// When a batch entry's `tool` is the `default_api` wrapper, the helpers must resolve the
/// real tool from `name` (or from the `default_api:` prefix), treat the batch as
/// read-only, and include both resolved names in the batch signature.
#[test]
fn batch_helpers_use_name_when_tool_is_wrapper() {
    let batch = json!({
        "tool_calls":[
            {"tool":"default_api","name":"read","args":{"path":"CONCEPT.md"}},
            {"tool":"default_api:glob","args":{"pattern":"*.md"}}
        ]
    });

    let calls = extract_batch_calls(&batch);
    assert_eq!(calls.len(), 2);
    assert_eq!(calls[0].0, "read");
    assert_eq!(calls[1].0, "glob");

    let read_only = is_read_only_batch_call(&batch);
    assert!(read_only);

    let signature = batch_tool_signature(&batch).unwrap_or_default();
    for expected in &["read:", "glob:"] {
        assert!(signature.contains(*expected));
    }
}
4774
/// A tool name nested under `function.name` must be resolved for wrapper entries, and
/// the resulting single-`read` batch must count as read-only.
#[test]
fn batch_helpers_resolve_nested_function_name() {
    let batch = json!({
        "tool_calls":[
            {"tool":"default_api","function":{"name":"read"},"args":{"path":"CONCEPT.md"}}
        ]
    });

    let calls = extract_batch_calls(&batch);
    assert_eq!(calls.len(), 1);
    assert_eq!(calls[0].0, "read");

    let read_only = is_read_only_batch_call(&batch);
    assert!(read_only);
}
4787
/// A batch result in which every entry reports `Unknown tool: default_api` must be
/// classified as non-productive.
#[test]
fn batch_output_classifier_detects_non_productive_unknown_results() {
    let batch_output = r#"
[
 {"tool":"default_api","output":"Unknown tool: default_api","metadata":{}},
 {"tool":"default_api","output":"Unknown tool: default_api","metadata":{}}
]
"#;
    let non_productive = is_non_productive_batch_output(batch_output);
    assert!(non_productive);
}
4798
/// The runtime system prompt built for a Windows/PowerShell host must contain the
/// execution-environment header and the OS, shell, and path-style lines.
#[test]
fn runtime_prompt_includes_execution_environment_block() {
    let host = HostRuntimeContext {
        os: HostOs::Windows,
        arch: "x86_64".to_string(),
        shell_family: ShellFamily::Powershell,
        path_style: PathStyle::Windows,
    };
    let prompt = tandem_runtime_system_prompt(&host);

    let expected_fragments = [
        "[Execution Environment]",
        "Host OS: windows",
        "Shell: powershell",
        "Path style: windows",
    ];
    for fragment in &expected_fragments {
        assert!(prompt.contains(*fragment));
    }
}
4812
/// A metadata object with a top-level `server` and a fully populated `mcpAuth` section
/// must be parsed into matching fields by `extract_mcp_auth_required_metadata`.
#[test]
fn extract_mcp_auth_required_metadata_parses_expected_shape() {
    let payload = json!({
        "server": "arcade",
        "mcpAuth": {
            "required": true,
            "challengeId": "abc123",
            "authorizationUrl": "https://example.com/oauth",
            "message": "Authorize first",
            "pending": true,
            "blocked": true,
            "retryAfterMs": 8000
        }
    });

    let auth = extract_mcp_auth_required_metadata(&payload).expect("expected metadata");

    assert_eq!(auth.challenge_id, "abc123");
    assert_eq!(auth.authorization_url, "https://example.com/oauth");
    assert_eq!(auth.message, "Authorize first");
    let server = auth.server.as_deref();
    assert_eq!(server, Some("arcade"));
    assert!(auth.pending);
    assert!(auth.blocked);
    assert_eq!(auth.retry_after_ms, Some(8000));
}
4836
/// Both the "Authorization required" and "Authorization pending" outputs must be
/// detected as auth-related, while an ordinary tool result must not.
#[test]
fn auth_required_output_detector_matches_auth_text() {
    let required_text =
        "Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com";
    let pending_text =
        "Authorization pending for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com\nRetry after 8s.";
    let ordinary_text = "Tool `read` result: ok";

    assert!(is_auth_required_tool_output(required_text));
    assert!(is_auth_required_tool_output(pending_text));
    assert!(!is_auth_required_tool_output(ordinary_text));
}
4847
/// The "per-run guard budget exceeded" skip message must be detected as a guard-budget
/// output, while an ordinary tool result must not.
#[test]
fn guard_budget_output_detector_matches_expected_text() {
    let skipped_text =
        "Tool `mcp.arcade.gmail_sendemail` call skipped: per-run guard budget exceeded (10).";
    let ordinary_text = "Tool `read` result: ok";

    assert!(is_guard_budget_tool_output(skipped_text));
    assert!(!is_guard_budget_tool_output(ordinary_text));
}
4855
/// When every output is a guard-budget skip message, a summary must be produced that
/// mentions the per-run tool guard budget and a fresh run.
#[test]
fn summarize_guard_budget_outputs_returns_run_scoped_message() {
    let skipped_lines = [
        "Tool `mcp.arcade.gmail_sendemail` call skipped: per-run guard budget exceeded (10).",
        "Tool `mcp.arcade.jira_getboards` call skipped: per-run guard budget exceeded (10).",
    ];
    let outputs: Vec<String> = skipped_lines.iter().map(|line| line.to_string()).collect();

    let summary = summarize_guard_budget_outputs(&outputs).expect("expected summary");

    assert!(summary.contains("per-run tool guard budget"));
    assert!(summary.contains("fresh run"));
}
4868
/// When every output is an authorization pending/required message, a summary must be
/// produced that names each affected tool.
#[test]
fn summarize_auth_pending_outputs_returns_summary_when_all_are_auth_related() {
    let auth_lines = [
        "Authorization pending for `mcp.arcade.gmail_sendemail`.\nAuthorize here: https://example.com/a",
        "Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com/b",
    ];
    let outputs: Vec<String> = auth_lines.iter().map(|line| line.to_string()).collect();

    let summary = summarize_auth_pending_outputs(&outputs).expect("summary expected");

    assert!(summary.contains("Authorization is required before I can continue"));
    assert!(summary.contains("gmail_sendemail"));
    assert!(summary.contains("gmail_whoami"));
}
4880
/// When an ordinary tool result is mixed in with an auth message, no auth summary may be
/// produced.
#[test]
fn summarize_auth_pending_outputs_returns_none_for_mixed_outputs() {
    let mixed_lines = [
        "Authorization required for `mcp.arcade.gmail_whoami`.\nAuthorize here: https://example.com",
        "Tool `read` result:\nok",
    ];
    let outputs: Vec<String> = mixed_lines.iter().map(|line| line.to_string()).collect();

    let summary = summarize_auth_pending_outputs(&outputs);
    assert!(summary.is_none());
}
4889}