1use chrono::Utc;
2use futures::future::BoxFuture;
3use futures::StreamExt;
4use serde_json::{json, Map, Number, Value};
5use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
6use std::hash::{Hash, Hasher};
7use std::path::{Path, PathBuf};
8use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
9use tandem_providers::{ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
10use tandem_tools::{validate_tool_schemas, ToolRegistry};
11use tandem_types::{
12 EngineEvent, Message, MessagePart, MessagePartInput, MessageRole, SendMessageRequest,
13};
14use tandem_wire::WireMessagePart;
15use tokio_util::sync::CancellationToken;
16use tracing::Level;
17
18use crate::{
19 derive_session_title_from_prompt, title_needs_repair, AgentDefinition, AgentRegistry,
20 CancellationRegistry, EventBus, PermissionAction, PermissionManager, PluginRegistry, Storage,
21};
22use tokio::sync::RwLock;
23
/// Accumulator for a single tool call whose pieces arrive as separate stream chunks.
#[derive(Default)]
struct StreamedToolCall {
    /// Tool name; filled from a `ToolCallStart` chunk while still empty.
    name: String,
    /// Raw argument text, built by appending each `ToolCallDelta` fragment.
    args: String,
}
29
/// Input handed to a [`SpawnAgentHook`] when the `spawn_agent` tool is invoked.
#[derive(Debug, Clone)]
pub struct SpawnAgentToolContext {
    /// Session in which the tool call was issued.
    pub session_id: String,
    /// Message the tool call is attached to.
    pub message_id: String,
    /// Id of the published tool-invocation part, when one is available.
    pub tool_call_id: Option<String>,
    /// Tool arguments after normalization, policy checks and plugin injection.
    pub args: Value,
}
37
/// Outcome returned by a [`SpawnAgentHook`].
#[derive(Debug, Clone)]
pub struct SpawnAgentToolResult {
    /// Textual tool output; the engine passes it through the plugin output transform
    /// and truncates it before publishing.
    pub output: String,
    /// Structured metadata forwarded to the tool side-event emitter.
    pub metadata: Value,
}
43
/// Description of a pending tool call, given to a [`ToolPolicyHook`] for evaluation.
#[derive(Debug, Clone)]
pub struct ToolPolicyContext {
    /// Session in which the tool call was issued.
    pub session_id: String,
    /// Message the tool call is attached to.
    pub message_id: String,
    /// Normalized tool name.
    pub tool: String,
    /// Tool arguments after normalization and skill-scope enforcement.
    pub args: Value,
}
51
/// Verdict produced by a [`ToolPolicyHook`].
#[derive(Debug, Clone)]
pub struct ToolPolicyDecision {
    /// `true` lets the tool call proceed; `false` blocks it.
    pub allowed: bool,
    /// Optional explanation; when a call is blocked without one the engine reports
    /// "Tool denied by runtime policy".
    pub reason: Option<String>,
}
57
/// Extension point that services the `spawn_agent` tool on behalf of the engine.
pub trait SpawnAgentHook: Send + Sync {
    /// Handles one `spawn_agent` invocation described by `ctx`.
    ///
    /// Returns the tool output and metadata, or an error that the engine propagates
    /// to the caller of the tool execution.
    fn spawn_agent(
        &self,
        ctx: SpawnAgentToolContext,
    ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>>;
}
64
/// Extension point consulted before a tool call is executed.
pub trait ToolPolicyHook: Send + Sync {
    /// Decides whether the tool call described by `ctx` may run.
    ///
    /// An `Err` is propagated by the engine; a decision with `allowed == false`
    /// blocks the call and its `reason` is reported as the tool outcome.
    fn evaluate_tool(
        &self,
        ctx: ToolPolicyContext,
    ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>>;
}
71
/// Drives prompt execution: stores messages, streams provider output, runs tools
/// under permission/policy checks and publishes engine events.
#[derive(Clone)]
pub struct EngineLoop {
    /// Session, message and todo persistence.
    storage: std::sync::Arc<Storage>,
    /// Bus on which `EngineEvent`s are published.
    event_bus: EventBus,
    /// Source of provider streams and one-shot completions.
    providers: ProviderRegistry,
    /// Plugins that may override permissions, inject tool args and transform tool output.
    plugins: PluginRegistry,
    /// Lookup for the agent definition selected by a request.
    agents: AgentRegistry,
    /// Evaluates permission rules and collects user replies for `Ask` rules.
    permissions: PermissionManager,
    /// Tool schemas and tool execution.
    tools: ToolRegistry,
    /// Per-session cancellation tokens.
    cancellations: CancellationRegistry,
    /// Session id -> expiry (milliseconds since the Unix epoch) of a sandbox override.
    workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
    /// Optional handler for the `spawn_agent` tool.
    spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
    /// Optional runtime policy consulted before each tool call.
    tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
}
86
87impl EngineLoop {
88 #[allow(clippy::too_many_arguments)]
89 pub fn new(
90 storage: std::sync::Arc<Storage>,
91 event_bus: EventBus,
92 providers: ProviderRegistry,
93 plugins: PluginRegistry,
94 agents: AgentRegistry,
95 permissions: PermissionManager,
96 tools: ToolRegistry,
97 cancellations: CancellationRegistry,
98 ) -> Self {
99 Self {
100 storage,
101 event_bus,
102 providers,
103 plugins,
104 agents,
105 permissions,
106 tools,
107 cancellations,
108 workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
109 spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
110 tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
111 }
112 }
113
114 pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
115 *self.spawn_agent_hook.write().await = Some(hook);
116 }
117
118 pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
119 *self.tool_policy_hook.write().await = Some(hook);
120 }
121
122 pub async fn grant_workspace_override_for_session(
123 &self,
124 session_id: &str,
125 ttl_seconds: u64,
126 ) -> u64 {
127 let expires_at = chrono::Utc::now()
128 .timestamp_millis()
129 .max(0)
130 .saturating_add((ttl_seconds as i64).saturating_mul(1000))
131 as u64;
132 self.workspace_overrides
133 .write()
134 .await
135 .insert(session_id.to_string(), expires_at);
136 expires_at
137 }
138
139 pub async fn run_prompt_async(
140 &self,
141 session_id: String,
142 req: SendMessageRequest,
143 ) -> anyhow::Result<()> {
144 self.run_prompt_async_with_context(session_id, req, None)
145 .await
146 }
147
148 pub async fn run_prompt_async_with_context(
149 &self,
150 session_id: String,
151 req: SendMessageRequest,
152 correlation_id: Option<String>,
153 ) -> anyhow::Result<()> {
154 let session_provider = self
155 .storage
156 .get_session(&session_id)
157 .await
158 .and_then(|s| s.provider);
159 let provider_hint = req
160 .model
161 .as_ref()
162 .map(|m| m.provider_id.clone())
163 .or(session_provider);
164 let correlation_ref = correlation_id.as_deref();
165 let model_id = req.model.as_ref().map(|m| m.model_id.as_str());
166 let cancel = self.cancellations.create(&session_id).await;
167 emit_event(
168 Level::INFO,
169 ProcessKind::Engine,
170 ObservabilityEvent {
171 event: "provider.call.start",
172 component: "engine.loop",
173 correlation_id: correlation_ref,
174 session_id: Some(&session_id),
175 run_id: None,
176 message_id: None,
177 provider_id: provider_hint.as_deref(),
178 model_id,
179 status: Some("start"),
180 error_code: None,
181 detail: Some("run_prompt_async dispatch"),
182 },
183 );
184 self.event_bus.publish(EngineEvent::new(
185 "session.status",
186 json!({"sessionID": session_id, "status":"running"}),
187 ));
188 let text = req
189 .parts
190 .iter()
191 .map(|p| match p {
192 MessagePartInput::Text { text } => text.clone(),
193 MessagePartInput::File {
194 mime,
195 filename,
196 url,
197 } => format!(
198 "[file mime={} name={} url={}]",
199 mime,
200 filename.clone().unwrap_or_else(|| "unknown".to_string()),
201 url
202 ),
203 })
204 .collect::<Vec<_>>()
205 .join("\n");
206 self.auto_rename_session_from_user_text(&session_id, &text)
207 .await;
208 let active_agent = self.agents.get(req.agent.as_deref()).await;
209 let mut user_message_id = self
210 .find_recent_matching_user_message_id(&session_id, &text)
211 .await;
212 if user_message_id.is_none() {
213 let user_message = Message::new(
214 MessageRole::User,
215 vec![MessagePart::Text { text: text.clone() }],
216 );
217 let created_message_id = user_message.id.clone();
218 self.storage
219 .append_message(&session_id, user_message)
220 .await?;
221
222 let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
223 self.event_bus.publish(EngineEvent::new(
224 "message.part.updated",
225 json!({
226 "part": user_part,
227 "delta": text,
228 "agent": active_agent.name
229 }),
230 ));
231 user_message_id = Some(created_message_id);
232 }
233 let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
234
235 if cancel.is_cancelled() {
236 self.event_bus.publish(EngineEvent::new(
237 "session.status",
238 json!({"sessionID": session_id, "status":"cancelled"}),
239 ));
240 self.cancellations.remove(&session_id).await;
241 return Ok(());
242 }
243
244 let mut question_tool_used = false;
245 let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
246 if normalize_tool_name(&tool) == "question" {
247 question_tool_used = true;
248 }
249 if !agent_can_use_tool(&active_agent, &tool) {
250 format!(
251 "Tool `{tool}` is not enabled for agent `{}`.",
252 active_agent.name
253 )
254 } else {
255 self.execute_tool_with_permission(
256 &session_id,
257 &user_message_id,
258 tool.clone(),
259 args,
260 active_agent.skills.as_deref(),
261 &text,
262 None,
263 cancel.clone(),
264 )
265 .await?
266 .unwrap_or_default()
267 }
268 } else {
269 let mut completion = String::new();
270 let mut max_iterations = 25usize;
271 let mut followup_context: Option<String> = None;
272 let mut last_tool_outputs: Vec<String> = Vec::new();
273 let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
274 let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
275 let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
276 let mut websearch_query_blocked = false;
277 let mut auto_workspace_probe_attempted = false;
278
279 while max_iterations > 0 && !cancel.is_cancelled() {
280 max_iterations -= 1;
281 let mut messages = load_chat_history(self.storage.clone(), &session_id).await;
282 let mut system_parts = vec![tandem_runtime_system_prompt().to_string()];
283 if let Some(system) = active_agent.system_prompt.as_ref() {
284 system_parts.push(system.clone());
285 }
286 messages.insert(
287 0,
288 ChatMessage {
289 role: "system".to_string(),
290 content: system_parts.join("\n\n"),
291 },
292 );
293 if let Some(extra) = followup_context.take() {
294 messages.push(ChatMessage {
295 role: "user".to_string(),
296 content: extra,
297 });
298 }
299 let tool_schemas = self.tools.list().await;
300 if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
301 let detail = validation_err.to_string();
302 emit_event(
303 Level::ERROR,
304 ProcessKind::Engine,
305 ObservabilityEvent {
306 event: "provider.call.error",
307 component: "engine.loop",
308 correlation_id: correlation_ref,
309 session_id: Some(&session_id),
310 run_id: None,
311 message_id: Some(&user_message_id),
312 provider_id: provider_hint.as_deref(),
313 model_id,
314 status: Some("failed"),
315 error_code: Some("TOOL_SCHEMA_INVALID"),
316 detail: Some(&detail),
317 },
318 );
319 anyhow::bail!("{detail}");
320 }
321 let stream = self
322 .providers
323 .stream_for_provider(
324 provider_hint.as_deref(),
325 messages,
326 Some(tool_schemas),
327 cancel.clone(),
328 )
329 .await
330 .inspect_err(|err| {
331 let error_text = err.to_string();
332 let error_code = provider_error_code(&error_text);
333 let detail = truncate_text(&error_text, 500);
334 emit_event(
335 Level::ERROR,
336 ProcessKind::Engine,
337 ObservabilityEvent {
338 event: "provider.call.error",
339 component: "engine.loop",
340 correlation_id: correlation_ref,
341 session_id: Some(&session_id),
342 run_id: None,
343 message_id: Some(&user_message_id),
344 provider_id: provider_hint.as_deref(),
345 model_id,
346 status: Some("failed"),
347 error_code: Some(error_code),
348 detail: Some(&detail),
349 },
350 );
351 })?;
352 tokio::pin!(stream);
353 completion.clear();
354 let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
355 let mut provider_usage: Option<TokenUsage> = None;
356 while let Some(chunk) = stream.next().await {
357 let chunk = match chunk {
358 Ok(chunk) => chunk,
359 Err(err) => {
360 let error_text = err.to_string();
361 let error_code = provider_error_code(&error_text);
362 let detail = truncate_text(&error_text, 500);
363 emit_event(
364 Level::ERROR,
365 ProcessKind::Engine,
366 ObservabilityEvent {
367 event: "provider.call.error",
368 component: "engine.loop",
369 correlation_id: correlation_ref,
370 session_id: Some(&session_id),
371 run_id: None,
372 message_id: Some(&user_message_id),
373 provider_id: provider_hint.as_deref(),
374 model_id,
375 status: Some("failed"),
376 error_code: Some(error_code),
377 detail: Some(&detail),
378 },
379 );
380 return Err(anyhow::anyhow!(
381 "provider stream chunk error: {error_text}"
382 ));
383 }
384 };
385 match chunk {
386 StreamChunk::TextDelta(delta) => {
387 if completion.is_empty() {
388 emit_event(
389 Level::INFO,
390 ProcessKind::Engine,
391 ObservabilityEvent {
392 event: "provider.call.first_byte",
393 component: "engine.loop",
394 correlation_id: correlation_ref,
395 session_id: Some(&session_id),
396 run_id: None,
397 message_id: Some(&user_message_id),
398 provider_id: provider_hint.as_deref(),
399 model_id,
400 status: Some("streaming"),
401 error_code: None,
402 detail: Some("first text delta"),
403 },
404 );
405 }
406 completion.push_str(&delta);
407 let delta = truncate_text(&delta, 4_000);
408 let delta_part =
409 WireMessagePart::text(&session_id, &user_message_id, delta.clone());
410 self.event_bus.publish(EngineEvent::new(
411 "message.part.updated",
412 json!({"part": delta_part, "delta": delta}),
413 ));
414 }
415 StreamChunk::ReasoningDelta(_reasoning) => {}
416 StreamChunk::Done {
417 finish_reason: _,
418 usage,
419 } => {
420 if usage.is_some() {
421 provider_usage = usage;
422 }
423 break;
424 }
425 StreamChunk::ToolCallStart { id, name } => {
426 let entry = streamed_tool_calls.entry(id).or_default();
427 if entry.name.is_empty() {
428 entry.name = name;
429 }
430 }
431 StreamChunk::ToolCallDelta { id, args_delta } => {
432 let entry = streamed_tool_calls.entry(id).or_default();
433 entry.args.push_str(&args_delta);
434 }
435 StreamChunk::ToolCallEnd { id: _ } => {}
436 }
437 if cancel.is_cancelled() {
438 break;
439 }
440 }
441
442 let mut tool_calls = streamed_tool_calls
443 .into_values()
444 .filter_map(|call| {
445 if call.name.trim().is_empty() {
446 return None;
447 }
448 let tool_name = normalize_tool_name(&call.name);
449 let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
450 Some((tool_name, parsed_args))
451 })
452 .collect::<Vec<_>>();
453 if tool_calls.is_empty() {
454 tool_calls = parse_tool_invocations_from_response(&completion);
455 }
456 if tool_calls.is_empty()
457 && !auto_workspace_probe_attempted
458 && should_force_workspace_probe(&text, &completion)
459 {
460 auto_workspace_probe_attempted = true;
461 tool_calls = vec![("glob".to_string(), json!({ "pattern": "*" }))];
462 }
463 if !tool_calls.is_empty() {
464 let mut outputs = Vec::new();
465 for (tool, args) in tool_calls {
466 if !agent_can_use_tool(&active_agent, &tool) {
467 continue;
468 }
469 let tool_key = normalize_tool_name(&tool);
470 if tool_key == "question" {
471 question_tool_used = true;
472 }
473 if websearch_query_blocked && tool_key == "websearch" {
474 outputs.push(
475 "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
476 .to_string(),
477 );
478 continue;
479 }
480 let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
481 *entry += 1;
482 let budget = tool_budget_for(&tool_key);
483 if *entry > budget {
484 outputs.push(format!(
485 "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
486 tool_key, budget
487 ));
488 continue;
489 }
490 let mut effective_args = args.clone();
491 if tool_key == "todo_write" {
492 effective_args = normalize_todo_write_args(effective_args, &completion);
493 if is_empty_todo_write_args(&effective_args) {
494 outputs.push(
495 "Tool `todo_write` call skipped: empty todo payload."
496 .to_string(),
497 );
498 continue;
499 }
500 }
501 let signature = tool_signature(&tool_key, &args);
502 let mut signature_count = 1usize;
503 if is_read_only_tool(&tool_key) {
504 let count = readonly_signature_counts
505 .entry(signature.clone())
506 .and_modify(|v| *v = v.saturating_add(1))
507 .or_insert(1);
508 signature_count = *count;
509 if tool_key == "websearch" && *count > 2 {
510 self.event_bus.publish(EngineEvent::new(
511 "tool.loop_guard.triggered",
512 json!({
513 "sessionID": session_id,
514 "messageID": user_message_id,
515 "tool": tool_key,
516 "reason": "duplicate_signature_retry_exhausted",
517 "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
518 "loop_guard_triggered": true
519 }),
520 ));
521 outputs.push(
522 "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
523 .to_string(),
524 );
525 continue;
526 }
527 if tool_key != "websearch" && *count > 1 {
528 if let Some(cached) = readonly_tool_cache.get(&signature) {
529 outputs.push(cached.clone());
530 } else {
531 outputs.push(format!(
532 "Tool `{}` call skipped: duplicate call signature detected.",
533 tool_key
534 ));
535 }
536 continue;
537 }
538 }
539 if let Some(output) = self
540 .execute_tool_with_permission(
541 &session_id,
542 &user_message_id,
543 tool,
544 effective_args,
545 active_agent.skills.as_deref(),
546 &text,
547 Some(&completion),
548 cancel.clone(),
549 )
550 .await?
551 {
552 if output.contains("WEBSEARCH_QUERY_MISSING") {
553 websearch_query_blocked = true;
554 }
555 if is_read_only_tool(&tool_key)
556 && tool_key != "websearch"
557 && signature_count == 1
558 {
559 readonly_tool_cache.insert(signature, output.clone());
560 }
561 outputs.push(output);
562 }
563 }
564 if !outputs.is_empty() {
565 last_tool_outputs = outputs.clone();
566 followup_context = Some(format!(
567 "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
568 summarize_tool_outputs(&outputs)
569 ));
570 continue;
571 }
572 }
573
574 if let Some(usage) = provider_usage {
575 self.event_bus.publish(EngineEvent::new(
576 "provider.usage",
577 json!({
578 "sessionID": session_id,
579 "messageID": user_message_id,
580 "promptTokens": usage.prompt_tokens,
581 "completionTokens": usage.completion_tokens,
582 "totalTokens": usage.total_tokens,
583 }),
584 ));
585 }
586
587 break;
588 }
589 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
590 if let Some(narrative) = self
591 .generate_final_narrative_without_tools(
592 &session_id,
593 &active_agent,
594 provider_hint.as_deref(),
595 cancel.clone(),
596 &last_tool_outputs,
597 )
598 .await
599 {
600 completion = narrative;
601 }
602 }
603 if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
604 let preview = last_tool_outputs
605 .iter()
606 .take(3)
607 .map(|o| truncate_text(o, 240))
608 .collect::<Vec<_>>()
609 .join("\n");
610 completion = format!(
611 "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
612 preview
613 );
614 }
615 truncate_text(&completion, 16_000)
616 };
617 emit_event(
618 Level::INFO,
619 ProcessKind::Engine,
620 ObservabilityEvent {
621 event: "provider.call.finish",
622 component: "engine.loop",
623 correlation_id: correlation_ref,
624 session_id: Some(&session_id),
625 run_id: None,
626 message_id: Some(&user_message_id),
627 provider_id: provider_hint.as_deref(),
628 model_id,
629 status: Some("ok"),
630 error_code: None,
631 detail: Some("provider stream complete"),
632 },
633 );
634 if active_agent.name.eq_ignore_ascii_case("plan") {
635 emit_plan_todo_fallback(
636 self.storage.clone(),
637 &self.event_bus,
638 &session_id,
639 &user_message_id,
640 &completion,
641 )
642 .await;
643 let todos_after_fallback = self.storage.get_todos(&session_id).await;
644 if todos_after_fallback.is_empty() && !question_tool_used {
645 emit_plan_question_fallback(
646 self.storage.clone(),
647 &self.event_bus,
648 &session_id,
649 &user_message_id,
650 &completion,
651 )
652 .await;
653 }
654 }
655 if cancel.is_cancelled() {
656 self.event_bus.publish(EngineEvent::new(
657 "session.status",
658 json!({"sessionID": session_id, "status":"cancelled"}),
659 ));
660 self.cancellations.remove(&session_id).await;
661 return Ok(());
662 }
663 let assistant = Message::new(
664 MessageRole::Assistant,
665 vec![MessagePart::Text {
666 text: completion.clone(),
667 }],
668 );
669 let assistant_message_id = assistant.id.clone();
670 self.storage.append_message(&session_id, assistant).await?;
671 let final_part = WireMessagePart::text(
672 &session_id,
673 &assistant_message_id,
674 truncate_text(&completion, 16_000),
675 );
676 self.event_bus.publish(EngineEvent::new(
677 "message.part.updated",
678 json!({"part": final_part}),
679 ));
680 self.event_bus.publish(EngineEvent::new(
681 "session.updated",
682 json!({"sessionID": session_id, "status":"idle"}),
683 ));
684 self.event_bus.publish(EngineEvent::new(
685 "session.status",
686 json!({"sessionID": session_id, "status":"idle"}),
687 ));
688 self.cancellations.remove(&session_id).await;
689 Ok(())
690 }
691
692 pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
693 self.providers.default_complete(&prompt).await
694 }
695
696 pub async fn run_oneshot_for_provider(
697 &self,
698 prompt: String,
699 provider_id: Option<&str>,
700 ) -> anyhow::Result<String> {
701 self.providers
702 .complete_for_provider(provider_id, &prompt)
703 .await
704 }
705
706 #[allow(clippy::too_many_arguments)]
707 async fn execute_tool_with_permission(
708 &self,
709 session_id: &str,
710 message_id: &str,
711 tool: String,
712 args: Value,
713 equipped_skills: Option<&[String]>,
714 latest_user_text: &str,
715 latest_assistant_context: Option<&str>,
716 cancel: CancellationToken,
717 ) -> anyhow::Result<Option<String>> {
718 let tool = normalize_tool_name(&tool);
719 let normalized = normalize_tool_args(
720 &tool,
721 args,
722 latest_user_text,
723 latest_assistant_context.unwrap_or_default(),
724 );
725 self.event_bus.publish(EngineEvent::new(
726 "tool.args.normalized",
727 json!({
728 "sessionID": session_id,
729 "messageID": message_id,
730 "tool": tool,
731 "argsSource": normalized.args_source,
732 "argsIntegrity": normalized.args_integrity,
733 "query": normalized.query,
734 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
735 "requestID": Value::Null
736 }),
737 ));
738 if normalized.args_integrity == "recovered" {
739 self.event_bus.publish(EngineEvent::new(
740 "tool.args.recovered",
741 json!({
742 "sessionID": session_id,
743 "messageID": message_id,
744 "tool": tool,
745 "argsSource": normalized.args_source,
746 "query": normalized.query,
747 "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
748 "requestID": Value::Null
749 }),
750 ));
751 }
752 if normalized.missing_terminal {
753 self.event_bus.publish(EngineEvent::new(
754 "tool.args.missing_terminal",
755 json!({
756 "sessionID": session_id,
757 "messageID": message_id,
758 "tool": tool,
759 "argsSource": normalized.args_source,
760 "argsIntegrity": normalized.args_integrity,
761 "requestID": Value::Null,
762 "error": "WEBSEARCH_QUERY_MISSING"
763 }),
764 ));
765 let mut failed_part =
766 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
767 failed_part.state = Some("failed".to_string());
768 failed_part.error = Some("WEBSEARCH_QUERY_MISSING".to_string());
769 self.event_bus.publish(EngineEvent::new(
770 "message.part.updated",
771 json!({"part": failed_part}),
772 ));
773 return Ok(Some("WEBSEARCH_QUERY_MISSING".to_string()));
774 }
775
776 let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
777 Ok(args) => args,
778 Err(message) => return Ok(Some(message)),
779 };
780 if let Some(hook) = self.tool_policy_hook.read().await.clone() {
781 let decision = hook
782 .evaluate_tool(ToolPolicyContext {
783 session_id: session_id.to_string(),
784 message_id: message_id.to_string(),
785 tool: tool.clone(),
786 args: args.clone(),
787 })
788 .await?;
789 if !decision.allowed {
790 let reason = decision
791 .reason
792 .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
793 let mut blocked_part =
794 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
795 blocked_part.state = Some("failed".to_string());
796 blocked_part.error = Some(reason.clone());
797 self.event_bus.publish(EngineEvent::new(
798 "message.part.updated",
799 json!({"part": blocked_part}),
800 ));
801 return Ok(Some(reason));
802 }
803 }
804 let mut tool_call_id: Option<String> = None;
805 if let Some(violation) = self
806 .workspace_sandbox_violation(session_id, &tool, &args)
807 .await
808 {
809 let mut blocked_part =
810 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
811 blocked_part.state = Some("failed".to_string());
812 blocked_part.error = Some(violation.clone());
813 self.event_bus.publish(EngineEvent::new(
814 "message.part.updated",
815 json!({"part": blocked_part}),
816 ));
817 return Ok(Some(violation));
818 }
819 let rule = self
820 .plugins
821 .permission_override(&tool)
822 .await
823 .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
824 if matches!(rule, PermissionAction::Deny) {
825 return Ok(Some(format!(
826 "Permission denied for tool `{tool}` by policy."
827 )));
828 }
829
830 let mut effective_args = args.clone();
831 if matches!(rule, PermissionAction::Ask) {
832 let pending = self
833 .permissions
834 .ask_for_session_with_context(
835 Some(session_id),
836 &tool,
837 args.clone(),
838 Some(crate::PermissionArgsContext {
839 args_source: normalized.args_source.clone(),
840 args_integrity: normalized.args_integrity.clone(),
841 query: normalized.query.clone(),
842 }),
843 )
844 .await;
845 let mut pending_part = WireMessagePart::tool_invocation(
846 session_id,
847 message_id,
848 tool.clone(),
849 args.clone(),
850 );
851 pending_part.id = Some(pending.id.clone());
852 tool_call_id = Some(pending.id.clone());
853 pending_part.state = Some("pending".to_string());
854 self.event_bus.publish(EngineEvent::new(
855 "message.part.updated",
856 json!({"part": pending_part}),
857 ));
858 let reply = self
859 .permissions
860 .wait_for_reply(&pending.id, cancel.clone())
861 .await;
862 if cancel.is_cancelled() {
863 return Ok(None);
864 }
865 let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
866 if !approved {
867 let mut denied_part =
868 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
869 denied_part.id = Some(pending.id);
870 denied_part.state = Some("denied".to_string());
871 denied_part.error = Some("Permission denied by user".to_string());
872 self.event_bus.publish(EngineEvent::new(
873 "message.part.updated",
874 json!({"part": denied_part}),
875 ));
876 return Ok(Some(format!(
877 "Permission denied for tool `{tool}` by user."
878 )));
879 }
880 effective_args = args;
881 }
882
883 let args = self.plugins.inject_tool_args(&tool, effective_args).await;
884 let mut invoke_part =
885 WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
886 if let Some(call_id) = tool_call_id.clone() {
887 invoke_part.id = Some(call_id);
888 }
889 let invoke_part_id = invoke_part.id.clone();
890 self.event_bus.publish(EngineEvent::new(
891 "message.part.updated",
892 json!({"part": invoke_part}),
893 ));
894 let args_for_side_events = args.clone();
895 if tool == "spawn_agent" {
896 let hook = self.spawn_agent_hook.read().await.clone();
897 if let Some(hook) = hook {
898 let spawned = hook
899 .spawn_agent(SpawnAgentToolContext {
900 session_id: session_id.to_string(),
901 message_id: message_id.to_string(),
902 tool_call_id: invoke_part_id.clone(),
903 args: args_for_side_events.clone(),
904 })
905 .await?;
906 let output = self.plugins.transform_tool_output(spawned.output).await;
907 let output = truncate_text(&output, 16_000);
908 emit_tool_side_events(
909 self.storage.clone(),
910 &self.event_bus,
911 session_id,
912 message_id,
913 &tool,
914 &args_for_side_events,
915 &spawned.metadata,
916 )
917 .await;
918 let mut result_part = WireMessagePart::tool_result(
919 session_id,
920 message_id,
921 tool.clone(),
922 json!(output.clone()),
923 );
924 result_part.id = invoke_part_id;
925 self.event_bus.publish(EngineEvent::new(
926 "message.part.updated",
927 json!({"part": result_part}),
928 ));
929 return Ok(Some(truncate_text(
930 &format!("Tool `{tool}` result:\n{output}"),
931 16_000,
932 )));
933 }
934 let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
935 let mut failed_part =
936 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
937 failed_part.id = invoke_part_id.clone();
938 failed_part.state = Some("failed".to_string());
939 failed_part.error = Some(output.to_string());
940 self.event_bus.publish(EngineEvent::new(
941 "message.part.updated",
942 json!({"part": failed_part}),
943 ));
944 return Ok(Some(output.to_string()));
945 }
946 let result = match self
947 .tools
948 .execute_with_cancel(&tool, args, cancel.clone())
949 .await
950 {
951 Ok(result) => result,
952 Err(err) => {
953 let mut failed_part =
954 WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
955 failed_part.id = invoke_part_id.clone();
956 failed_part.state = Some("failed".to_string());
957 failed_part.error = Some(err.to_string());
958 self.event_bus.publish(EngineEvent::new(
959 "message.part.updated",
960 json!({"part": failed_part}),
961 ));
962 return Err(err);
963 }
964 };
965 emit_tool_side_events(
966 self.storage.clone(),
967 &self.event_bus,
968 session_id,
969 message_id,
970 &tool,
971 &args_for_side_events,
972 &result.metadata,
973 )
974 .await;
975 let output = self.plugins.transform_tool_output(result.output).await;
976 let output = truncate_text(&output, 16_000);
977 let mut result_part = WireMessagePart::tool_result(
978 session_id,
979 message_id,
980 tool.clone(),
981 json!(output.clone()),
982 );
983 result_part.id = invoke_part_id;
984 self.event_bus.publish(EngineEvent::new(
985 "message.part.updated",
986 json!({"part": result_part}),
987 ));
988 Ok(Some(truncate_text(
989 &format!("Tool `{tool}` result:\n{output}"),
990 16_000,
991 )))
992 }
993
994 async fn find_recent_matching_user_message_id(
995 &self,
996 session_id: &str,
997 text: &str,
998 ) -> Option<String> {
999 let session = self.storage.get_session(session_id).await?;
1000 let last = session.messages.last()?;
1001 if !matches!(last.role, MessageRole::User) {
1002 return None;
1003 }
1004 let age_ms = (Utc::now() - last.created_at).num_milliseconds().max(0) as u64;
1005 if age_ms > 10_000 {
1006 return None;
1007 }
1008 let last_text = last
1009 .parts
1010 .iter()
1011 .filter_map(|part| match part {
1012 MessagePart::Text { text } => Some(text.clone()),
1013 _ => None,
1014 })
1015 .collect::<Vec<_>>()
1016 .join("\n");
1017 if last_text == text {
1018 return Some(last.id.clone());
1019 }
1020 None
1021 }
1022
1023 async fn auto_rename_session_from_user_text(&self, session_id: &str, fallback_text: &str) {
1024 let Some(mut session) = self.storage.get_session(session_id).await else {
1025 return;
1026 };
1027 if !title_needs_repair(&session.title) {
1028 return;
1029 }
1030
1031 let first_user_text = session.messages.iter().find_map(|message| {
1032 if !matches!(message.role, MessageRole::User) {
1033 return None;
1034 }
1035 message.parts.iter().find_map(|part| match part {
1036 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.clone()),
1037 _ => None,
1038 })
1039 });
1040
1041 let source = first_user_text.unwrap_or_else(|| fallback_text.to_string());
1042 let Some(title) = derive_session_title_from_prompt(&source, 60) else {
1043 return;
1044 };
1045
1046 session.title = title;
1047 session.time.updated = Utc::now();
1048 let _ = self.storage.save_session(session).await;
1049 }
1050
1051 async fn workspace_sandbox_violation(
1052 &self,
1053 session_id: &str,
1054 tool: &str,
1055 args: &Value,
1056 ) -> Option<String> {
1057 if self.workspace_override_active(session_id).await {
1058 return None;
1059 }
1060 let session = self.storage.get_session(session_id).await?;
1061 let workspace = session
1062 .workspace_root
1063 .or_else(|| crate::normalize_workspace_path(&session.directory))?;
1064 let workspace_path = PathBuf::from(&workspace);
1065 let candidate_paths = extract_tool_candidate_paths(tool, args);
1066 if candidate_paths.is_empty() {
1067 return None;
1068 }
1069 let outside = candidate_paths
1070 .iter()
1071 .find(|path| !crate::is_within_workspace_root(Path::new(path), &workspace_path))?;
1072 Some(format!(
1073 "Sandbox blocked `{tool}` path `{outside}` (workspace root: `{workspace}`)"
1074 ))
1075 }
1076
1077 async fn workspace_override_active(&self, session_id: &str) -> bool {
1078 let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
1079 let mut overrides = self.workspace_overrides.write().await;
1080 overrides.retain(|_, expires_at| *expires_at > now);
1081 overrides
1082 .get(session_id)
1083 .map(|expires_at| *expires_at > now)
1084 .unwrap_or(false)
1085 }
1086
1087 async fn generate_final_narrative_without_tools(
1088 &self,
1089 session_id: &str,
1090 active_agent: &AgentDefinition,
1091 provider_hint: Option<&str>,
1092 cancel: CancellationToken,
1093 tool_outputs: &[String],
1094 ) -> Option<String> {
1095 if cancel.is_cancelled() {
1096 return None;
1097 }
1098 let mut messages = load_chat_history(self.storage.clone(), session_id).await;
1099 let mut system_parts = vec![tandem_runtime_system_prompt().to_string()];
1100 if let Some(system) = active_agent.system_prompt.as_ref() {
1101 system_parts.push(system.clone());
1102 }
1103 messages.insert(
1104 0,
1105 ChatMessage {
1106 role: "system".to_string(),
1107 content: system_parts.join("\n\n"),
1108 },
1109 );
1110 messages.push(ChatMessage {
1111 role: "user".to_string(),
1112 content: format!(
1113 "Tool observations:\n{}\n\nProvide a direct final answer now. Do not call tools.",
1114 summarize_tool_outputs(tool_outputs)
1115 ),
1116 });
1117 let stream = self
1118 .providers
1119 .stream_for_provider(provider_hint, messages, None, cancel.clone())
1120 .await
1121 .ok()?;
1122 tokio::pin!(stream);
1123 let mut completion = String::new();
1124 while let Some(chunk) = stream.next().await {
1125 if cancel.is_cancelled() {
1126 return None;
1127 }
1128 match chunk {
1129 Ok(StreamChunk::TextDelta(delta)) => completion.push_str(&delta),
1130 Ok(StreamChunk::Done { .. }) => break,
1131 Ok(_) => {}
1132 Err(_) => return None,
1133 }
1134 }
1135 let completion = truncate_text(&completion, 16_000);
1136 if completion.trim().is_empty() {
1137 None
1138 } else {
1139 Some(completion)
1140 }
1141 }
1142}
1143
/// Returns `input` unchanged when it is at most `max_len` bytes long;
/// otherwise returns a prefix of at most `max_len` bytes followed by the
/// marker `...<truncated>`.
///
/// The cut point is moved back to the nearest UTF-8 character boundary, so
/// multi-byte characters are never split (slicing a `str` in the middle of a
/// character would panic).
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // `max_len < input.len()` here; index 0 is always a boundary, so the
    // loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }

    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
1152
/// Maps free-form provider error text to a coarse error code.
///
/// Matching is a case-insensitive substring search. Rules are tried in the
/// order listed and the first rule with any matching needle wins; text that
/// matches nothing yields `PROVIDER_REQUEST_FAILED`.
fn provider_error_code(error_text: &str) -> &'static str {
    const RULES: [(&str, &[&str]); 6] = [
        (
            "TOOL_SCHEMA_INVALID",
            &[
                "invalid_function_parameters",
                "array schema missing items",
                "tool schema",
            ],
        ),
        (
            "RATE_LIMIT_EXCEEDED",
            &["rate limit", "too many requests", "429"],
        ),
        (
            "CONTEXT_LENGTH_EXCEEDED",
            &["context length", "max tokens", "token limit"],
        ),
        (
            "AUTHENTICATION_ERROR",
            &["unauthorized", "authentication", "401", "403"],
        ),
        ("TIMEOUT", &["timeout", "timed out"]),
        (
            "PROVIDER_SERVER_ERROR",
            &["server error", "500", "502", "503", "504"],
        ),
    ];

    let haystack = error_text.to_lowercase();
    for (code, needles) in RULES {
        if needles.iter().any(|needle| haystack.contains(*needle)) {
            return code;
        }
    }
    "PROVIDER_REQUEST_FAILED"
}
1191
/// Canonicalizes a tool name: trims it, lowercases it, and replaces `-` with
/// `_`. The todo-writing aliases (`todowrite`, `update_todo_list`,
/// `update_todos`) all collapse to `todo_write`.
fn normalize_tool_name(name: &str) -> String {
    let canonical = name.trim().to_lowercase().replace('-', "_");
    if matches!(
        canonical.as_str(),
        "todowrite" | "update_todo_list" | "update_todos"
    ) {
        "todo_write".to_string()
    } else {
        canonical
    }
}
1198
1199fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1200 let Some(obj) = args.as_object() else {
1201 return Vec::new();
1202 };
1203 let keys: &[&str] = match tool {
1204 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1205 "glob" => &["pattern"],
1206 "lsp" => &["filePath", "path"],
1207 "bash" => &["cwd"],
1208 "apply_patch" => &[],
1209 _ => &["path", "cwd"],
1210 };
1211 keys.iter()
1212 .filter_map(|key| obj.get(*key))
1213 .filter_map(|value| value.as_str())
1214 .filter(|s| !s.trim().is_empty())
1215 .map(ToString::to_string)
1216 .collect()
1217}
1218
1219fn agent_can_use_tool(agent: &AgentDefinition, tool_name: &str) -> bool {
1220 let target = normalize_tool_name(tool_name);
1221 match agent.tools.as_ref() {
1222 None => true,
1223 Some(list) => list.iter().any(|t| normalize_tool_name(t) == target),
1224 }
1225}
1226
1227fn enforce_skill_scope(
1228 tool_name: &str,
1229 args: Value,
1230 equipped_skills: Option<&[String]>,
1231) -> Result<Value, String> {
1232 if normalize_tool_name(tool_name) != "skill" {
1233 return Ok(args);
1234 }
1235 let Some(configured) = equipped_skills else {
1236 return Ok(args);
1237 };
1238
1239 let mut allowed = configured
1240 .iter()
1241 .map(|s| s.trim().to_string())
1242 .filter(|s| !s.is_empty())
1243 .collect::<Vec<_>>();
1244 if allowed
1245 .iter()
1246 .any(|s| s == "*" || s.eq_ignore_ascii_case("all"))
1247 {
1248 return Ok(args);
1249 }
1250 allowed.sort();
1251 allowed.dedup();
1252 if allowed.is_empty() {
1253 return Err("No skills are equipped for this agent.".to_string());
1254 }
1255
1256 let requested = args
1257 .get("name")
1258 .and_then(|v| v.as_str())
1259 .map(|v| v.trim().to_string())
1260 .unwrap_or_default();
1261 if !requested.is_empty() && !allowed.iter().any(|s| s == &requested) {
1262 return Err(format!(
1263 "Skill '{}' is not equipped for this agent. Equipped skills: {}",
1264 requested,
1265 allowed.join(", ")
1266 ));
1267 }
1268
1269 let mut out = if let Some(obj) = args.as_object() {
1270 Value::Object(obj.clone())
1271 } else {
1272 json!({})
1273 };
1274 if let Some(obj) = out.as_object_mut() {
1275 obj.insert("allowed_skills".to_string(), json!(allowed));
1276 }
1277 Ok(out)
1278}
1279
1280fn is_read_only_tool(tool_name: &str) -> bool {
1281 matches!(
1282 normalize_tool_name(tool_name).as_str(),
1283 "glob"
1284 | "read"
1285 | "grep"
1286 | "search"
1287 | "codesearch"
1288 | "list"
1289 | "ls"
1290 | "lsp"
1291 | "websearch"
1292 | "webfetch_document"
1293 )
1294}
1295
1296fn tool_budget_for(tool_name: &str) -> usize {
1297 match normalize_tool_name(tool_name).as_str() {
1298 "glob" => 4,
1299 "read" => 8,
1300 "websearch" => 3,
1301 "grep" | "search" | "codesearch" => 6,
1302 _ => 10,
1303 }
1304}
1305
1306#[derive(Debug, Clone)]
1307struct NormalizedToolArgs {
1308 args: Value,
1309 args_source: String,
1310 args_integrity: String,
1311 query: Option<String>,
1312 missing_terminal: bool,
1313}
1314
1315fn normalize_tool_args(
1316 tool_name: &str,
1317 raw_args: Value,
1318 latest_user_text: &str,
1319 latest_assistant_context: &str,
1320) -> NormalizedToolArgs {
1321 let normalized_tool = normalize_tool_name(tool_name);
1322 let mut args = raw_args;
1323 let mut args_source = if args.is_string() {
1324 "provider_string".to_string()
1325 } else {
1326 "provider_json".to_string()
1327 };
1328 let mut args_integrity = "ok".to_string();
1329 let mut query = None;
1330 let mut missing_terminal = false;
1331
1332 if normalized_tool == "websearch" {
1333 if let Some(found) = extract_websearch_query(&args) {
1334 query = Some(found);
1335 args = set_websearch_query_and_source(args, query.clone(), "tool_args");
1336 } else if let Some(inferred) = infer_websearch_query_from_text(latest_user_text) {
1337 args_source = "inferred_from_user".to_string();
1338 args_integrity = "recovered".to_string();
1339 query = Some(inferred);
1340 args = set_websearch_query_and_source(args, query.clone(), "inferred_from_user");
1341 } else if let Some(recovered) = infer_websearch_query_from_text(latest_assistant_context) {
1342 args_source = "recovered_from_context".to_string();
1343 args_integrity = "recovered".to_string();
1344 query = Some(recovered);
1345 args = set_websearch_query_and_source(args, query.clone(), "recovered_from_context");
1346 } else {
1347 args_source = "missing".to_string();
1348 args_integrity = "empty".to_string();
1349 missing_terminal = true;
1350 }
1351 }
1352
1353 NormalizedToolArgs {
1354 args,
1355 args_source,
1356 args_integrity,
1357 query,
1358 missing_terminal,
1359 }
1360}
1361
1362fn set_websearch_query_and_source(args: Value, query: Option<String>, query_source: &str) -> Value {
1363 let mut obj = args.as_object().cloned().unwrap_or_default();
1364 if let Some(q) = query {
1365 obj.insert("query".to_string(), Value::String(q));
1366 }
1367 obj.insert(
1368 "__query_source".to_string(),
1369 Value::String(query_source.to_string()),
1370 );
1371 Value::Object(obj)
1372}
1373
1374fn extract_websearch_query(args: &Value) -> Option<String> {
1375 const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
1376 for key in QUERY_KEYS {
1377 if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
1378 let trimmed = value.trim();
1379 if !trimmed.is_empty() {
1380 return Some(trimmed.to_string());
1381 }
1382 }
1383 }
1384 for container in ["arguments", "args", "input", "params"] {
1385 if let Some(obj) = args.get(container) {
1386 for key in QUERY_KEYS {
1387 if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
1388 let trimmed = value.trim();
1389 if !trimmed.is_empty() {
1390 return Some(trimmed.to_string());
1391 }
1392 }
1393 }
1394 }
1395 }
1396 args.as_str()
1397 .map(str::trim)
1398 .filter(|s| !s.is_empty())
1399 .map(ToString::to_string)
1400}
1401
/// Derives a web-search query from free text such as `"search for: rust docs"`.
///
/// A leading command phrase (`search for`, `look up`, `find`, ...) is removed
/// together with any following whitespace, `:` or `-`; wrapping quotes and
/// leading/trailing `.`, `,`, `!`, `?` are stripped. Returns `None` when fewer
/// than two words remain.
///
/// The command phrase is matched ASCII-case-insensitively against the text
/// itself, so the byte offset used to cut it off always refers to the string
/// being sliced (lowercasing a copy can change byte lengths). A phrase only
/// counts when it ends at a word boundary, so `"searching for x"` is not
/// mangled into `"ing for x"`.
fn infer_websearch_query_from_text(text: &str) -> Option<String> {
    // Order matters: longer phrases must be tried before their own prefixes.
    const PREFIXES: [&str; 11] = [
        "web search",
        "websearch",
        "search web for",
        "search web",
        "search for",
        "search",
        "look up",
        "lookup",
        "find",
        "web lookup",
        "query",
    ];

    let trimmed = text.trim();
    if trimmed.is_empty() {
        return None;
    }

    let after_prefix = PREFIXES.iter().find_map(|prefix| {
        // `get` yields `None` if the text is shorter than the prefix or the
        // cut would land inside a multi-byte character.
        let head = trimmed.get(..prefix.len())?;
        let rest = &trimmed[prefix.len()..];
        let at_word_end = !rest
            .chars()
            .next()
            .is_some_and(|c| c.is_ascii_alphanumeric());
        (head.eq_ignore_ascii_case(prefix) && at_word_end).then_some(rest)
    });

    let candidate = match after_prefix {
        Some(rest) => {
            rest.trim_start_matches(|c: char| c.is_whitespace() || c == ':' || c == '-')
        }
        None => trimmed,
    };

    // Quotes and punctuation are stripped in one pass so that inputs such as
    // `"some query"?` lose both the quote and the question mark.
    let normalized = candidate
        .trim_matches(|c: char| {
            c.is_whitespace() || matches!(c, '"' | '\'' | '.' | ',' | '!' | '?')
        })
        .to_string();

    if normalized.split_whitespace().count() < 2 {
        return None;
    }
    Some(normalized)
}
1445
1446fn tool_signature(tool_name: &str, args: &Value) -> String {
1447 let normalized = normalize_tool_name(tool_name);
1448 if normalized == "websearch" {
1449 let query = extract_websearch_query(args)
1450 .unwrap_or_default()
1451 .to_lowercase();
1452 let limit = args
1453 .get("limit")
1454 .or_else(|| args.get("numResults"))
1455 .or_else(|| args.get("num_results"))
1456 .and_then(|v| v.as_u64())
1457 .unwrap_or(8);
1458 let domains = args
1459 .get("domains")
1460 .or_else(|| args.get("domain"))
1461 .map(|v| v.to_string())
1462 .unwrap_or_default();
1463 let recency = args.get("recency").and_then(|v| v.as_u64()).unwrap_or(0);
1464 return format!("websearch:q={query}|limit={limit}|domains={domains}|recency={recency}");
1465 }
1466 format!("{}:{}", normalized, args)
1467}
1468
/// Hashes `input` with the standard library's `DefaultHasher` and renders the
/// 64-bit result as 16 zero-padded lowercase hex digits.
// NOTE(review): `DefaultHasher`'s algorithm is not guaranteed to stay the same
// across Rust releases — confirm these values are never persisted.
fn stable_hash(input: &str) -> String {
    let mut state = DefaultHasher::new();
    Hash::hash(input, &mut state);
    let digest = state.finish();
    format!("{digest:016x}")
}
1474
1475fn summarize_tool_outputs(outputs: &[String]) -> String {
1476 outputs
1477 .iter()
1478 .take(6)
1479 .map(|output| truncate_text(output, 600))
1480 .collect::<Vec<_>>()
1481 .join("\n\n")
1482}
1483
/// Returns the fixed runtime system prompt sent ahead of every conversation.
fn tandem_runtime_system_prompt() -> &'static str {
    // The literal spans several source lines; its embedded newlines are part
    // of the prompt text.
    const RUNTIME_PROMPT: &str = "You are operating inside Tandem (Desktop/TUI) as an engine-backed coding assistant.
Use tool calls to inspect and modify the workspace when needed instead of asking the user
to manually run basic discovery steps. Permission prompts may occur for some tools; if
a tool is denied or blocked, explain what was blocked and suggest a concrete next step.";
    RUNTIME_PROMPT
}
1490
/// Decides whether a workspace probe should be forced.
///
/// True only when the user text contains one of the project-context phrases
/// and the assistant reply contains one of the "no access" phrases. Both
/// comparisons are case-insensitive substring checks.
fn should_force_workspace_probe(user_text: &str, completion: &str) -> bool {
    const PROJECT_CONTEXT_REQUESTS: [&str; 8] = [
        "what is this project",
        "what's this project",
        "explain this project",
        "analyze this project",
        "inspect this project",
        "look at the project",
        "use glob",
        "run glob",
    ];
    const NO_ACCESS_CLAIMS: [&str; 8] = [
        "can't inspect",
        "cannot inspect",
        "don't have visibility",
        "haven't been able to inspect",
        "i don't know what this project is",
        "need your help to",
        "sandbox",
        "system restriction",
    ];

    let user = user_text.to_lowercase();
    let reply = completion.to_lowercase();

    let asked_for_context = PROJECT_CONTEXT_REQUESTS
        .iter()
        .any(|phrase| user.contains(*phrase));
    let claimed_no_access = || {
        NO_ACCESS_CLAIMS
            .iter()
            .any(|phrase| reply.contains(*phrase))
    };

    asked_for_context && claimed_no_access()
}
1529
1530fn parse_tool_invocation(input: &str) -> Option<(String, serde_json::Value)> {
1531 let raw = input.trim();
1532 if !raw.starts_with("/tool ") {
1533 return None;
1534 }
1535 let rest = raw.trim_start_matches("/tool ").trim();
1536 let mut split = rest.splitn(2, ' ');
1537 let tool = normalize_tool_name(split.next()?.trim());
1538 let args = split
1539 .next()
1540 .and_then(|v| serde_json::from_str::<serde_json::Value>(v).ok())
1541 .unwrap_or_else(|| json!({}));
1542 Some((tool, args))
1543}
1544
1545fn parse_tool_invocations_from_response(input: &str) -> Vec<(String, serde_json::Value)> {
1546 let trimmed = input.trim();
1547 if trimmed.is_empty() {
1548 return Vec::new();
1549 }
1550
1551 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
1552 if let Some(found) = extract_tool_call_from_value(&parsed) {
1553 return vec![found];
1554 }
1555 }
1556
1557 if let Some(block) = extract_first_json_object(trimmed) {
1558 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&block) {
1559 if let Some(found) = extract_tool_call_from_value(&parsed) {
1560 return vec![found];
1561 }
1562 }
1563 }
1564
1565 parse_function_style_tool_calls(trimmed)
1566}
1567
/// Test-only convenience: the first tool call found in `input`, if any.
#[cfg(test)]
fn parse_tool_invocation_from_response(input: &str) -> Option<(String, serde_json::Value)> {
    let mut calls = parse_tool_invocations_from_response(input).into_iter();
    calls.next()
}
1574
1575fn parse_function_style_tool_calls(input: &str) -> Vec<(String, Value)> {
1576 let mut calls = Vec::new();
1577 let lower = input.to_lowercase();
1578 let names = [
1579 "todo_write",
1580 "todowrite",
1581 "update_todo_list",
1582 "update_todos",
1583 ];
1584 let mut cursor = 0usize;
1585
1586 while cursor < lower.len() {
1587 let mut best: Option<(usize, &str)> = None;
1588 for name in names {
1589 let needle = format!("{name}(");
1590 if let Some(rel_idx) = lower[cursor..].find(&needle) {
1591 let idx = cursor + rel_idx;
1592 if best.as_ref().is_none_or(|(best_idx, _)| idx < *best_idx) {
1593 best = Some((idx, name));
1594 }
1595 }
1596 }
1597
1598 let Some((tool_start, tool_name)) = best else {
1599 break;
1600 };
1601
1602 let open_paren = tool_start + tool_name.len();
1603 if let Some(close_paren) = find_matching_paren(input, open_paren) {
1604 if let Some(args_text) = input.get(open_paren + 1..close_paren) {
1605 let args = parse_function_style_args(args_text.trim());
1606 calls.push((normalize_tool_name(tool_name), Value::Object(args)));
1607 }
1608 cursor = close_paren.saturating_add(1);
1609 } else {
1610 cursor = tool_start.saturating_add(tool_name.len());
1611 }
1612 }
1613
1614 calls
1615}
1616
/// Returns the byte index of the `)` that closes the `(` at byte index
/// `open_paren`, or `None` if that byte is not `(` or no match exists.
///
/// Parentheses inside single- or double-quoted text are ignored, and a
/// backslash inside quotes escapes the following character.
fn find_matching_paren(input: &str, open_paren: usize) -> Option<usize> {
    if *input.as_bytes().get(open_paren)? != b'(' {
        return None;
    }
    let tail = input.get(open_paren..)?;

    // `Some(q)` while inside a string opened by quote character `q`.
    let mut quote: Option<char> = None;
    let mut skip_next = false;
    let mut nesting = 0usize;

    for (rel, c) in tail.char_indices() {
        if skip_next {
            skip_next = false;
            continue;
        }
        match (quote, c) {
            (Some(_), '\\') => skip_next = true,
            (Some(q), _) if c == q => quote = None,
            (Some(_), _) => {}
            (None, '\'' | '"') => quote = Some(c),
            (None, '(') => nesting += 1,
            (None, ')') => {
                nesting = nesting.saturating_sub(1);
                if nesting == 0 {
                    return Some(open_paren + rel);
                }
            }
            (None, _) => {}
        }
    }

    None
}
1662
1663fn parse_function_style_args(input: &str) -> Map<String, Value> {
1664 let mut args = Map::new();
1665 if input.trim().is_empty() {
1666 return args;
1667 }
1668
1669 let mut parts = Vec::<String>::new();
1670 let mut current = String::new();
1671 let mut in_single = false;
1672 let mut in_double = false;
1673 let mut escaped = false;
1674 let mut depth_paren = 0usize;
1675 let mut depth_bracket = 0usize;
1676 let mut depth_brace = 0usize;
1677
1678 for ch in input.chars() {
1679 if escaped {
1680 current.push(ch);
1681 escaped = false;
1682 continue;
1683 }
1684 if ch == '\\' && (in_single || in_double) {
1685 current.push(ch);
1686 escaped = true;
1687 continue;
1688 }
1689 if ch == '\'' && !in_double {
1690 in_single = !in_single;
1691 current.push(ch);
1692 continue;
1693 }
1694 if ch == '"' && !in_single {
1695 in_double = !in_double;
1696 current.push(ch);
1697 continue;
1698 }
1699 if in_single || in_double {
1700 current.push(ch);
1701 continue;
1702 }
1703
1704 match ch {
1705 '(' => depth_paren += 1,
1706 ')' => depth_paren = depth_paren.saturating_sub(1),
1707 '[' => depth_bracket += 1,
1708 ']' => depth_bracket = depth_bracket.saturating_sub(1),
1709 '{' => depth_brace += 1,
1710 '}' => depth_brace = depth_brace.saturating_sub(1),
1711 ',' if depth_paren == 0 && depth_bracket == 0 && depth_brace == 0 => {
1712 let part = current.trim();
1713 if !part.is_empty() {
1714 parts.push(part.to_string());
1715 }
1716 current.clear();
1717 continue;
1718 }
1719 _ => {}
1720 }
1721 current.push(ch);
1722 }
1723 let tail = current.trim();
1724 if !tail.is_empty() {
1725 parts.push(tail.to_string());
1726 }
1727
1728 for part in parts {
1729 let Some((raw_key, raw_value)) = part
1730 .split_once('=')
1731 .or_else(|| part.split_once(':'))
1732 .map(|(k, v)| (k.trim(), v.trim()))
1733 else {
1734 continue;
1735 };
1736 let key = raw_key.trim_matches(|c| c == '"' || c == '\'' || c == '`');
1737 if key.is_empty() {
1738 continue;
1739 }
1740 let value = parse_scalar_like_value(raw_value);
1741 args.insert(key.to_string(), value);
1742 }
1743
1744 args
1745}
1746
1747fn parse_scalar_like_value(raw: &str) -> Value {
1748 let trimmed = raw.trim();
1749 if trimmed.is_empty() {
1750 return Value::Null;
1751 }
1752
1753 if (trimmed.starts_with('"') && trimmed.ends_with('"'))
1754 || (trimmed.starts_with('\'') && trimmed.ends_with('\''))
1755 {
1756 return Value::String(trimmed[1..trimmed.len().saturating_sub(1)].to_string());
1757 }
1758
1759 if trimmed.eq_ignore_ascii_case("true") {
1760 return Value::Bool(true);
1761 }
1762 if trimmed.eq_ignore_ascii_case("false") {
1763 return Value::Bool(false);
1764 }
1765 if trimmed.eq_ignore_ascii_case("null") {
1766 return Value::Null;
1767 }
1768
1769 if let Ok(v) = serde_json::from_str::<Value>(trimmed) {
1770 return v;
1771 }
1772 if let Ok(v) = trimmed.parse::<i64>() {
1773 return Value::Number(Number::from(v));
1774 }
1775 if let Ok(v) = trimmed.parse::<f64>() {
1776 if let Some(n) = Number::from_f64(v) {
1777 return Value::Number(n);
1778 }
1779 }
1780
1781 Value::String(trimmed.to_string())
1782}
1783
1784fn normalize_todo_write_args(args: Value, completion: &str) -> Value {
1785 if is_todo_status_update_args(&args) {
1786 return args;
1787 }
1788
1789 let mut obj = match args {
1790 Value::Object(map) => map,
1791 Value::Array(items) => {
1792 return json!({ "todos": normalize_todo_arg_items(items) });
1793 }
1794 Value::String(text) => {
1795 let derived = extract_todo_candidates_from_text(&text);
1796 if !derived.is_empty() {
1797 return json!({ "todos": derived });
1798 }
1799 return json!({});
1800 }
1801 _ => return json!({}),
1802 };
1803
1804 if obj
1805 .get("todos")
1806 .and_then(|v| v.as_array())
1807 .map(|arr| !arr.is_empty())
1808 .unwrap_or(false)
1809 {
1810 return Value::Object(obj);
1811 }
1812
1813 for alias in ["tasks", "items", "list", "checklist"] {
1814 if let Some(items) = obj.get(alias).and_then(|v| v.as_array()) {
1815 let normalized = normalize_todo_arg_items(items.clone());
1816 if !normalized.is_empty() {
1817 obj.insert("todos".to_string(), Value::Array(normalized));
1818 return Value::Object(obj);
1819 }
1820 }
1821 }
1822
1823 let derived = extract_todo_candidates_from_text(completion);
1824 if !derived.is_empty() {
1825 obj.insert("todos".to_string(), Value::Array(derived));
1826 }
1827 Value::Object(obj)
1828}
1829
1830fn normalize_todo_arg_items(items: Vec<Value>) -> Vec<Value> {
1831 items
1832 .into_iter()
1833 .filter_map(|item| match item {
1834 Value::String(text) => {
1835 let content = text.trim();
1836 if content.is_empty() {
1837 None
1838 } else {
1839 Some(json!({"content": content}))
1840 }
1841 }
1842 Value::Object(mut obj) => {
1843 if !obj.contains_key("content") {
1844 if let Some(text) = obj.get("text").cloned() {
1845 obj.insert("content".to_string(), text);
1846 } else if let Some(title) = obj.get("title").cloned() {
1847 obj.insert("content".to_string(), title);
1848 } else if let Some(name) = obj.get("name").cloned() {
1849 obj.insert("content".to_string(), name);
1850 }
1851 }
1852 let content = obj
1853 .get("content")
1854 .and_then(|v| v.as_str())
1855 .map(str::trim)
1856 .unwrap_or("");
1857 if content.is_empty() {
1858 None
1859 } else {
1860 Some(Value::Object(obj))
1861 }
1862 }
1863 _ => None,
1864 })
1865 .collect()
1866}
1867
1868fn is_todo_status_update_args(args: &Value) -> bool {
1869 let Some(obj) = args.as_object() else {
1870 return false;
1871 };
1872 let has_status = obj
1873 .get("status")
1874 .and_then(|v| v.as_str())
1875 .map(|s| !s.trim().is_empty())
1876 .unwrap_or(false);
1877 let has_target =
1878 obj.get("task_id").is_some() || obj.get("todo_id").is_some() || obj.get("id").is_some();
1879 has_status && has_target
1880}
1881
1882fn is_empty_todo_write_args(args: &Value) -> bool {
1883 if is_todo_status_update_args(args) {
1884 return false;
1885 }
1886 let Some(obj) = args.as_object() else {
1887 return true;
1888 };
1889 !obj.get("todos")
1890 .and_then(|v| v.as_array())
1891 .map(|arr| !arr.is_empty())
1892 .unwrap_or(false)
1893}
1894
1895fn parse_streamed_tool_args(tool_name: &str, raw_args: &str) -> Value {
1896 let trimmed = raw_args.trim();
1897 if trimmed.is_empty() {
1898 return json!({});
1899 }
1900
1901 if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
1902 return normalize_streamed_tool_args(tool_name, parsed, trimmed);
1903 }
1904
1905 let kv_args = parse_function_style_args(trimmed);
1908 if !kv_args.is_empty() {
1909 return normalize_streamed_tool_args(tool_name, Value::Object(kv_args), trimmed);
1910 }
1911
1912 if normalize_tool_name(tool_name) == "websearch" {
1913 return json!({ "query": trimmed });
1914 }
1915
1916 json!({})
1917}
1918
1919fn normalize_streamed_tool_args(tool_name: &str, parsed: Value, raw: &str) -> Value {
1920 let normalized_tool = normalize_tool_name(tool_name);
1921 if normalized_tool != "websearch" {
1922 return parsed;
1923 }
1924
1925 match parsed {
1926 Value::Object(mut obj) => {
1927 if !has_websearch_query(&obj) && !raw.trim().is_empty() {
1928 obj.insert("query".to_string(), Value::String(raw.trim().to_string()));
1929 }
1930 Value::Object(obj)
1931 }
1932 Value::String(s) => {
1933 let q = s.trim();
1934 if q.is_empty() {
1935 json!({})
1936 } else {
1937 json!({ "query": q })
1938 }
1939 }
1940 other => other,
1941 }
1942}
1943
1944fn has_websearch_query(obj: &Map<String, Value>) -> bool {
1945 const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
1946 QUERY_KEYS.iter().any(|key| {
1947 obj.get(*key)
1948 .and_then(|v| v.as_str())
1949 .map(|s| !s.trim().is_empty())
1950 .unwrap_or(false)
1951 })
1952}
1953
1954fn extract_tool_call_from_value(value: &Value) -> Option<(String, Value)> {
1955 if let Some(obj) = value.as_object() {
1956 if let Some(tool) = obj.get("tool").and_then(|v| v.as_str()) {
1957 return Some((
1958 normalize_tool_name(tool),
1959 obj.get("args").cloned().unwrap_or_else(|| json!({})),
1960 ));
1961 }
1962
1963 if let Some(tool) = obj.get("name").and_then(|v| v.as_str()) {
1964 let args = obj
1965 .get("args")
1966 .cloned()
1967 .or_else(|| obj.get("arguments").cloned())
1968 .unwrap_or_else(|| json!({}));
1969 let normalized_tool = normalize_tool_name(tool);
1970 let args = if let Some(raw) = args.as_str() {
1971 parse_streamed_tool_args(&normalized_tool, raw)
1972 } else {
1973 args
1974 };
1975 return Some((normalized_tool, args));
1976 }
1977
1978 for key in [
1979 "tool_call",
1980 "toolCall",
1981 "call",
1982 "function_call",
1983 "functionCall",
1984 ] {
1985 if let Some(nested) = obj.get(key) {
1986 if let Some(found) = extract_tool_call_from_value(nested) {
1987 return Some(found);
1988 }
1989 }
1990 }
1991 }
1992
1993 if let Some(items) = value.as_array() {
1994 for item in items {
1995 if let Some(found) = extract_tool_call_from_value(item) {
1996 return Some(found);
1997 }
1998 }
1999 }
2000
2001 None
2002}
2003
/// Returns the first brace-balanced `{...}` block found in `input`.
///
/// Closing braces that appear before any opening brace are skipped. Once
/// inside an object, double-quoted strings are tracked (including backslash
/// escapes) so that `{` or `}` inside a string value does not affect the
/// nesting depth — e.g. `{"a": "}"}` is returned whole rather than cut at the
/// brace inside the string. Returns `None` when no balanced block exists.
fn extract_first_json_object(input: &str) -> Option<String> {
    let mut start: Option<usize> = None;
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;

    for (idx, ch) in input.char_indices() {
        if in_string {
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            // Quotes in the prose before the object are not JSON strings.
            '"' if depth > 0 => in_string = true,
            '{' => {
                if start.is_none() {
                    start = Some(idx);
                }
                depth += 1;
            }
            '}' if depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    let begin = start?;
                    return input.get(begin..=idx).map(str::to_string);
                }
            }
            _ => {}
        }
    }

    None
}
2027
2028fn extract_todo_candidates_from_text(input: &str) -> Vec<Value> {
2029 let mut seen = HashSet::<String>::new();
2030 let mut todos = Vec::new();
2031
2032 for raw_line in input.lines() {
2033 let mut line = raw_line.trim();
2034 let mut structured_line = false;
2035 if line.is_empty() {
2036 continue;
2037 }
2038 if line.starts_with("```") {
2039 continue;
2040 }
2041 if line.ends_with(':') {
2042 continue;
2043 }
2044 if let Some(rest) = line
2045 .strip_prefix("- [ ]")
2046 .or_else(|| line.strip_prefix("* [ ]"))
2047 .or_else(|| line.strip_prefix("- [x]"))
2048 .or_else(|| line.strip_prefix("* [x]"))
2049 {
2050 line = rest.trim();
2051 structured_line = true;
2052 } else if let Some(rest) = line.strip_prefix("- ").or_else(|| line.strip_prefix("* ")) {
2053 line = rest.trim();
2054 structured_line = true;
2055 } else {
2056 let bytes = line.as_bytes();
2057 let mut i = 0usize;
2058 while i < bytes.len() && bytes[i].is_ascii_digit() {
2059 i += 1;
2060 }
2061 if i > 0 && i + 1 < bytes.len() && (bytes[i] == b'.' || bytes[i] == b')') {
2062 line = line[i + 1..].trim();
2063 structured_line = true;
2064 }
2065 }
2066 if !structured_line {
2067 continue;
2068 }
2069
2070 let content = line.trim_matches(|c: char| c.is_whitespace() || c == '-' || c == '*');
2071 if content.len() < 5 || content.len() > 180 {
2072 continue;
2073 }
2074 let key = content.to_lowercase();
2075 if seen.contains(&key) {
2076 continue;
2077 }
2078 seen.insert(key);
2079 todos.push(json!({ "content": content }));
2080 if todos.len() >= 25 {
2081 break;
2082 }
2083 }
2084
2085 todos
2086}
2087
2088async fn emit_plan_todo_fallback(
2089 storage: std::sync::Arc<Storage>,
2090 bus: &EventBus,
2091 session_id: &str,
2092 message_id: &str,
2093 completion: &str,
2094) {
2095 let todos = extract_todo_candidates_from_text(completion);
2096 if todos.is_empty() {
2097 return;
2098 }
2099
2100 let invoke_part = WireMessagePart::tool_invocation(
2101 session_id,
2102 message_id,
2103 "todo_write",
2104 json!({"todos": todos.clone()}),
2105 );
2106 let call_id = invoke_part.id.clone();
2107 bus.publish(EngineEvent::new(
2108 "message.part.updated",
2109 json!({"part": invoke_part}),
2110 ));
2111
2112 if storage.set_todos(session_id, todos).await.is_err() {
2113 let mut failed_part =
2114 WireMessagePart::tool_result(session_id, message_id, "todo_write", json!(null));
2115 failed_part.id = call_id;
2116 failed_part.state = Some("failed".to_string());
2117 failed_part.error = Some("failed to persist plan todos".to_string());
2118 bus.publish(EngineEvent::new(
2119 "message.part.updated",
2120 json!({"part": failed_part}),
2121 ));
2122 return;
2123 }
2124
2125 let normalized = storage.get_todos(session_id).await;
2126 let mut result_part = WireMessagePart::tool_result(
2127 session_id,
2128 message_id,
2129 "todo_write",
2130 json!({ "todos": normalized }),
2131 );
2132 result_part.id = call_id;
2133 bus.publish(EngineEvent::new(
2134 "message.part.updated",
2135 json!({"part": result_part}),
2136 ));
2137 bus.publish(EngineEvent::new(
2138 "todo.updated",
2139 json!({
2140 "sessionID": session_id,
2141 "todos": normalized
2142 }),
2143 ));
2144}
2145
2146async fn emit_plan_question_fallback(
2147 storage: std::sync::Arc<Storage>,
2148 bus: &EventBus,
2149 session_id: &str,
2150 message_id: &str,
2151 completion: &str,
2152) {
2153 let trimmed = completion.trim();
2154 if trimmed.is_empty() {
2155 return;
2156 }
2157
2158 let hints = extract_todo_candidates_from_text(trimmed)
2159 .into_iter()
2160 .take(6)
2161 .filter_map(|v| {
2162 v.get("content")
2163 .and_then(|c| c.as_str())
2164 .map(ToString::to_string)
2165 })
2166 .collect::<Vec<_>>();
2167
2168 let mut options = hints
2169 .iter()
2170 .map(|label| json!({"label": label, "description": "Use this as a starting task"}))
2171 .collect::<Vec<_>>();
2172 if options.is_empty() {
2173 options = vec![
2174 json!({"label":"Define scope", "description":"Clarify the intended outcome"}),
2175 json!({"label":"Provide constraints", "description":"Budget, timeline, and constraints"}),
2176 json!({"label":"Draft a starter list", "description":"Generate a first-pass task list"}),
2177 ];
2178 }
2179
2180 let question_payload = vec![json!({
2181 "header":"Planning Input",
2182 "question":"I couldn't produce a concrete task list yet. Which tasks should I include first?",
2183 "options": options,
2184 "multiple": true,
2185 "custom": true
2186 })];
2187
2188 let request = storage
2189 .add_question_request(session_id, message_id, question_payload.clone())
2190 .await
2191 .ok();
2192 bus.publish(EngineEvent::new(
2193 "question.asked",
2194 json!({
2195 "id": request
2196 .as_ref()
2197 .map(|req| req.id.clone())
2198 .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
2199 "sessionID": session_id,
2200 "messageID": message_id,
2201 "questions": question_payload,
2202 "tool": request.and_then(|req| {
2203 req.tool.map(|tool| {
2204 json!({
2205 "callID": tool.call_id,
2206 "messageID": tool.message_id
2207 })
2208 })
2209 })
2210 }),
2211 ));
2212}
2213
2214async fn load_chat_history(storage: std::sync::Arc<Storage>, session_id: &str) -> Vec<ChatMessage> {
2215 let Some(session) = storage.get_session(session_id).await else {
2216 return Vec::new();
2217 };
2218 let messages = session
2219 .messages
2220 .into_iter()
2221 .map(|m| {
2222 let role = format!("{:?}", m.role).to_lowercase();
2223 let content = m
2224 .parts
2225 .into_iter()
2226 .map(|part| match part {
2227 MessagePart::Text { text } => text,
2228 MessagePart::Reasoning { text } => text,
2229 MessagePart::ToolInvocation { tool, result, .. } => {
2230 format!("Tool {tool} => {}", result.unwrap_or_else(|| json!({})))
2231 }
2232 })
2233 .collect::<Vec<_>>()
2234 .join("\n");
2235 ChatMessage { role, content }
2236 })
2237 .collect::<Vec<_>>();
2238 compact_chat_history(messages)
2239}
2240
2241async fn emit_tool_side_events(
2242 storage: std::sync::Arc<Storage>,
2243 bus: &EventBus,
2244 session_id: &str,
2245 message_id: &str,
2246 tool: &str,
2247 args: &serde_json::Value,
2248 metadata: &serde_json::Value,
2249) {
2250 if tool == "todo_write" {
2251 let todos_from_metadata = metadata
2252 .get("todos")
2253 .and_then(|v| v.as_array())
2254 .cloned()
2255 .unwrap_or_default();
2256
2257 if !todos_from_metadata.is_empty() {
2258 let _ = storage.set_todos(session_id, todos_from_metadata).await;
2259 } else {
2260 let current = storage.get_todos(session_id).await;
2261 if let Some(updated) = apply_todo_updates_from_args(current, args) {
2262 let _ = storage.set_todos(session_id, updated).await;
2263 }
2264 }
2265
2266 let normalized = storage.get_todos(session_id).await;
2267 bus.publish(EngineEvent::new(
2268 "todo.updated",
2269 json!({
2270 "sessionID": session_id,
2271 "todos": normalized
2272 }),
2273 ));
2274 }
2275 if tool == "question" {
2276 let questions = metadata
2277 .get("questions")
2278 .and_then(|v| v.as_array())
2279 .cloned()
2280 .unwrap_or_default();
2281 let request = storage
2282 .add_question_request(session_id, message_id, questions.clone())
2283 .await
2284 .ok();
2285 bus.publish(EngineEvent::new(
2286 "question.asked",
2287 json!({
2288 "id": request
2289 .as_ref()
2290 .map(|req| req.id.clone())
2291 .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
2292 "sessionID": session_id,
2293 "messageID": message_id,
2294 "questions": questions,
2295 "tool": request.and_then(|req| {
2296 req.tool.map(|tool| {
2297 json!({
2298 "callID": tool.call_id,
2299 "messageID": tool.message_id
2300 })
2301 })
2302 })
2303 }),
2304 ));
2305 }
2306}
2307
2308fn apply_todo_updates_from_args(current: Vec<Value>, args: &Value) -> Option<Vec<Value>> {
2309 let obj = args.as_object()?;
2310 let mut todos = current;
2311 let mut changed = false;
2312
2313 if let Some(items) = obj.get("todos").and_then(|v| v.as_array()) {
2314 for item in items {
2315 let Some(item_obj) = item.as_object() else {
2316 continue;
2317 };
2318 let status = item_obj
2319 .get("status")
2320 .and_then(|v| v.as_str())
2321 .map(normalize_todo_status);
2322 let target = item_obj
2323 .get("task_id")
2324 .or_else(|| item_obj.get("todo_id"))
2325 .or_else(|| item_obj.get("id"));
2326
2327 if let (Some(status), Some(target)) = (status, target) {
2328 changed |= apply_single_todo_status_update(&mut todos, target, &status);
2329 }
2330 }
2331 }
2332
2333 let status = obj
2334 .get("status")
2335 .and_then(|v| v.as_str())
2336 .map(normalize_todo_status);
2337 let target = obj
2338 .get("task_id")
2339 .or_else(|| obj.get("todo_id"))
2340 .or_else(|| obj.get("id"));
2341 if let (Some(status), Some(target)) = (status, target) {
2342 changed |= apply_single_todo_status_update(&mut todos, target, &status);
2343 }
2344
2345 if changed {
2346 Some(todos)
2347 } else {
2348 None
2349 }
2350}
2351
2352fn apply_single_todo_status_update(todos: &mut [Value], target: &Value, status: &str) -> bool {
2353 let idx_from_value = match target {
2354 Value::Number(n) => n.as_u64().map(|v| v.saturating_sub(1) as usize),
2355 Value::String(s) => {
2356 let trimmed = s.trim();
2357 trimmed
2358 .parse::<usize>()
2359 .ok()
2360 .map(|v| v.saturating_sub(1))
2361 .or_else(|| {
2362 let digits = trimmed
2363 .chars()
2364 .rev()
2365 .take_while(|c| c.is_ascii_digit())
2366 .collect::<String>()
2367 .chars()
2368 .rev()
2369 .collect::<String>();
2370 digits.parse::<usize>().ok().map(|v| v.saturating_sub(1))
2371 })
2372 }
2373 _ => None,
2374 };
2375
2376 if let Some(idx) = idx_from_value {
2377 if idx < todos.len() {
2378 if let Some(obj) = todos[idx].as_object_mut() {
2379 obj.insert("status".to_string(), Value::String(status.to_string()));
2380 return true;
2381 }
2382 }
2383 }
2384
2385 let id_target = target.as_str().map(|s| s.trim()).filter(|s| !s.is_empty());
2386 if let Some(id_target) = id_target {
2387 for todo in todos.iter_mut() {
2388 if let Some(obj) = todo.as_object_mut() {
2389 if obj.get("id").and_then(|v| v.as_str()) == Some(id_target) {
2390 obj.insert("status".to_string(), Value::String(status.to_string()));
2391 return true;
2392 }
2393 }
2394 }
2395 }
2396
2397 false
2398}
2399
/// Maps a free-form todo status string onto a canonical spelling.
///
/// The input is trimmed and lowercased before matching. Known aliases collapse to one of
/// `in_progress`, `completed`, `cancelled` or `pending`; any other value is returned in its
/// trimmed, lowercased form.
fn normalize_todo_status(raw: &str) -> String {
    let lowered = raw.trim().to_lowercase();
    let canonical = match lowered.as_str() {
        "in_progress" | "inprogress" | "running" | "working" => "in_progress",
        "done" | "complete" | "completed" => "completed",
        "cancelled" | "canceled" | "aborted" | "skipped" => "cancelled",
        "open" | "todo" | "pending" => "pending",
        // Unknown statuses pass through unchanged apart from trimming and case folding.
        _ => return lowered,
    };
    canonical.to_owned()
}
2409
2410fn compact_chat_history(messages: Vec<ChatMessage>) -> Vec<ChatMessage> {
2411 const MAX_CONTEXT_CHARS: usize = 80_000;
2412 const KEEP_RECENT_MESSAGES: usize = 40;
2413
2414 if messages.len() <= KEEP_RECENT_MESSAGES {
2415 let total_chars = messages.iter().map(|m| m.content.len()).sum::<usize>();
2416 if total_chars <= MAX_CONTEXT_CHARS {
2417 return messages;
2418 }
2419 }
2420
2421 let mut kept = messages;
2422 let mut dropped_count = 0usize;
2423 let mut total_chars = kept.iter().map(|m| m.content.len()).sum::<usize>();
2424
2425 while kept.len() > KEEP_RECENT_MESSAGES || total_chars > MAX_CONTEXT_CHARS {
2426 if kept.is_empty() {
2427 break;
2428 }
2429 let removed = kept.remove(0);
2430 total_chars = total_chars.saturating_sub(removed.content.len());
2431 dropped_count += 1;
2432 }
2433
2434 if dropped_count > 0 {
2435 kept.insert(
2436 0,
2437 ChatMessage {
2438 role: "system".to_string(),
2439 content: format!(
2440 "[history compacted: omitted {} older messages to fit context window]",
2441 dropped_count
2442 ),
2443 },
2444 );
2445 }
2446 kept
2447}
2448
#[cfg(test)]
mod tests {
    //! Unit tests for the tool side-effect events, history compaction, and the helpers that
    //! parse and normalize tool calls and their arguments.

    use super::*;
    use crate::{EventBus, Storage};
    use uuid::Uuid;

    /// A `todo_write` call whose metadata carries todos publishes `todo.updated`, and the
    /// published todo has an `id`, the submitted `content`, and a `status`.
    #[tokio::test]
    async fn todo_updated_event_is_normalized() {
        // Each run gets its own storage directory under the system temp dir.
        let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
        let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
        let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
        let session_id = session.id.clone();
        storage.save_session(session).await.expect("save session");

        let bus = EventBus::new();
        // Subscribe before emitting so the event is not missed.
        let mut rx = bus.subscribe();
        emit_tool_side_events(
            storage.clone(),
            &bus,
            &session_id,
            "m1",
            "todo_write",
            &json!({"todos":[{"content":"ship parity"}]}),
            &json!({"todos":[{"content":"ship parity"}]}),
        )
        .await;

        let event = rx.recv().await.expect("event");
        assert_eq!(event.event_type, "todo.updated");
        let todos = event
            .properties
            .get("todos")
            .and_then(|v| v.as_array())
            .cloned()
            .unwrap_or_default();
        assert_eq!(todos.len(), 1);
        // The input had no id or status; the published todo must carry both.
        assert!(todos[0].get("id").and_then(|v| v.as_str()).is_some());
        assert_eq!(
            todos[0].get("content").and_then(|v| v.as_str()),
            Some("ship parity")
        );
        assert!(todos[0].get("status").and_then(|v| v.as_str()).is_some());
    }

    /// A `question` call publishes `question.asked` for the session, with a `tool` object that
    /// has a string `callID` and the originating `messageID`.
    #[tokio::test]
    async fn question_asked_event_contains_tool_reference() {
        let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
        let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
        let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
        let session_id = session.id.clone();
        storage.save_session(session).await.expect("save session");

        let bus = EventBus::new();
        let mut rx = bus.subscribe();
        emit_tool_side_events(
            storage,
            &bus,
            &session_id,
            "msg-1",
            "question",
            &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
            &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
        )
        .await;

        let event = rx.recv().await.expect("event");
        assert_eq!(event.event_type, "question.asked");
        assert_eq!(
            event
                .properties
                .get("sessionID")
                .and_then(|v| v.as_str())
                .unwrap_or(""),
            session_id
        );
        // A missing `tool` falls back to `{}`, which makes the assertions below fail.
        let tool = event
            .properties
            .get("tool")
            .cloned()
            .unwrap_or_else(|| json!({}));
        assert!(tool.get("callID").and_then(|v| v.as_str()).is_some());
        assert_eq!(
            tool.get("messageID").and_then(|v| v.as_str()),
            Some("msg-1")
        );
    }

    /// Sixty short messages compact to at most 41 entries: a leading `system` notice plus the
    /// most recent messages, including the last one.
    #[test]
    fn compact_chat_history_keeps_recent_and_inserts_summary() {
        let mut messages = Vec::new();
        for i in 0..60 {
            messages.push(ChatMessage {
                role: "user".to_string(),
                content: format!("message-{i}"),
            });
        }
        let compacted = compact_chat_history(messages);
        assert!(compacted.len() <= 41);
        assert_eq!(compacted[0].role, "system");
        assert!(compacted[0].content.contains("history compacted"));
        assert!(compacted.iter().any(|m| m.content.contains("message-59")));
    }

    /// Checklist items (`- [ ]`) and numbered lines both count as todo candidates.
    #[test]
    fn extracts_todos_from_checklist_and_numbered_lines() {
        let input = r#"
Plan:
- [ ] Audit current implementation
- [ ] Add planner fallback
1. Add regression test coverage
"#;
        let todos = extract_todo_candidates_from_text(input);
        assert_eq!(todos.len(), 3);
        assert_eq!(
            todos[0].get("content").and_then(|v| v.as_str()),
            Some("Audit current implementation")
        );
    }

    /// Ordinary sentences without list markers yield no todo candidates.
    #[test]
    fn does_not_extract_todos_from_plain_prose_lines() {
        let input = r#"
I need more information to proceed.
Can you tell me the event size and budget?
Once I have that, I can provide a detailed plan.
"#;
        let todos = extract_todo_candidates_from_text(input);
        assert!(todos.is_empty());
    }

    /// A `{"tool_call": ...}` object inside a fenced JSON block is recognized as a tool call.
    #[test]
    fn parses_wrapped_tool_call_from_markdown_response() {
        let input = r#"
Here is the tool call:
```json
{"tool_call":{"name":"todo_write","arguments":{"todos":[{"content":"a"}]}}}
```
"#;
        let parsed = parse_tool_invocation_from_response(input).expect("tool call");
        assert_eq!(parsed.0, "todo_write");
        assert!(parsed.1.get("todos").is_some());
    }

    /// A `todowrite(key=value, ...)` call in plain text parses to tool name `todo_write` with
    /// the integer and string arguments preserved.
    #[test]
    fn parses_function_style_todowrite_call() {
        let input = r#"Status: Completed
Call: todowrite(task_id=2, status="completed")"#;
        let parsed = parse_tool_invocation_from_response(input).expect("function-style tool call");
        assert_eq!(parsed.0, "todo_write");
        assert_eq!(parsed.1.get("task_id").and_then(|v| v.as_i64()), Some(2));
        assert_eq!(
            parsed.1.get("status").and_then(|v| v.as_str()),
            Some("completed")
        );
    }

    /// Two function-style calls in one response are both returned, in order.
    #[test]
    fn parses_multiple_function_style_todowrite_calls() {
        let input = r#"
Call: todowrite(task_id=2, status="completed")
Call: todowrite(task_id=3, status="in_progress")
"#;
        let parsed = parse_tool_invocations_from_response(input);
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].0, "todo_write");
        assert_eq!(parsed[0].1.get("task_id").and_then(|v| v.as_i64()), Some(2));
        assert_eq!(
            parsed[0].1.get("status").and_then(|v| v.as_str()),
            Some("completed")
        );
        assert_eq!(parsed[1].1.get("task_id").and_then(|v| v.as_i64()), Some(3));
        assert_eq!(
            parsed[1].1.get("status").and_then(|v| v.as_str()),
            Some("in_progress")
        );
    }

    /// A numeric `task_id` is a 1-based position: `task_id: 2` updates the second todo.
    #[test]
    fn applies_todo_status_update_from_task_id_args() {
        let current = vec![
            json!({"id":"todo-1","content":"a","status":"pending"}),
            json!({"id":"todo-2","content":"b","status":"pending"}),
            json!({"id":"todo-3","content":"c","status":"pending"}),
        ];
        let updated =
            apply_todo_updates_from_args(current, &json!({"task_id":2, "status":"completed"}))
                .expect("status update");
        assert_eq!(
            updated[1].get("status").and_then(|v| v.as_str()),
            Some("completed")
        );
    }

    /// A `tasks` array with `title`/`name` fields is normalized to `todos` with `content`.
    #[test]
    fn normalizes_todo_write_tasks_alias() {
        let normalized = normalize_todo_write_args(
            json!({"tasks":[{"title":"Book venue"},{"name":"Send invites"}]}),
            "",
        );
        let todos = normalized
            .get("todos")
            .and_then(|v| v.as_array())
            .cloned()
            .unwrap_or_default();
        assert_eq!(todos.len(), 2);
        assert_eq!(
            todos[0].get("content").and_then(|v| v.as_str()),
            Some("Book venue")
        );
        assert_eq!(
            todos[1].get("content").and_then(|v| v.as_str()),
            Some("Send invites")
        );
    }

    /// With empty args, todos are taken from the numbered lines of the completion text.
    #[test]
    fn normalizes_todo_write_from_completion_when_args_empty() {
        let completion = "Plan:\n1. Secure venue\n2. Create playlist\n3. Send invites";
        let normalized = normalize_todo_write_args(json!({}), completion);
        let todos = normalized
            .get("todos")
            .and_then(|v| v.as_array())
            .cloned()
            .unwrap_or_default();
        assert_eq!(todos.len(), 3);
        assert!(!is_empty_todo_write_args(&normalized));
    }

    /// Args holding only a `task_id` and `status` are not considered empty.
    #[test]
    fn empty_todo_write_args_allows_status_updates() {
        let args = json!({"task_id": 2, "status":"completed"});
        assert!(!is_empty_todo_write_args(&args));
    }

    /// Streamed `websearch` args that are bare text (not JSON) become the `query`.
    #[test]
    fn streamed_websearch_args_fallback_to_query_string() {
        let parsed = parse_streamed_tool_args("websearch", "meaning of life");
        assert_eq!(
            parsed.get("query").and_then(|v| v.as_str()),
            Some("meaning of life")
        );
    }

    /// Streamed `websearch` args that are a JSON string literal are unquoted into `query`.
    #[test]
    fn streamed_websearch_stringified_json_args_are_unwrapped() {
        let parsed = parse_streamed_tool_args("websearch", r#""donkey gestation period""#);
        assert_eq!(
            parsed.get("query").and_then(|v| v.as_str()),
            Some("donkey gestation period")
        );
    }

    /// With empty args, the `websearch` query is inferred from the user's text and the result
    /// is tagged `inferred_from_user` / `recovered`.
    #[test]
    fn normalize_tool_args_websearch_infers_from_user_text() {
        let normalized =
            normalize_tool_args("websearch", json!({}), "web search meaning of life", "");
        assert_eq!(
            normalized.args.get("query").and_then(|v| v.as_str()),
            Some("meaning of life")
        );
        assert_eq!(normalized.args_source, "inferred_from_user");
        assert_eq!(normalized.args_integrity, "recovered");
    }

    /// A query already present in the args is kept and tagged `provider_json` / `ok`.
    #[test]
    fn normalize_tool_args_websearch_keeps_existing_query() {
        let normalized = normalize_tool_args(
            "websearch",
            json!({"query":"already set"}),
            "web search should not override",
            "",
        );
        assert_eq!(
            normalized.args.get("query").and_then(|v| v.as_str()),
            Some("already set")
        );
        assert_eq!(normalized.args_source, "provider_json");
        assert_eq!(normalized.args_integrity, "ok");
    }

    /// When no query can be recovered, the result has no query, is flagged
    /// `missing_terminal`, and is tagged `missing` / `empty`.
    #[test]
    fn normalize_tool_args_websearch_fails_when_unrecoverable() {
        let normalized = normalize_tool_args("websearch", json!({}), "search", "");
        assert!(normalized.query.is_none());
        assert!(normalized.missing_terminal);
        assert_eq!(normalized.args_source, "missing");
        assert_eq!(normalized.args_integrity, "empty");
    }
}