Skip to main content

tandem_core/
engine_loop.rs

1use chrono::Utc;
2use futures::future::BoxFuture;
3use futures::StreamExt;
4use serde_json::{json, Map, Number, Value};
5use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
6use std::hash::{Hash, Hasher};
7use std::path::{Path, PathBuf};
8use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
9use tandem_providers::{ChatMessage, ProviderRegistry, StreamChunk, TokenUsage};
10use tandem_tools::{validate_tool_schemas, ToolRegistry};
11use tandem_types::{
12    EngineEvent, Message, MessagePart, MessagePartInput, MessageRole, SendMessageRequest,
13};
14use tandem_wire::WireMessagePart;
15use tokio_util::sync::CancellationToken;
16use tracing::Level;
17
18use crate::{
19    derive_session_title_from_prompt, title_needs_repair, AgentDefinition, AgentRegistry,
20    CancellationRegistry, EventBus, PermissionAction, PermissionManager, PluginRegistry, Storage,
21};
22use tokio::sync::RwLock;
23
24#[derive(Default)]
25struct StreamedToolCall {
26    name: String,
27    args: String,
28}
29
30#[derive(Debug, Clone)]
31pub struct SpawnAgentToolContext {
32    pub session_id: String,
33    pub message_id: String,
34    pub tool_call_id: Option<String>,
35    pub args: Value,
36}
37
38#[derive(Debug, Clone)]
39pub struct SpawnAgentToolResult {
40    pub output: String,
41    pub metadata: Value,
42}
43
44#[derive(Debug, Clone)]
45pub struct ToolPolicyContext {
46    pub session_id: String,
47    pub message_id: String,
48    pub tool: String,
49    pub args: Value,
50}
51
52#[derive(Debug, Clone)]
53pub struct ToolPolicyDecision {
54    pub allowed: bool,
55    pub reason: Option<String>,
56}
57
58pub trait SpawnAgentHook: Send + Sync {
59    fn spawn_agent(
60        &self,
61        ctx: SpawnAgentToolContext,
62    ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>>;
63}
64
65pub trait ToolPolicyHook: Send + Sync {
66    fn evaluate_tool(
67        &self,
68        ctx: ToolPolicyContext,
69    ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>>;
70}
71
72#[derive(Clone)]
73pub struct EngineLoop {
74    storage: std::sync::Arc<Storage>,
75    event_bus: EventBus,
76    providers: ProviderRegistry,
77    plugins: PluginRegistry,
78    agents: AgentRegistry,
79    permissions: PermissionManager,
80    tools: ToolRegistry,
81    cancellations: CancellationRegistry,
82    workspace_overrides: std::sync::Arc<RwLock<HashMap<String, u64>>>,
83    spawn_agent_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn SpawnAgentHook>>>>,
84    tool_policy_hook: std::sync::Arc<RwLock<Option<std::sync::Arc<dyn ToolPolicyHook>>>>,
85}
86
87impl EngineLoop {
88    #[allow(clippy::too_many_arguments)]
89    pub fn new(
90        storage: std::sync::Arc<Storage>,
91        event_bus: EventBus,
92        providers: ProviderRegistry,
93        plugins: PluginRegistry,
94        agents: AgentRegistry,
95        permissions: PermissionManager,
96        tools: ToolRegistry,
97        cancellations: CancellationRegistry,
98    ) -> Self {
99        Self {
100            storage,
101            event_bus,
102            providers,
103            plugins,
104            agents,
105            permissions,
106            tools,
107            cancellations,
108            workspace_overrides: std::sync::Arc::new(RwLock::new(HashMap::new())),
109            spawn_agent_hook: std::sync::Arc::new(RwLock::new(None)),
110            tool_policy_hook: std::sync::Arc::new(RwLock::new(None)),
111        }
112    }
113
114    pub async fn set_spawn_agent_hook(&self, hook: std::sync::Arc<dyn SpawnAgentHook>) {
115        *self.spawn_agent_hook.write().await = Some(hook);
116    }
117
118    pub async fn set_tool_policy_hook(&self, hook: std::sync::Arc<dyn ToolPolicyHook>) {
119        *self.tool_policy_hook.write().await = Some(hook);
120    }
121
122    pub async fn grant_workspace_override_for_session(
123        &self,
124        session_id: &str,
125        ttl_seconds: u64,
126    ) -> u64 {
127        let expires_at = chrono::Utc::now()
128            .timestamp_millis()
129            .max(0)
130            .saturating_add((ttl_seconds as i64).saturating_mul(1000))
131            as u64;
132        self.workspace_overrides
133            .write()
134            .await
135            .insert(session_id.to_string(), expires_at);
136        expires_at
137    }
138
139    pub async fn run_prompt_async(
140        &self,
141        session_id: String,
142        req: SendMessageRequest,
143    ) -> anyhow::Result<()> {
144        self.run_prompt_async_with_context(session_id, req, None)
145            .await
146    }
147
148    pub async fn run_prompt_async_with_context(
149        &self,
150        session_id: String,
151        req: SendMessageRequest,
152        correlation_id: Option<String>,
153    ) -> anyhow::Result<()> {
154        let session_provider = self
155            .storage
156            .get_session(&session_id)
157            .await
158            .and_then(|s| s.provider);
159        let provider_hint = req
160            .model
161            .as_ref()
162            .map(|m| m.provider_id.clone())
163            .or(session_provider);
164        let correlation_ref = correlation_id.as_deref();
165        let model_id = req.model.as_ref().map(|m| m.model_id.as_str());
166        let cancel = self.cancellations.create(&session_id).await;
167        emit_event(
168            Level::INFO,
169            ProcessKind::Engine,
170            ObservabilityEvent {
171                event: "provider.call.start",
172                component: "engine.loop",
173                correlation_id: correlation_ref,
174                session_id: Some(&session_id),
175                run_id: None,
176                message_id: None,
177                provider_id: provider_hint.as_deref(),
178                model_id,
179                status: Some("start"),
180                error_code: None,
181                detail: Some("run_prompt_async dispatch"),
182            },
183        );
184        self.event_bus.publish(EngineEvent::new(
185            "session.status",
186            json!({"sessionID": session_id, "status":"running"}),
187        ));
188        let text = req
189            .parts
190            .iter()
191            .map(|p| match p {
192                MessagePartInput::Text { text } => text.clone(),
193                MessagePartInput::File {
194                    mime,
195                    filename,
196                    url,
197                } => format!(
198                    "[file mime={} name={} url={}]",
199                    mime,
200                    filename.clone().unwrap_or_else(|| "unknown".to_string()),
201                    url
202                ),
203            })
204            .collect::<Vec<_>>()
205            .join("\n");
206        self.auto_rename_session_from_user_text(&session_id, &text)
207            .await;
208        let active_agent = self.agents.get(req.agent.as_deref()).await;
209        let mut user_message_id = self
210            .find_recent_matching_user_message_id(&session_id, &text)
211            .await;
212        if user_message_id.is_none() {
213            let user_message = Message::new(
214                MessageRole::User,
215                vec![MessagePart::Text { text: text.clone() }],
216            );
217            let created_message_id = user_message.id.clone();
218            self.storage
219                .append_message(&session_id, user_message)
220                .await?;
221
222            let user_part = WireMessagePart::text(&session_id, &created_message_id, text.clone());
223            self.event_bus.publish(EngineEvent::new(
224                "message.part.updated",
225                json!({
226                    "part": user_part,
227                    "delta": text,
228                    "agent": active_agent.name
229                }),
230            ));
231            user_message_id = Some(created_message_id);
232        }
233        let user_message_id = user_message_id.unwrap_or_else(|| "unknown".to_string());
234
235        if cancel.is_cancelled() {
236            self.event_bus.publish(EngineEvent::new(
237                "session.status",
238                json!({"sessionID": session_id, "status":"cancelled"}),
239            ));
240            self.cancellations.remove(&session_id).await;
241            return Ok(());
242        }
243
244        let mut question_tool_used = false;
245        let completion = if let Some((tool, args)) = parse_tool_invocation(&text) {
246            if normalize_tool_name(&tool) == "question" {
247                question_tool_used = true;
248            }
249            if !agent_can_use_tool(&active_agent, &tool) {
250                format!(
251                    "Tool `{tool}` is not enabled for agent `{}`.",
252                    active_agent.name
253                )
254            } else {
255                self.execute_tool_with_permission(
256                    &session_id,
257                    &user_message_id,
258                    tool.clone(),
259                    args,
260                    active_agent.skills.as_deref(),
261                    &text,
262                    None,
263                    cancel.clone(),
264                )
265                .await?
266                .unwrap_or_default()
267            }
268        } else {
269            let mut completion = String::new();
270            let mut max_iterations = 25usize;
271            let mut followup_context: Option<String> = None;
272            let mut last_tool_outputs: Vec<String> = Vec::new();
273            let mut tool_call_counts: HashMap<String, usize> = HashMap::new();
274            let mut readonly_tool_cache: HashMap<String, String> = HashMap::new();
275            let mut readonly_signature_counts: HashMap<String, usize> = HashMap::new();
276            let mut websearch_query_blocked = false;
277            let mut auto_workspace_probe_attempted = false;
278
279            while max_iterations > 0 && !cancel.is_cancelled() {
280                max_iterations -= 1;
281                let mut messages = load_chat_history(self.storage.clone(), &session_id).await;
282                let mut system_parts = vec![tandem_runtime_system_prompt().to_string()];
283                if let Some(system) = active_agent.system_prompt.as_ref() {
284                    system_parts.push(system.clone());
285                }
286                messages.insert(
287                    0,
288                    ChatMessage {
289                        role: "system".to_string(),
290                        content: system_parts.join("\n\n"),
291                    },
292                );
293                if let Some(extra) = followup_context.take() {
294                    messages.push(ChatMessage {
295                        role: "user".to_string(),
296                        content: extra,
297                    });
298                }
299                let tool_schemas = self.tools.list().await;
300                if let Err(validation_err) = validate_tool_schemas(&tool_schemas) {
301                    let detail = validation_err.to_string();
302                    emit_event(
303                        Level::ERROR,
304                        ProcessKind::Engine,
305                        ObservabilityEvent {
306                            event: "provider.call.error",
307                            component: "engine.loop",
308                            correlation_id: correlation_ref,
309                            session_id: Some(&session_id),
310                            run_id: None,
311                            message_id: Some(&user_message_id),
312                            provider_id: provider_hint.as_deref(),
313                            model_id,
314                            status: Some("failed"),
315                            error_code: Some("TOOL_SCHEMA_INVALID"),
316                            detail: Some(&detail),
317                        },
318                    );
319                    anyhow::bail!("{detail}");
320                }
321                let stream = self
322                    .providers
323                    .stream_for_provider(
324                        provider_hint.as_deref(),
325                        messages,
326                        Some(tool_schemas),
327                        cancel.clone(),
328                    )
329                    .await
330                    .inspect_err(|err| {
331                        let error_text = err.to_string();
332                        let error_code = provider_error_code(&error_text);
333                        let detail = truncate_text(&error_text, 500);
334                        emit_event(
335                            Level::ERROR,
336                            ProcessKind::Engine,
337                            ObservabilityEvent {
338                                event: "provider.call.error",
339                                component: "engine.loop",
340                                correlation_id: correlation_ref,
341                                session_id: Some(&session_id),
342                                run_id: None,
343                                message_id: Some(&user_message_id),
344                                provider_id: provider_hint.as_deref(),
345                                model_id,
346                                status: Some("failed"),
347                                error_code: Some(error_code),
348                                detail: Some(&detail),
349                            },
350                        );
351                    })?;
352                tokio::pin!(stream);
353                completion.clear();
354                let mut streamed_tool_calls: HashMap<String, StreamedToolCall> = HashMap::new();
355                let mut provider_usage: Option<TokenUsage> = None;
356                while let Some(chunk) = stream.next().await {
357                    let chunk = match chunk {
358                        Ok(chunk) => chunk,
359                        Err(err) => {
360                            let error_text = err.to_string();
361                            let error_code = provider_error_code(&error_text);
362                            let detail = truncate_text(&error_text, 500);
363                            emit_event(
364                                Level::ERROR,
365                                ProcessKind::Engine,
366                                ObservabilityEvent {
367                                    event: "provider.call.error",
368                                    component: "engine.loop",
369                                    correlation_id: correlation_ref,
370                                    session_id: Some(&session_id),
371                                    run_id: None,
372                                    message_id: Some(&user_message_id),
373                                    provider_id: provider_hint.as_deref(),
374                                    model_id,
375                                    status: Some("failed"),
376                                    error_code: Some(error_code),
377                                    detail: Some(&detail),
378                                },
379                            );
380                            return Err(anyhow::anyhow!(
381                                "provider stream chunk error: {error_text}"
382                            ));
383                        }
384                    };
385                    match chunk {
386                        StreamChunk::TextDelta(delta) => {
387                            if completion.is_empty() {
388                                emit_event(
389                                    Level::INFO,
390                                    ProcessKind::Engine,
391                                    ObservabilityEvent {
392                                        event: "provider.call.first_byte",
393                                        component: "engine.loop",
394                                        correlation_id: correlation_ref,
395                                        session_id: Some(&session_id),
396                                        run_id: None,
397                                        message_id: Some(&user_message_id),
398                                        provider_id: provider_hint.as_deref(),
399                                        model_id,
400                                        status: Some("streaming"),
401                                        error_code: None,
402                                        detail: Some("first text delta"),
403                                    },
404                                );
405                            }
406                            completion.push_str(&delta);
407                            let delta = truncate_text(&delta, 4_000);
408                            let delta_part =
409                                WireMessagePart::text(&session_id, &user_message_id, delta.clone());
410                            self.event_bus.publish(EngineEvent::new(
411                                "message.part.updated",
412                                json!({"part": delta_part, "delta": delta}),
413                            ));
414                        }
415                        StreamChunk::ReasoningDelta(_reasoning) => {}
416                        StreamChunk::Done {
417                            finish_reason: _,
418                            usage,
419                        } => {
420                            if usage.is_some() {
421                                provider_usage = usage;
422                            }
423                            break;
424                        }
425                        StreamChunk::ToolCallStart { id, name } => {
426                            let entry = streamed_tool_calls.entry(id).or_default();
427                            if entry.name.is_empty() {
428                                entry.name = name;
429                            }
430                        }
431                        StreamChunk::ToolCallDelta { id, args_delta } => {
432                            let entry = streamed_tool_calls.entry(id).or_default();
433                            entry.args.push_str(&args_delta);
434                        }
435                        StreamChunk::ToolCallEnd { id: _ } => {}
436                    }
437                    if cancel.is_cancelled() {
438                        break;
439                    }
440                }
441
442                let mut tool_calls = streamed_tool_calls
443                    .into_values()
444                    .filter_map(|call| {
445                        if call.name.trim().is_empty() {
446                            return None;
447                        }
448                        let tool_name = normalize_tool_name(&call.name);
449                        let parsed_args = parse_streamed_tool_args(&tool_name, &call.args);
450                        Some((tool_name, parsed_args))
451                    })
452                    .collect::<Vec<_>>();
453                if tool_calls.is_empty() {
454                    tool_calls = parse_tool_invocations_from_response(&completion);
455                }
456                if tool_calls.is_empty()
457                    && !auto_workspace_probe_attempted
458                    && should_force_workspace_probe(&text, &completion)
459                {
460                    auto_workspace_probe_attempted = true;
461                    tool_calls = vec![("glob".to_string(), json!({ "pattern": "*" }))];
462                }
463                if !tool_calls.is_empty() {
464                    let mut outputs = Vec::new();
465                    for (tool, args) in tool_calls {
466                        if !agent_can_use_tool(&active_agent, &tool) {
467                            continue;
468                        }
469                        let tool_key = normalize_tool_name(&tool);
470                        if tool_key == "question" {
471                            question_tool_used = true;
472                        }
473                        if websearch_query_blocked && tool_key == "websearch" {
474                            outputs.push(
475                                "Tool `websearch` call skipped: WEBSEARCH_QUERY_MISSING"
476                                    .to_string(),
477                            );
478                            continue;
479                        }
480                        let entry = tool_call_counts.entry(tool_key.clone()).or_insert(0);
481                        *entry += 1;
482                        let budget = tool_budget_for(&tool_key);
483                        if *entry > budget {
484                            outputs.push(format!(
485                                "Tool `{}` call skipped: per-run guard budget exceeded ({}).",
486                                tool_key, budget
487                            ));
488                            continue;
489                        }
490                        let mut effective_args = args.clone();
491                        if tool_key == "todo_write" {
492                            effective_args = normalize_todo_write_args(effective_args, &completion);
493                            if is_empty_todo_write_args(&effective_args) {
494                                outputs.push(
495                                    "Tool `todo_write` call skipped: empty todo payload."
496                                        .to_string(),
497                                );
498                                continue;
499                            }
500                        }
501                        let signature = tool_signature(&tool_key, &args);
502                        let mut signature_count = 1usize;
503                        if is_read_only_tool(&tool_key) {
504                            let count = readonly_signature_counts
505                                .entry(signature.clone())
506                                .and_modify(|v| *v = v.saturating_add(1))
507                                .or_insert(1);
508                            signature_count = *count;
509                            if tool_key == "websearch" && *count > 2 {
510                                self.event_bus.publish(EngineEvent::new(
511                                    "tool.loop_guard.triggered",
512                                    json!({
513                                        "sessionID": session_id,
514                                        "messageID": user_message_id,
515                                        "tool": tool_key,
516                                        "reason": "duplicate_signature_retry_exhausted",
517                                        "queryHash": extract_websearch_query(&args).map(|q| stable_hash(&q)),
518                                        "loop_guard_triggered": true
519                                    }),
520                                ));
521                                outputs.push(
522                                    "Tool `websearch` call skipped: WEBSEARCH_LOOP_GUARD"
523                                        .to_string(),
524                                );
525                                continue;
526                            }
527                            if tool_key != "websearch" && *count > 1 {
528                                if let Some(cached) = readonly_tool_cache.get(&signature) {
529                                    outputs.push(cached.clone());
530                                } else {
531                                    outputs.push(format!(
532                                        "Tool `{}` call skipped: duplicate call signature detected.",
533                                        tool_key
534                                    ));
535                                }
536                                continue;
537                            }
538                        }
539                        if let Some(output) = self
540                            .execute_tool_with_permission(
541                                &session_id,
542                                &user_message_id,
543                                tool,
544                                effective_args,
545                                active_agent.skills.as_deref(),
546                                &text,
547                                Some(&completion),
548                                cancel.clone(),
549                            )
550                            .await?
551                        {
552                            if output.contains("WEBSEARCH_QUERY_MISSING") {
553                                websearch_query_blocked = true;
554                            }
555                            if is_read_only_tool(&tool_key)
556                                && tool_key != "websearch"
557                                && signature_count == 1
558                            {
559                                readonly_tool_cache.insert(signature, output.clone());
560                            }
561                            outputs.push(output);
562                        }
563                    }
564                    if !outputs.is_empty() {
565                        last_tool_outputs = outputs.clone();
566                        followup_context = Some(format!(
567                            "{}\nContinue with a concise final response and avoid repeating identical tool calls.",
568                            summarize_tool_outputs(&outputs)
569                        ));
570                        continue;
571                    }
572                }
573
574                if let Some(usage) = provider_usage {
575                    self.event_bus.publish(EngineEvent::new(
576                        "provider.usage",
577                        json!({
578                            "sessionID": session_id,
579                            "messageID": user_message_id,
580                            "promptTokens": usage.prompt_tokens,
581                            "completionTokens": usage.completion_tokens,
582                            "totalTokens": usage.total_tokens,
583                        }),
584                    ));
585                }
586
587                break;
588            }
589            if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
590                if let Some(narrative) = self
591                    .generate_final_narrative_without_tools(
592                        &session_id,
593                        &active_agent,
594                        provider_hint.as_deref(),
595                        cancel.clone(),
596                        &last_tool_outputs,
597                    )
598                    .await
599                {
600                    completion = narrative;
601                }
602            }
603            if completion.trim().is_empty() && !last_tool_outputs.is_empty() {
604                let preview = last_tool_outputs
605                    .iter()
606                    .take(3)
607                    .map(|o| truncate_text(o, 240))
608                    .collect::<Vec<_>>()
609                    .join("\n");
610                completion = format!(
611                    "I completed project analysis steps using tools, but the model returned no final narrative text.\n\nTool result summary:\n{}",
612                    preview
613                );
614            }
615            truncate_text(&completion, 16_000)
616        };
617        emit_event(
618            Level::INFO,
619            ProcessKind::Engine,
620            ObservabilityEvent {
621                event: "provider.call.finish",
622                component: "engine.loop",
623                correlation_id: correlation_ref,
624                session_id: Some(&session_id),
625                run_id: None,
626                message_id: Some(&user_message_id),
627                provider_id: provider_hint.as_deref(),
628                model_id,
629                status: Some("ok"),
630                error_code: None,
631                detail: Some("provider stream complete"),
632            },
633        );
634        if active_agent.name.eq_ignore_ascii_case("plan") {
635            emit_plan_todo_fallback(
636                self.storage.clone(),
637                &self.event_bus,
638                &session_id,
639                &user_message_id,
640                &completion,
641            )
642            .await;
643            let todos_after_fallback = self.storage.get_todos(&session_id).await;
644            if todos_after_fallback.is_empty() && !question_tool_used {
645                emit_plan_question_fallback(
646                    self.storage.clone(),
647                    &self.event_bus,
648                    &session_id,
649                    &user_message_id,
650                    &completion,
651                )
652                .await;
653            }
654        }
655        if cancel.is_cancelled() {
656            self.event_bus.publish(EngineEvent::new(
657                "session.status",
658                json!({"sessionID": session_id, "status":"cancelled"}),
659            ));
660            self.cancellations.remove(&session_id).await;
661            return Ok(());
662        }
663        let assistant = Message::new(
664            MessageRole::Assistant,
665            vec![MessagePart::Text {
666                text: completion.clone(),
667            }],
668        );
669        let assistant_message_id = assistant.id.clone();
670        self.storage.append_message(&session_id, assistant).await?;
671        let final_part = WireMessagePart::text(
672            &session_id,
673            &assistant_message_id,
674            truncate_text(&completion, 16_000),
675        );
676        self.event_bus.publish(EngineEvent::new(
677            "message.part.updated",
678            json!({"part": final_part}),
679        ));
680        self.event_bus.publish(EngineEvent::new(
681            "session.updated",
682            json!({"sessionID": session_id, "status":"idle"}),
683        ));
684        self.event_bus.publish(EngineEvent::new(
685            "session.status",
686            json!({"sessionID": session_id, "status":"idle"}),
687        ));
688        self.cancellations.remove(&session_id).await;
689        Ok(())
690    }
691
692    pub async fn run_oneshot(&self, prompt: String) -> anyhow::Result<String> {
693        self.providers.default_complete(&prompt).await
694    }
695
696    pub async fn run_oneshot_for_provider(
697        &self,
698        prompt: String,
699        provider_id: Option<&str>,
700    ) -> anyhow::Result<String> {
701        self.providers
702            .complete_for_provider(provider_id, &prompt)
703            .await
704    }
705
706    #[allow(clippy::too_many_arguments)]
707    async fn execute_tool_with_permission(
708        &self,
709        session_id: &str,
710        message_id: &str,
711        tool: String,
712        args: Value,
713        equipped_skills: Option<&[String]>,
714        latest_user_text: &str,
715        latest_assistant_context: Option<&str>,
716        cancel: CancellationToken,
717    ) -> anyhow::Result<Option<String>> {
718        let tool = normalize_tool_name(&tool);
719        let normalized = normalize_tool_args(
720            &tool,
721            args,
722            latest_user_text,
723            latest_assistant_context.unwrap_or_default(),
724        );
725        self.event_bus.publish(EngineEvent::new(
726            "tool.args.normalized",
727            json!({
728                "sessionID": session_id,
729                "messageID": message_id,
730                "tool": tool,
731                "argsSource": normalized.args_source,
732                "argsIntegrity": normalized.args_integrity,
733                "query": normalized.query,
734                "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
735                "requestID": Value::Null
736            }),
737        ));
738        if normalized.args_integrity == "recovered" {
739            self.event_bus.publish(EngineEvent::new(
740                "tool.args.recovered",
741                json!({
742                    "sessionID": session_id,
743                    "messageID": message_id,
744                    "tool": tool,
745                    "argsSource": normalized.args_source,
746                    "query": normalized.query,
747                    "queryHash": normalized.query.as_ref().map(|q| stable_hash(q)),
748                    "requestID": Value::Null
749                }),
750            ));
751        }
752        if normalized.missing_terminal {
753            self.event_bus.publish(EngineEvent::new(
754                "tool.args.missing_terminal",
755                json!({
756                    "sessionID": session_id,
757                    "messageID": message_id,
758                    "tool": tool,
759                    "argsSource": normalized.args_source,
760                    "argsIntegrity": normalized.args_integrity,
761                    "requestID": Value::Null,
762                    "error": "WEBSEARCH_QUERY_MISSING"
763                }),
764            ));
765            let mut failed_part =
766                WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
767            failed_part.state = Some("failed".to_string());
768            failed_part.error = Some("WEBSEARCH_QUERY_MISSING".to_string());
769            self.event_bus.publish(EngineEvent::new(
770                "message.part.updated",
771                json!({"part": failed_part}),
772            ));
773            return Ok(Some("WEBSEARCH_QUERY_MISSING".to_string()));
774        }
775
776        let args = match enforce_skill_scope(&tool, normalized.args, equipped_skills) {
777            Ok(args) => args,
778            Err(message) => return Ok(Some(message)),
779        };
780        if let Some(hook) = self.tool_policy_hook.read().await.clone() {
781            let decision = hook
782                .evaluate_tool(ToolPolicyContext {
783                    session_id: session_id.to_string(),
784                    message_id: message_id.to_string(),
785                    tool: tool.clone(),
786                    args: args.clone(),
787                })
788                .await?;
789            if !decision.allowed {
790                let reason = decision
791                    .reason
792                    .unwrap_or_else(|| "Tool denied by runtime policy".to_string());
793                let mut blocked_part =
794                    WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
795                blocked_part.state = Some("failed".to_string());
796                blocked_part.error = Some(reason.clone());
797                self.event_bus.publish(EngineEvent::new(
798                    "message.part.updated",
799                    json!({"part": blocked_part}),
800                ));
801                return Ok(Some(reason));
802            }
803        }
804        let mut tool_call_id: Option<String> = None;
805        if let Some(violation) = self
806            .workspace_sandbox_violation(session_id, &tool, &args)
807            .await
808        {
809            let mut blocked_part =
810                WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
811            blocked_part.state = Some("failed".to_string());
812            blocked_part.error = Some(violation.clone());
813            self.event_bus.publish(EngineEvent::new(
814                "message.part.updated",
815                json!({"part": blocked_part}),
816            ));
817            return Ok(Some(violation));
818        }
819        let rule = self
820            .plugins
821            .permission_override(&tool)
822            .await
823            .unwrap_or(self.permissions.evaluate(&tool, &tool).await);
824        if matches!(rule, PermissionAction::Deny) {
825            return Ok(Some(format!(
826                "Permission denied for tool `{tool}` by policy."
827            )));
828        }
829
830        let mut effective_args = args.clone();
831        if matches!(rule, PermissionAction::Ask) {
832            let pending = self
833                .permissions
834                .ask_for_session_with_context(
835                    Some(session_id),
836                    &tool,
837                    args.clone(),
838                    Some(crate::PermissionArgsContext {
839                        args_source: normalized.args_source.clone(),
840                        args_integrity: normalized.args_integrity.clone(),
841                        query: normalized.query.clone(),
842                    }),
843                )
844                .await;
845            let mut pending_part = WireMessagePart::tool_invocation(
846                session_id,
847                message_id,
848                tool.clone(),
849                args.clone(),
850            );
851            pending_part.id = Some(pending.id.clone());
852            tool_call_id = Some(pending.id.clone());
853            pending_part.state = Some("pending".to_string());
854            self.event_bus.publish(EngineEvent::new(
855                "message.part.updated",
856                json!({"part": pending_part}),
857            ));
858            let reply = self
859                .permissions
860                .wait_for_reply(&pending.id, cancel.clone())
861                .await;
862            if cancel.is_cancelled() {
863                return Ok(None);
864            }
865            let approved = matches!(reply.as_deref(), Some("once" | "always" | "allow"));
866            if !approved {
867                let mut denied_part =
868                    WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
869                denied_part.id = Some(pending.id);
870                denied_part.state = Some("denied".to_string());
871                denied_part.error = Some("Permission denied by user".to_string());
872                self.event_bus.publish(EngineEvent::new(
873                    "message.part.updated",
874                    json!({"part": denied_part}),
875                ));
876                return Ok(Some(format!(
877                    "Permission denied for tool `{tool}` by user."
878                )));
879            }
880            effective_args = args;
881        }
882
883        let args = self.plugins.inject_tool_args(&tool, effective_args).await;
884        let mut invoke_part =
885            WireMessagePart::tool_invocation(session_id, message_id, tool.clone(), args.clone());
886        if let Some(call_id) = tool_call_id.clone() {
887            invoke_part.id = Some(call_id);
888        }
889        let invoke_part_id = invoke_part.id.clone();
890        self.event_bus.publish(EngineEvent::new(
891            "message.part.updated",
892            json!({"part": invoke_part}),
893        ));
894        let args_for_side_events = args.clone();
895        if tool == "spawn_agent" {
896            let hook = self.spawn_agent_hook.read().await.clone();
897            if let Some(hook) = hook {
898                let spawned = hook
899                    .spawn_agent(SpawnAgentToolContext {
900                        session_id: session_id.to_string(),
901                        message_id: message_id.to_string(),
902                        tool_call_id: invoke_part_id.clone(),
903                        args: args_for_side_events.clone(),
904                    })
905                    .await?;
906                let output = self.plugins.transform_tool_output(spawned.output).await;
907                let output = truncate_text(&output, 16_000);
908                emit_tool_side_events(
909                    self.storage.clone(),
910                    &self.event_bus,
911                    session_id,
912                    message_id,
913                    &tool,
914                    &args_for_side_events,
915                    &spawned.metadata,
916                )
917                .await;
918                let mut result_part = WireMessagePart::tool_result(
919                    session_id,
920                    message_id,
921                    tool.clone(),
922                    json!(output.clone()),
923                );
924                result_part.id = invoke_part_id;
925                self.event_bus.publish(EngineEvent::new(
926                    "message.part.updated",
927                    json!({"part": result_part}),
928                ));
929                return Ok(Some(truncate_text(
930                    &format!("Tool `{tool}` result:\n{output}"),
931                    16_000,
932                )));
933            }
934            let output = "spawn_agent is unavailable in this runtime (no spawn hook installed).";
935            let mut failed_part =
936                WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
937            failed_part.id = invoke_part_id.clone();
938            failed_part.state = Some("failed".to_string());
939            failed_part.error = Some(output.to_string());
940            self.event_bus.publish(EngineEvent::new(
941                "message.part.updated",
942                json!({"part": failed_part}),
943            ));
944            return Ok(Some(output.to_string()));
945        }
946        let result = match self
947            .tools
948            .execute_with_cancel(&tool, args, cancel.clone())
949            .await
950        {
951            Ok(result) => result,
952            Err(err) => {
953                let mut failed_part =
954                    WireMessagePart::tool_result(session_id, message_id, tool.clone(), json!(null));
955                failed_part.id = invoke_part_id.clone();
956                failed_part.state = Some("failed".to_string());
957                failed_part.error = Some(err.to_string());
958                self.event_bus.publish(EngineEvent::new(
959                    "message.part.updated",
960                    json!({"part": failed_part}),
961                ));
962                return Err(err);
963            }
964        };
965        emit_tool_side_events(
966            self.storage.clone(),
967            &self.event_bus,
968            session_id,
969            message_id,
970            &tool,
971            &args_for_side_events,
972            &result.metadata,
973        )
974        .await;
975        let output = self.plugins.transform_tool_output(result.output).await;
976        let output = truncate_text(&output, 16_000);
977        let mut result_part = WireMessagePart::tool_result(
978            session_id,
979            message_id,
980            tool.clone(),
981            json!(output.clone()),
982        );
983        result_part.id = invoke_part_id;
984        self.event_bus.publish(EngineEvent::new(
985            "message.part.updated",
986            json!({"part": result_part}),
987        ));
988        Ok(Some(truncate_text(
989            &format!("Tool `{tool}` result:\n{output}"),
990            16_000,
991        )))
992    }
993
994    async fn find_recent_matching_user_message_id(
995        &self,
996        session_id: &str,
997        text: &str,
998    ) -> Option<String> {
999        let session = self.storage.get_session(session_id).await?;
1000        let last = session.messages.last()?;
1001        if !matches!(last.role, MessageRole::User) {
1002            return None;
1003        }
1004        let age_ms = (Utc::now() - last.created_at).num_milliseconds().max(0) as u64;
1005        if age_ms > 10_000 {
1006            return None;
1007        }
1008        let last_text = last
1009            .parts
1010            .iter()
1011            .filter_map(|part| match part {
1012                MessagePart::Text { text } => Some(text.clone()),
1013                _ => None,
1014            })
1015            .collect::<Vec<_>>()
1016            .join("\n");
1017        if last_text == text {
1018            return Some(last.id.clone());
1019        }
1020        None
1021    }
1022
1023    async fn auto_rename_session_from_user_text(&self, session_id: &str, fallback_text: &str) {
1024        let Some(mut session) = self.storage.get_session(session_id).await else {
1025            return;
1026        };
1027        if !title_needs_repair(&session.title) {
1028            return;
1029        }
1030
1031        let first_user_text = session.messages.iter().find_map(|message| {
1032            if !matches!(message.role, MessageRole::User) {
1033                return None;
1034            }
1035            message.parts.iter().find_map(|part| match part {
1036                MessagePart::Text { text } if !text.trim().is_empty() => Some(text.clone()),
1037                _ => None,
1038            })
1039        });
1040
1041        let source = first_user_text.unwrap_or_else(|| fallback_text.to_string());
1042        let Some(title) = derive_session_title_from_prompt(&source, 60) else {
1043            return;
1044        };
1045
1046        session.title = title;
1047        session.time.updated = Utc::now();
1048        let _ = self.storage.save_session(session).await;
1049    }
1050
1051    async fn workspace_sandbox_violation(
1052        &self,
1053        session_id: &str,
1054        tool: &str,
1055        args: &Value,
1056    ) -> Option<String> {
1057        if self.workspace_override_active(session_id).await {
1058            return None;
1059        }
1060        let session = self.storage.get_session(session_id).await?;
1061        let workspace = session
1062            .workspace_root
1063            .or_else(|| crate::normalize_workspace_path(&session.directory))?;
1064        let workspace_path = PathBuf::from(&workspace);
1065        let candidate_paths = extract_tool_candidate_paths(tool, args);
1066        if candidate_paths.is_empty() {
1067            return None;
1068        }
1069        let outside = candidate_paths
1070            .iter()
1071            .find(|path| !crate::is_within_workspace_root(Path::new(path), &workspace_path))?;
1072        Some(format!(
1073            "Sandbox blocked `{tool}` path `{outside}` (workspace root: `{workspace}`)"
1074        ))
1075    }
1076
1077    async fn workspace_override_active(&self, session_id: &str) -> bool {
1078        let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
1079        let mut overrides = self.workspace_overrides.write().await;
1080        overrides.retain(|_, expires_at| *expires_at > now);
1081        overrides
1082            .get(session_id)
1083            .map(|expires_at| *expires_at > now)
1084            .unwrap_or(false)
1085    }
1086
1087    async fn generate_final_narrative_without_tools(
1088        &self,
1089        session_id: &str,
1090        active_agent: &AgentDefinition,
1091        provider_hint: Option<&str>,
1092        cancel: CancellationToken,
1093        tool_outputs: &[String],
1094    ) -> Option<String> {
1095        if cancel.is_cancelled() {
1096            return None;
1097        }
1098        let mut messages = load_chat_history(self.storage.clone(), session_id).await;
1099        let mut system_parts = vec![tandem_runtime_system_prompt().to_string()];
1100        if let Some(system) = active_agent.system_prompt.as_ref() {
1101            system_parts.push(system.clone());
1102        }
1103        messages.insert(
1104            0,
1105            ChatMessage {
1106                role: "system".to_string(),
1107                content: system_parts.join("\n\n"),
1108            },
1109        );
1110        messages.push(ChatMessage {
1111            role: "user".to_string(),
1112            content: format!(
1113                "Tool observations:\n{}\n\nProvide a direct final answer now. Do not call tools.",
1114                summarize_tool_outputs(tool_outputs)
1115            ),
1116        });
1117        let stream = self
1118            .providers
1119            .stream_for_provider(provider_hint, messages, None, cancel.clone())
1120            .await
1121            .ok()?;
1122        tokio::pin!(stream);
1123        let mut completion = String::new();
1124        while let Some(chunk) = stream.next().await {
1125            if cancel.is_cancelled() {
1126                return None;
1127            }
1128            match chunk {
1129                Ok(StreamChunk::TextDelta(delta)) => completion.push_str(&delta),
1130                Ok(StreamChunk::Done { .. }) => break,
1131                Ok(_) => {}
1132                Err(_) => return None,
1133            }
1134        }
1135        let completion = truncate_text(&completion, 16_000);
1136        if completion.trim().is_empty() {
1137            None
1138        } else {
1139            Some(completion)
1140        }
1141    }
1142}
1143
1144fn truncate_text(input: &str, max_len: usize) -> String {
1145    if input.len() <= max_len {
1146        return input.to_string();
1147    }
1148    let mut out = input[..max_len].to_string();
1149    out.push_str("...<truncated>");
1150    out
1151}
1152
1153fn provider_error_code(error_text: &str) -> &'static str {
1154    let lower = error_text.to_lowercase();
1155    if lower.contains("invalid_function_parameters")
1156        || lower.contains("array schema missing items")
1157        || lower.contains("tool schema")
1158    {
1159        return "TOOL_SCHEMA_INVALID";
1160    }
1161    if lower.contains("rate limit") || lower.contains("too many requests") || lower.contains("429")
1162    {
1163        return "RATE_LIMIT_EXCEEDED";
1164    }
1165    if lower.contains("context length")
1166        || lower.contains("max tokens")
1167        || lower.contains("token limit")
1168    {
1169        return "CONTEXT_LENGTH_EXCEEDED";
1170    }
1171    if lower.contains("unauthorized")
1172        || lower.contains("authentication")
1173        || lower.contains("401")
1174        || lower.contains("403")
1175    {
1176        return "AUTHENTICATION_ERROR";
1177    }
1178    if lower.contains("timeout") || lower.contains("timed out") {
1179        return "TIMEOUT";
1180    }
1181    if lower.contains("server error")
1182        || lower.contains("500")
1183        || lower.contains("502")
1184        || lower.contains("503")
1185        || lower.contains("504")
1186    {
1187        return "PROVIDER_SERVER_ERROR";
1188    }
1189    "PROVIDER_REQUEST_FAILED"
1190}
1191
1192fn normalize_tool_name(name: &str) -> String {
1193    match name.trim().to_lowercase().replace('-', "_").as_str() {
1194        "todowrite" | "update_todo_list" | "update_todos" => "todo_write".to_string(),
1195        other => other.to_string(),
1196    }
1197}
1198
1199fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1200    let Some(obj) = args.as_object() else {
1201        return Vec::new();
1202    };
1203    let keys: &[&str] = match tool {
1204        "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1205        "glob" => &["pattern"],
1206        "lsp" => &["filePath", "path"],
1207        "bash" => &["cwd"],
1208        "apply_patch" => &[],
1209        _ => &["path", "cwd"],
1210    };
1211    keys.iter()
1212        .filter_map(|key| obj.get(*key))
1213        .filter_map(|value| value.as_str())
1214        .filter(|s| !s.trim().is_empty())
1215        .map(ToString::to_string)
1216        .collect()
1217}
1218
1219fn agent_can_use_tool(agent: &AgentDefinition, tool_name: &str) -> bool {
1220    let target = normalize_tool_name(tool_name);
1221    match agent.tools.as_ref() {
1222        None => true,
1223        Some(list) => list.iter().any(|t| normalize_tool_name(t) == target),
1224    }
1225}
1226
1227fn enforce_skill_scope(
1228    tool_name: &str,
1229    args: Value,
1230    equipped_skills: Option<&[String]>,
1231) -> Result<Value, String> {
1232    if normalize_tool_name(tool_name) != "skill" {
1233        return Ok(args);
1234    }
1235    let Some(configured) = equipped_skills else {
1236        return Ok(args);
1237    };
1238
1239    let mut allowed = configured
1240        .iter()
1241        .map(|s| s.trim().to_string())
1242        .filter(|s| !s.is_empty())
1243        .collect::<Vec<_>>();
1244    if allowed
1245        .iter()
1246        .any(|s| s == "*" || s.eq_ignore_ascii_case("all"))
1247    {
1248        return Ok(args);
1249    }
1250    allowed.sort();
1251    allowed.dedup();
1252    if allowed.is_empty() {
1253        return Err("No skills are equipped for this agent.".to_string());
1254    }
1255
1256    let requested = args
1257        .get("name")
1258        .and_then(|v| v.as_str())
1259        .map(|v| v.trim().to_string())
1260        .unwrap_or_default();
1261    if !requested.is_empty() && !allowed.iter().any(|s| s == &requested) {
1262        return Err(format!(
1263            "Skill '{}' is not equipped for this agent. Equipped skills: {}",
1264            requested,
1265            allowed.join(", ")
1266        ));
1267    }
1268
1269    let mut out = if let Some(obj) = args.as_object() {
1270        Value::Object(obj.clone())
1271    } else {
1272        json!({})
1273    };
1274    if let Some(obj) = out.as_object_mut() {
1275        obj.insert("allowed_skills".to_string(), json!(allowed));
1276    }
1277    Ok(out)
1278}
1279
1280fn is_read_only_tool(tool_name: &str) -> bool {
1281    matches!(
1282        normalize_tool_name(tool_name).as_str(),
1283        "glob"
1284            | "read"
1285            | "grep"
1286            | "search"
1287            | "codesearch"
1288            | "list"
1289            | "ls"
1290            | "lsp"
1291            | "websearch"
1292            | "webfetch_document"
1293    )
1294}
1295
1296fn tool_budget_for(tool_name: &str) -> usize {
1297    match normalize_tool_name(tool_name).as_str() {
1298        "glob" => 4,
1299        "read" => 8,
1300        "websearch" => 3,
1301        "grep" | "search" | "codesearch" => 6,
1302        _ => 10,
1303    }
1304}
1305
1306#[derive(Debug, Clone)]
1307struct NormalizedToolArgs {
1308    args: Value,
1309    args_source: String,
1310    args_integrity: String,
1311    query: Option<String>,
1312    missing_terminal: bool,
1313}
1314
1315fn normalize_tool_args(
1316    tool_name: &str,
1317    raw_args: Value,
1318    latest_user_text: &str,
1319    latest_assistant_context: &str,
1320) -> NormalizedToolArgs {
1321    let normalized_tool = normalize_tool_name(tool_name);
1322    let mut args = raw_args;
1323    let mut args_source = if args.is_string() {
1324        "provider_string".to_string()
1325    } else {
1326        "provider_json".to_string()
1327    };
1328    let mut args_integrity = "ok".to_string();
1329    let mut query = None;
1330    let mut missing_terminal = false;
1331
1332    if normalized_tool == "websearch" {
1333        if let Some(found) = extract_websearch_query(&args) {
1334            query = Some(found);
1335            args = set_websearch_query_and_source(args, query.clone(), "tool_args");
1336        } else if let Some(inferred) = infer_websearch_query_from_text(latest_user_text) {
1337            args_source = "inferred_from_user".to_string();
1338            args_integrity = "recovered".to_string();
1339            query = Some(inferred);
1340            args = set_websearch_query_and_source(args, query.clone(), "inferred_from_user");
1341        } else if let Some(recovered) = infer_websearch_query_from_text(latest_assistant_context) {
1342            args_source = "recovered_from_context".to_string();
1343            args_integrity = "recovered".to_string();
1344            query = Some(recovered);
1345            args = set_websearch_query_and_source(args, query.clone(), "recovered_from_context");
1346        } else {
1347            args_source = "missing".to_string();
1348            args_integrity = "empty".to_string();
1349            missing_terminal = true;
1350        }
1351    }
1352
1353    NormalizedToolArgs {
1354        args,
1355        args_source,
1356        args_integrity,
1357        query,
1358        missing_terminal,
1359    }
1360}
1361
1362fn set_websearch_query_and_source(args: Value, query: Option<String>, query_source: &str) -> Value {
1363    let mut obj = args.as_object().cloned().unwrap_or_default();
1364    if let Some(q) = query {
1365        obj.insert("query".to_string(), Value::String(q));
1366    }
1367    obj.insert(
1368        "__query_source".to_string(),
1369        Value::String(query_source.to_string()),
1370    );
1371    Value::Object(obj)
1372}
1373
1374fn extract_websearch_query(args: &Value) -> Option<String> {
1375    const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
1376    for key in QUERY_KEYS {
1377        if let Some(value) = args.get(key).and_then(|v| v.as_str()) {
1378            let trimmed = value.trim();
1379            if !trimmed.is_empty() {
1380                return Some(trimmed.to_string());
1381            }
1382        }
1383    }
1384    for container in ["arguments", "args", "input", "params"] {
1385        if let Some(obj) = args.get(container) {
1386            for key in QUERY_KEYS {
1387                if let Some(value) = obj.get(key).and_then(|v| v.as_str()) {
1388                    let trimmed = value.trim();
1389                    if !trimmed.is_empty() {
1390                        return Some(trimmed.to_string());
1391                    }
1392                }
1393            }
1394        }
1395    }
1396    args.as_str()
1397        .map(str::trim)
1398        .filter(|s| !s.is_empty())
1399        .map(ToString::to_string)
1400}
1401
1402fn infer_websearch_query_from_text(text: &str) -> Option<String> {
1403    let trimmed = text.trim();
1404    if trimmed.is_empty() {
1405        return None;
1406    }
1407
1408    let lower = trimmed.to_lowercase();
1409    const PREFIXES: [&str; 11] = [
1410        "web search",
1411        "websearch",
1412        "search web for",
1413        "search web",
1414        "search for",
1415        "search",
1416        "look up",
1417        "lookup",
1418        "find",
1419        "web lookup",
1420        "query",
1421    ];
1422
1423    let mut candidate = trimmed;
1424    for prefix in PREFIXES {
1425        if lower.starts_with(prefix) && lower.len() >= prefix.len() {
1426            let remainder = trimmed[prefix.len()..]
1427                .trim_start_matches(|c: char| c.is_whitespace() || c == ':' || c == '-');
1428            candidate = remainder;
1429            break;
1430        }
1431    }
1432
1433    let normalized = candidate
1434        .trim()
1435        .trim_matches(|c: char| c == '"' || c == '\'' || c.is_whitespace())
1436        .trim_matches(|c: char| matches!(c, '.' | ',' | '!' | '?'))
1437        .trim()
1438        .to_string();
1439
1440    if normalized.split_whitespace().count() < 2 {
1441        return None;
1442    }
1443    Some(normalized)
1444}
1445
1446fn tool_signature(tool_name: &str, args: &Value) -> String {
1447    let normalized = normalize_tool_name(tool_name);
1448    if normalized == "websearch" {
1449        let query = extract_websearch_query(args)
1450            .unwrap_or_default()
1451            .to_lowercase();
1452        let limit = args
1453            .get("limit")
1454            .or_else(|| args.get("numResults"))
1455            .or_else(|| args.get("num_results"))
1456            .and_then(|v| v.as_u64())
1457            .unwrap_or(8);
1458        let domains = args
1459            .get("domains")
1460            .or_else(|| args.get("domain"))
1461            .map(|v| v.to_string())
1462            .unwrap_or_default();
1463        let recency = args.get("recency").and_then(|v| v.as_u64()).unwrap_or(0);
1464        return format!("websearch:q={query}|limit={limit}|domains={domains}|recency={recency}");
1465    }
1466    format!("{}:{}", normalized, args)
1467}
1468
1469fn stable_hash(input: &str) -> String {
1470    let mut hasher = DefaultHasher::new();
1471    input.hash(&mut hasher);
1472    format!("{:016x}", hasher.finish())
1473}
1474
1475fn summarize_tool_outputs(outputs: &[String]) -> String {
1476    outputs
1477        .iter()
1478        .take(6)
1479        .map(|output| truncate_text(output, 600))
1480        .collect::<Vec<_>>()
1481        .join("\n\n")
1482}
1483
1484fn tandem_runtime_system_prompt() -> &'static str {
1485    "You are operating inside Tandem (Desktop/TUI) as an engine-backed coding assistant.
1486Use tool calls to inspect and modify the workspace when needed instead of asking the user
1487to manually run basic discovery steps. Permission prompts may occur for some tools; if
1488a tool is denied or blocked, explain what was blocked and suggest a concrete next step."
1489}
1490
1491fn should_force_workspace_probe(user_text: &str, completion: &str) -> bool {
1492    let user = user_text.to_lowercase();
1493    let reply = completion.to_lowercase();
1494
1495    let asked_for_project_context = [
1496        "what is this project",
1497        "what's this project",
1498        "explain this project",
1499        "analyze this project",
1500        "inspect this project",
1501        "look at the project",
1502        "use glob",
1503        "run glob",
1504    ]
1505    .iter()
1506    .any(|needle| user.contains(needle));
1507
1508    if !asked_for_project_context {
1509        return false;
1510    }
1511
1512    let assistant_claimed_no_access = [
1513        "can't inspect",
1514        "cannot inspect",
1515        "don't have visibility",
1516        "haven't been able to inspect",
1517        "i don't know what this project is",
1518        "need your help to",
1519        "sandbox",
1520        "system restriction",
1521    ]
1522    .iter()
1523    .any(|needle| reply.contains(needle));
1524
1525    // If the user is explicitly asking for project inspection and the model replies with
1526    // a no-access narrative instead of making a tool call, force a minimal read-only probe.
1527    asked_for_project_context && assistant_claimed_no_access
1528}
1529
1530fn parse_tool_invocation(input: &str) -> Option<(String, serde_json::Value)> {
1531    let raw = input.trim();
1532    if !raw.starts_with("/tool ") {
1533        return None;
1534    }
1535    let rest = raw.trim_start_matches("/tool ").trim();
1536    let mut split = rest.splitn(2, ' ');
1537    let tool = normalize_tool_name(split.next()?.trim());
1538    let args = split
1539        .next()
1540        .and_then(|v| serde_json::from_str::<serde_json::Value>(v).ok())
1541        .unwrap_or_else(|| json!({}));
1542    Some((tool, args))
1543}
1544
1545fn parse_tool_invocations_from_response(input: &str) -> Vec<(String, serde_json::Value)> {
1546    let trimmed = input.trim();
1547    if trimmed.is_empty() {
1548        return Vec::new();
1549    }
1550
1551    if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
1552        if let Some(found) = extract_tool_call_from_value(&parsed) {
1553            return vec![found];
1554        }
1555    }
1556
1557    if let Some(block) = extract_first_json_object(trimmed) {
1558        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&block) {
1559            if let Some(found) = extract_tool_call_from_value(&parsed) {
1560                return vec![found];
1561            }
1562        }
1563    }
1564
1565    parse_function_style_tool_calls(trimmed)
1566}
1567
1568#[cfg(test)]
1569fn parse_tool_invocation_from_response(input: &str) -> Option<(String, serde_json::Value)> {
1570    parse_tool_invocations_from_response(input)
1571        .into_iter()
1572        .next()
1573}
1574
1575fn parse_function_style_tool_calls(input: &str) -> Vec<(String, Value)> {
1576    let mut calls = Vec::new();
1577    let lower = input.to_lowercase();
1578    let names = [
1579        "todo_write",
1580        "todowrite",
1581        "update_todo_list",
1582        "update_todos",
1583    ];
1584    let mut cursor = 0usize;
1585
1586    while cursor < lower.len() {
1587        let mut best: Option<(usize, &str)> = None;
1588        for name in names {
1589            let needle = format!("{name}(");
1590            if let Some(rel_idx) = lower[cursor..].find(&needle) {
1591                let idx = cursor + rel_idx;
1592                if best.as_ref().is_none_or(|(best_idx, _)| idx < *best_idx) {
1593                    best = Some((idx, name));
1594                }
1595            }
1596        }
1597
1598        let Some((tool_start, tool_name)) = best else {
1599            break;
1600        };
1601
1602        let open_paren = tool_start + tool_name.len();
1603        if let Some(close_paren) = find_matching_paren(input, open_paren) {
1604            if let Some(args_text) = input.get(open_paren + 1..close_paren) {
1605                let args = parse_function_style_args(args_text.trim());
1606                calls.push((normalize_tool_name(tool_name), Value::Object(args)));
1607            }
1608            cursor = close_paren.saturating_add(1);
1609        } else {
1610            cursor = tool_start.saturating_add(tool_name.len());
1611        }
1612    }
1613
1614    calls
1615}
1616
1617fn find_matching_paren(input: &str, open_paren: usize) -> Option<usize> {
1618    if input.as_bytes().get(open_paren).copied()? != b'(' {
1619        return None;
1620    }
1621
1622    let mut depth = 0usize;
1623    let mut in_single = false;
1624    let mut in_double = false;
1625    let mut escaped = false;
1626
1627    for (offset, ch) in input.get(open_paren..)?.char_indices() {
1628        if escaped {
1629            escaped = false;
1630            continue;
1631        }
1632        if ch == '\\' && (in_single || in_double) {
1633            escaped = true;
1634            continue;
1635        }
1636        if ch == '\'' && !in_double {
1637            in_single = !in_single;
1638            continue;
1639        }
1640        if ch == '"' && !in_single {
1641            in_double = !in_double;
1642            continue;
1643        }
1644        if in_single || in_double {
1645            continue;
1646        }
1647
1648        match ch {
1649            '(' => depth += 1,
1650            ')' => {
1651                depth = depth.saturating_sub(1);
1652                if depth == 0 {
1653                    return Some(open_paren + offset);
1654                }
1655            }
1656            _ => {}
1657        }
1658    }
1659
1660    None
1661}
1662
1663fn parse_function_style_args(input: &str) -> Map<String, Value> {
1664    let mut args = Map::new();
1665    if input.trim().is_empty() {
1666        return args;
1667    }
1668
1669    let mut parts = Vec::<String>::new();
1670    let mut current = String::new();
1671    let mut in_single = false;
1672    let mut in_double = false;
1673    let mut escaped = false;
1674    let mut depth_paren = 0usize;
1675    let mut depth_bracket = 0usize;
1676    let mut depth_brace = 0usize;
1677
1678    for ch in input.chars() {
1679        if escaped {
1680            current.push(ch);
1681            escaped = false;
1682            continue;
1683        }
1684        if ch == '\\' && (in_single || in_double) {
1685            current.push(ch);
1686            escaped = true;
1687            continue;
1688        }
1689        if ch == '\'' && !in_double {
1690            in_single = !in_single;
1691            current.push(ch);
1692            continue;
1693        }
1694        if ch == '"' && !in_single {
1695            in_double = !in_double;
1696            current.push(ch);
1697            continue;
1698        }
1699        if in_single || in_double {
1700            current.push(ch);
1701            continue;
1702        }
1703
1704        match ch {
1705            '(' => depth_paren += 1,
1706            ')' => depth_paren = depth_paren.saturating_sub(1),
1707            '[' => depth_bracket += 1,
1708            ']' => depth_bracket = depth_bracket.saturating_sub(1),
1709            '{' => depth_brace += 1,
1710            '}' => depth_brace = depth_brace.saturating_sub(1),
1711            ',' if depth_paren == 0 && depth_bracket == 0 && depth_brace == 0 => {
1712                let part = current.trim();
1713                if !part.is_empty() {
1714                    parts.push(part.to_string());
1715                }
1716                current.clear();
1717                continue;
1718            }
1719            _ => {}
1720        }
1721        current.push(ch);
1722    }
1723    let tail = current.trim();
1724    if !tail.is_empty() {
1725        parts.push(tail.to_string());
1726    }
1727
1728    for part in parts {
1729        let Some((raw_key, raw_value)) = part
1730            .split_once('=')
1731            .or_else(|| part.split_once(':'))
1732            .map(|(k, v)| (k.trim(), v.trim()))
1733        else {
1734            continue;
1735        };
1736        let key = raw_key.trim_matches(|c| c == '"' || c == '\'' || c == '`');
1737        if key.is_empty() {
1738            continue;
1739        }
1740        let value = parse_scalar_like_value(raw_value);
1741        args.insert(key.to_string(), value);
1742    }
1743
1744    args
1745}
1746
1747fn parse_scalar_like_value(raw: &str) -> Value {
1748    let trimmed = raw.trim();
1749    if trimmed.is_empty() {
1750        return Value::Null;
1751    }
1752
1753    if (trimmed.starts_with('"') && trimmed.ends_with('"'))
1754        || (trimmed.starts_with('\'') && trimmed.ends_with('\''))
1755    {
1756        return Value::String(trimmed[1..trimmed.len().saturating_sub(1)].to_string());
1757    }
1758
1759    if trimmed.eq_ignore_ascii_case("true") {
1760        return Value::Bool(true);
1761    }
1762    if trimmed.eq_ignore_ascii_case("false") {
1763        return Value::Bool(false);
1764    }
1765    if trimmed.eq_ignore_ascii_case("null") {
1766        return Value::Null;
1767    }
1768
1769    if let Ok(v) = serde_json::from_str::<Value>(trimmed) {
1770        return v;
1771    }
1772    if let Ok(v) = trimmed.parse::<i64>() {
1773        return Value::Number(Number::from(v));
1774    }
1775    if let Ok(v) = trimmed.parse::<f64>() {
1776        if let Some(n) = Number::from_f64(v) {
1777            return Value::Number(n);
1778        }
1779    }
1780
1781    Value::String(trimmed.to_string())
1782}
1783
1784fn normalize_todo_write_args(args: Value, completion: &str) -> Value {
1785    if is_todo_status_update_args(&args) {
1786        return args;
1787    }
1788
1789    let mut obj = match args {
1790        Value::Object(map) => map,
1791        Value::Array(items) => {
1792            return json!({ "todos": normalize_todo_arg_items(items) });
1793        }
1794        Value::String(text) => {
1795            let derived = extract_todo_candidates_from_text(&text);
1796            if !derived.is_empty() {
1797                return json!({ "todos": derived });
1798            }
1799            return json!({});
1800        }
1801        _ => return json!({}),
1802    };
1803
1804    if obj
1805        .get("todos")
1806        .and_then(|v| v.as_array())
1807        .map(|arr| !arr.is_empty())
1808        .unwrap_or(false)
1809    {
1810        return Value::Object(obj);
1811    }
1812
1813    for alias in ["tasks", "items", "list", "checklist"] {
1814        if let Some(items) = obj.get(alias).and_then(|v| v.as_array()) {
1815            let normalized = normalize_todo_arg_items(items.clone());
1816            if !normalized.is_empty() {
1817                obj.insert("todos".to_string(), Value::Array(normalized));
1818                return Value::Object(obj);
1819            }
1820        }
1821    }
1822
1823    let derived = extract_todo_candidates_from_text(completion);
1824    if !derived.is_empty() {
1825        obj.insert("todos".to_string(), Value::Array(derived));
1826    }
1827    Value::Object(obj)
1828}
1829
1830fn normalize_todo_arg_items(items: Vec<Value>) -> Vec<Value> {
1831    items
1832        .into_iter()
1833        .filter_map(|item| match item {
1834            Value::String(text) => {
1835                let content = text.trim();
1836                if content.is_empty() {
1837                    None
1838                } else {
1839                    Some(json!({"content": content}))
1840                }
1841            }
1842            Value::Object(mut obj) => {
1843                if !obj.contains_key("content") {
1844                    if let Some(text) = obj.get("text").cloned() {
1845                        obj.insert("content".to_string(), text);
1846                    } else if let Some(title) = obj.get("title").cloned() {
1847                        obj.insert("content".to_string(), title);
1848                    } else if let Some(name) = obj.get("name").cloned() {
1849                        obj.insert("content".to_string(), name);
1850                    }
1851                }
1852                let content = obj
1853                    .get("content")
1854                    .and_then(|v| v.as_str())
1855                    .map(str::trim)
1856                    .unwrap_or("");
1857                if content.is_empty() {
1858                    None
1859                } else {
1860                    Some(Value::Object(obj))
1861                }
1862            }
1863            _ => None,
1864        })
1865        .collect()
1866}
1867
1868fn is_todo_status_update_args(args: &Value) -> bool {
1869    let Some(obj) = args.as_object() else {
1870        return false;
1871    };
1872    let has_status = obj
1873        .get("status")
1874        .and_then(|v| v.as_str())
1875        .map(|s| !s.trim().is_empty())
1876        .unwrap_or(false);
1877    let has_target =
1878        obj.get("task_id").is_some() || obj.get("todo_id").is_some() || obj.get("id").is_some();
1879    has_status && has_target
1880}
1881
1882fn is_empty_todo_write_args(args: &Value) -> bool {
1883    if is_todo_status_update_args(args) {
1884        return false;
1885    }
1886    let Some(obj) = args.as_object() else {
1887        return true;
1888    };
1889    !obj.get("todos")
1890        .and_then(|v| v.as_array())
1891        .map(|arr| !arr.is_empty())
1892        .unwrap_or(false)
1893}
1894
1895fn parse_streamed_tool_args(tool_name: &str, raw_args: &str) -> Value {
1896    let trimmed = raw_args.trim();
1897    if trimmed.is_empty() {
1898        return json!({});
1899    }
1900
1901    if let Ok(parsed) = serde_json::from_str::<Value>(trimmed) {
1902        return normalize_streamed_tool_args(tool_name, parsed, trimmed);
1903    }
1904
1905    // Some providers emit non-JSON argument text (for example: raw query strings
1906    // or key=value fragments). Recover the common forms instead of dropping to {}.
1907    let kv_args = parse_function_style_args(trimmed);
1908    if !kv_args.is_empty() {
1909        return normalize_streamed_tool_args(tool_name, Value::Object(kv_args), trimmed);
1910    }
1911
1912    if normalize_tool_name(tool_name) == "websearch" {
1913        return json!({ "query": trimmed });
1914    }
1915
1916    json!({})
1917}
1918
1919fn normalize_streamed_tool_args(tool_name: &str, parsed: Value, raw: &str) -> Value {
1920    let normalized_tool = normalize_tool_name(tool_name);
1921    if normalized_tool != "websearch" {
1922        return parsed;
1923    }
1924
1925    match parsed {
1926        Value::Object(mut obj) => {
1927            if !has_websearch_query(&obj) && !raw.trim().is_empty() {
1928                obj.insert("query".to_string(), Value::String(raw.trim().to_string()));
1929            }
1930            Value::Object(obj)
1931        }
1932        Value::String(s) => {
1933            let q = s.trim();
1934            if q.is_empty() {
1935                json!({})
1936            } else {
1937                json!({ "query": q })
1938            }
1939        }
1940        other => other,
1941    }
1942}
1943
1944fn has_websearch_query(obj: &Map<String, Value>) -> bool {
1945    const QUERY_KEYS: [&str; 5] = ["query", "q", "search_query", "searchQuery", "keywords"];
1946    QUERY_KEYS.iter().any(|key| {
1947        obj.get(*key)
1948            .and_then(|v| v.as_str())
1949            .map(|s| !s.trim().is_empty())
1950            .unwrap_or(false)
1951    })
1952}
1953
1954fn extract_tool_call_from_value(value: &Value) -> Option<(String, Value)> {
1955    if let Some(obj) = value.as_object() {
1956        if let Some(tool) = obj.get("tool").and_then(|v| v.as_str()) {
1957            return Some((
1958                normalize_tool_name(tool),
1959                obj.get("args").cloned().unwrap_or_else(|| json!({})),
1960            ));
1961        }
1962
1963        if let Some(tool) = obj.get("name").and_then(|v| v.as_str()) {
1964            let args = obj
1965                .get("args")
1966                .cloned()
1967                .or_else(|| obj.get("arguments").cloned())
1968                .unwrap_or_else(|| json!({}));
1969            let normalized_tool = normalize_tool_name(tool);
1970            let args = if let Some(raw) = args.as_str() {
1971                parse_streamed_tool_args(&normalized_tool, raw)
1972            } else {
1973                args
1974            };
1975            return Some((normalized_tool, args));
1976        }
1977
1978        for key in [
1979            "tool_call",
1980            "toolCall",
1981            "call",
1982            "function_call",
1983            "functionCall",
1984        ] {
1985            if let Some(nested) = obj.get(key) {
1986                if let Some(found) = extract_tool_call_from_value(nested) {
1987                    return Some(found);
1988                }
1989            }
1990        }
1991    }
1992
1993    if let Some(items) = value.as_array() {
1994        for item in items {
1995            if let Some(found) = extract_tool_call_from_value(item) {
1996                return Some(found);
1997            }
1998        }
1999    }
2000
2001    None
2002}
2003
2004fn extract_first_json_object(input: &str) -> Option<String> {
2005    let mut start = None;
2006    let mut depth = 0usize;
2007    for (idx, ch) in input.char_indices() {
2008        if ch == '{' {
2009            if start.is_none() {
2010                start = Some(idx);
2011            }
2012            depth += 1;
2013        } else if ch == '}' {
2014            if depth == 0 {
2015                continue;
2016            }
2017            depth -= 1;
2018            if depth == 0 {
2019                let begin = start?;
2020                let block = input.get(begin..=idx)?;
2021                return Some(block.to_string());
2022            }
2023        }
2024    }
2025    None
2026}
2027
2028fn extract_todo_candidates_from_text(input: &str) -> Vec<Value> {
2029    let mut seen = HashSet::<String>::new();
2030    let mut todos = Vec::new();
2031
2032    for raw_line in input.lines() {
2033        let mut line = raw_line.trim();
2034        let mut structured_line = false;
2035        if line.is_empty() {
2036            continue;
2037        }
2038        if line.starts_with("```") {
2039            continue;
2040        }
2041        if line.ends_with(':') {
2042            continue;
2043        }
2044        if let Some(rest) = line
2045            .strip_prefix("- [ ]")
2046            .or_else(|| line.strip_prefix("* [ ]"))
2047            .or_else(|| line.strip_prefix("- [x]"))
2048            .or_else(|| line.strip_prefix("* [x]"))
2049        {
2050            line = rest.trim();
2051            structured_line = true;
2052        } else if let Some(rest) = line.strip_prefix("- ").or_else(|| line.strip_prefix("* ")) {
2053            line = rest.trim();
2054            structured_line = true;
2055        } else {
2056            let bytes = line.as_bytes();
2057            let mut i = 0usize;
2058            while i < bytes.len() && bytes[i].is_ascii_digit() {
2059                i += 1;
2060            }
2061            if i > 0 && i + 1 < bytes.len() && (bytes[i] == b'.' || bytes[i] == b')') {
2062                line = line[i + 1..].trim();
2063                structured_line = true;
2064            }
2065        }
2066        if !structured_line {
2067            continue;
2068        }
2069
2070        let content = line.trim_matches(|c: char| c.is_whitespace() || c == '-' || c == '*');
2071        if content.len() < 5 || content.len() > 180 {
2072            continue;
2073        }
2074        let key = content.to_lowercase();
2075        if seen.contains(&key) {
2076            continue;
2077        }
2078        seen.insert(key);
2079        todos.push(json!({ "content": content }));
2080        if todos.len() >= 25 {
2081            break;
2082        }
2083    }
2084
2085    todos
2086}
2087
2088async fn emit_plan_todo_fallback(
2089    storage: std::sync::Arc<Storage>,
2090    bus: &EventBus,
2091    session_id: &str,
2092    message_id: &str,
2093    completion: &str,
2094) {
2095    let todos = extract_todo_candidates_from_text(completion);
2096    if todos.is_empty() {
2097        return;
2098    }
2099
2100    let invoke_part = WireMessagePart::tool_invocation(
2101        session_id,
2102        message_id,
2103        "todo_write",
2104        json!({"todos": todos.clone()}),
2105    );
2106    let call_id = invoke_part.id.clone();
2107    bus.publish(EngineEvent::new(
2108        "message.part.updated",
2109        json!({"part": invoke_part}),
2110    ));
2111
2112    if storage.set_todos(session_id, todos).await.is_err() {
2113        let mut failed_part =
2114            WireMessagePart::tool_result(session_id, message_id, "todo_write", json!(null));
2115        failed_part.id = call_id;
2116        failed_part.state = Some("failed".to_string());
2117        failed_part.error = Some("failed to persist plan todos".to_string());
2118        bus.publish(EngineEvent::new(
2119            "message.part.updated",
2120            json!({"part": failed_part}),
2121        ));
2122        return;
2123    }
2124
2125    let normalized = storage.get_todos(session_id).await;
2126    let mut result_part = WireMessagePart::tool_result(
2127        session_id,
2128        message_id,
2129        "todo_write",
2130        json!({ "todos": normalized }),
2131    );
2132    result_part.id = call_id;
2133    bus.publish(EngineEvent::new(
2134        "message.part.updated",
2135        json!({"part": result_part}),
2136    ));
2137    bus.publish(EngineEvent::new(
2138        "todo.updated",
2139        json!({
2140            "sessionID": session_id,
2141            "todos": normalized
2142        }),
2143    ));
2144}
2145
2146async fn emit_plan_question_fallback(
2147    storage: std::sync::Arc<Storage>,
2148    bus: &EventBus,
2149    session_id: &str,
2150    message_id: &str,
2151    completion: &str,
2152) {
2153    let trimmed = completion.trim();
2154    if trimmed.is_empty() {
2155        return;
2156    }
2157
2158    let hints = extract_todo_candidates_from_text(trimmed)
2159        .into_iter()
2160        .take(6)
2161        .filter_map(|v| {
2162            v.get("content")
2163                .and_then(|c| c.as_str())
2164                .map(ToString::to_string)
2165        })
2166        .collect::<Vec<_>>();
2167
2168    let mut options = hints
2169        .iter()
2170        .map(|label| json!({"label": label, "description": "Use this as a starting task"}))
2171        .collect::<Vec<_>>();
2172    if options.is_empty() {
2173        options = vec![
2174            json!({"label":"Define scope", "description":"Clarify the intended outcome"}),
2175            json!({"label":"Provide constraints", "description":"Budget, timeline, and constraints"}),
2176            json!({"label":"Draft a starter list", "description":"Generate a first-pass task list"}),
2177        ];
2178    }
2179
2180    let question_payload = vec![json!({
2181        "header":"Planning Input",
2182        "question":"I couldn't produce a concrete task list yet. Which tasks should I include first?",
2183        "options": options,
2184        "multiple": true,
2185        "custom": true
2186    })];
2187
2188    let request = storage
2189        .add_question_request(session_id, message_id, question_payload.clone())
2190        .await
2191        .ok();
2192    bus.publish(EngineEvent::new(
2193        "question.asked",
2194        json!({
2195            "id": request
2196                .as_ref()
2197                .map(|req| req.id.clone())
2198                .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
2199            "sessionID": session_id,
2200            "messageID": message_id,
2201            "questions": question_payload,
2202            "tool": request.and_then(|req| {
2203                req.tool.map(|tool| {
2204                    json!({
2205                        "callID": tool.call_id,
2206                        "messageID": tool.message_id
2207                    })
2208                })
2209            })
2210        }),
2211    ));
2212}
2213
2214async fn load_chat_history(storage: std::sync::Arc<Storage>, session_id: &str) -> Vec<ChatMessage> {
2215    let Some(session) = storage.get_session(session_id).await else {
2216        return Vec::new();
2217    };
2218    let messages = session
2219        .messages
2220        .into_iter()
2221        .map(|m| {
2222            let role = format!("{:?}", m.role).to_lowercase();
2223            let content = m
2224                .parts
2225                .into_iter()
2226                .map(|part| match part {
2227                    MessagePart::Text { text } => text,
2228                    MessagePart::Reasoning { text } => text,
2229                    MessagePart::ToolInvocation { tool, result, .. } => {
2230                        format!("Tool {tool} => {}", result.unwrap_or_else(|| json!({})))
2231                    }
2232                })
2233                .collect::<Vec<_>>()
2234                .join("\n");
2235            ChatMessage { role, content }
2236        })
2237        .collect::<Vec<_>>();
2238    compact_chat_history(messages)
2239}
2240
2241async fn emit_tool_side_events(
2242    storage: std::sync::Arc<Storage>,
2243    bus: &EventBus,
2244    session_id: &str,
2245    message_id: &str,
2246    tool: &str,
2247    args: &serde_json::Value,
2248    metadata: &serde_json::Value,
2249) {
2250    if tool == "todo_write" {
2251        let todos_from_metadata = metadata
2252            .get("todos")
2253            .and_then(|v| v.as_array())
2254            .cloned()
2255            .unwrap_or_default();
2256
2257        if !todos_from_metadata.is_empty() {
2258            let _ = storage.set_todos(session_id, todos_from_metadata).await;
2259        } else {
2260            let current = storage.get_todos(session_id).await;
2261            if let Some(updated) = apply_todo_updates_from_args(current, args) {
2262                let _ = storage.set_todos(session_id, updated).await;
2263            }
2264        }
2265
2266        let normalized = storage.get_todos(session_id).await;
2267        bus.publish(EngineEvent::new(
2268            "todo.updated",
2269            json!({
2270                "sessionID": session_id,
2271                "todos": normalized
2272            }),
2273        ));
2274    }
2275    if tool == "question" {
2276        let questions = metadata
2277            .get("questions")
2278            .and_then(|v| v.as_array())
2279            .cloned()
2280            .unwrap_or_default();
2281        let request = storage
2282            .add_question_request(session_id, message_id, questions.clone())
2283            .await
2284            .ok();
2285        bus.publish(EngineEvent::new(
2286            "question.asked",
2287            json!({
2288                "id": request
2289                    .as_ref()
2290                    .map(|req| req.id.clone())
2291                    .unwrap_or_else(|| format!("q-{}", uuid::Uuid::new_v4())),
2292                "sessionID": session_id,
2293                "messageID": message_id,
2294                "questions": questions,
2295                "tool": request.and_then(|req| {
2296                    req.tool.map(|tool| {
2297                        json!({
2298                            "callID": tool.call_id,
2299                            "messageID": tool.message_id
2300                        })
2301                    })
2302                })
2303            }),
2304        ));
2305    }
2306}
2307
2308fn apply_todo_updates_from_args(current: Vec<Value>, args: &Value) -> Option<Vec<Value>> {
2309    let obj = args.as_object()?;
2310    let mut todos = current;
2311    let mut changed = false;
2312
2313    if let Some(items) = obj.get("todos").and_then(|v| v.as_array()) {
2314        for item in items {
2315            let Some(item_obj) = item.as_object() else {
2316                continue;
2317            };
2318            let status = item_obj
2319                .get("status")
2320                .and_then(|v| v.as_str())
2321                .map(normalize_todo_status);
2322            let target = item_obj
2323                .get("task_id")
2324                .or_else(|| item_obj.get("todo_id"))
2325                .or_else(|| item_obj.get("id"));
2326
2327            if let (Some(status), Some(target)) = (status, target) {
2328                changed |= apply_single_todo_status_update(&mut todos, target, &status);
2329            }
2330        }
2331    }
2332
2333    let status = obj
2334        .get("status")
2335        .and_then(|v| v.as_str())
2336        .map(normalize_todo_status);
2337    let target = obj
2338        .get("task_id")
2339        .or_else(|| obj.get("todo_id"))
2340        .or_else(|| obj.get("id"));
2341    if let (Some(status), Some(target)) = (status, target) {
2342        changed |= apply_single_todo_status_update(&mut todos, target, &status);
2343    }
2344
2345    if changed {
2346        Some(todos)
2347    } else {
2348        None
2349    }
2350}
2351
2352fn apply_single_todo_status_update(todos: &mut [Value], target: &Value, status: &str) -> bool {
2353    let idx_from_value = match target {
2354        Value::Number(n) => n.as_u64().map(|v| v.saturating_sub(1) as usize),
2355        Value::String(s) => {
2356            let trimmed = s.trim();
2357            trimmed
2358                .parse::<usize>()
2359                .ok()
2360                .map(|v| v.saturating_sub(1))
2361                .or_else(|| {
2362                    let digits = trimmed
2363                        .chars()
2364                        .rev()
2365                        .take_while(|c| c.is_ascii_digit())
2366                        .collect::<String>()
2367                        .chars()
2368                        .rev()
2369                        .collect::<String>();
2370                    digits.parse::<usize>().ok().map(|v| v.saturating_sub(1))
2371                })
2372        }
2373        _ => None,
2374    };
2375
2376    if let Some(idx) = idx_from_value {
2377        if idx < todos.len() {
2378            if let Some(obj) = todos[idx].as_object_mut() {
2379                obj.insert("status".to_string(), Value::String(status.to_string()));
2380                return true;
2381            }
2382        }
2383    }
2384
2385    let id_target = target.as_str().map(|s| s.trim()).filter(|s| !s.is_empty());
2386    if let Some(id_target) = id_target {
2387        for todo in todos.iter_mut() {
2388            if let Some(obj) = todo.as_object_mut() {
2389                if obj.get("id").and_then(|v| v.as_str()) == Some(id_target) {
2390                    obj.insert("status".to_string(), Value::String(status.to_string()));
2391                    return true;
2392                }
2393            }
2394        }
2395    }
2396
2397    false
2398}
2399
2400fn normalize_todo_status(raw: &str) -> String {
2401    match raw.trim().to_lowercase().as_str() {
2402        "in_progress" | "inprogress" | "running" | "working" => "in_progress".to_string(),
2403        "done" | "complete" | "completed" => "completed".to_string(),
2404        "cancelled" | "canceled" | "aborted" | "skipped" => "cancelled".to_string(),
2405        "open" | "todo" | "pending" => "pending".to_string(),
2406        other => other.to_string(),
2407    }
2408}
2409
2410fn compact_chat_history(messages: Vec<ChatMessage>) -> Vec<ChatMessage> {
2411    const MAX_CONTEXT_CHARS: usize = 80_000;
2412    const KEEP_RECENT_MESSAGES: usize = 40;
2413
2414    if messages.len() <= KEEP_RECENT_MESSAGES {
2415        let total_chars = messages.iter().map(|m| m.content.len()).sum::<usize>();
2416        if total_chars <= MAX_CONTEXT_CHARS {
2417            return messages;
2418        }
2419    }
2420
2421    let mut kept = messages;
2422    let mut dropped_count = 0usize;
2423    let mut total_chars = kept.iter().map(|m| m.content.len()).sum::<usize>();
2424
2425    while kept.len() > KEEP_RECENT_MESSAGES || total_chars > MAX_CONTEXT_CHARS {
2426        if kept.is_empty() {
2427            break;
2428        }
2429        let removed = kept.remove(0);
2430        total_chars = total_chars.saturating_sub(removed.content.len());
2431        dropped_count += 1;
2432    }
2433
2434    if dropped_count > 0 {
2435        kept.insert(
2436            0,
2437            ChatMessage {
2438                role: "system".to_string(),
2439                content: format!(
2440                    "[history compacted: omitted {} older messages to fit context window]",
2441                    dropped_count
2442                ),
2443            },
2444        );
2445    }
2446    kept
2447}
2448
2449#[cfg(test)]
2450mod tests {
2451    use super::*;
2452    use crate::{EventBus, Storage};
2453    use uuid::Uuid;
2454
2455    #[tokio::test]
2456    async fn todo_updated_event_is_normalized() {
2457        let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
2458        let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
2459        let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
2460        let session_id = session.id.clone();
2461        storage.save_session(session).await.expect("save session");
2462
2463        let bus = EventBus::new();
2464        let mut rx = bus.subscribe();
2465        emit_tool_side_events(
2466            storage.clone(),
2467            &bus,
2468            &session_id,
2469            "m1",
2470            "todo_write",
2471            &json!({"todos":[{"content":"ship parity"}]}),
2472            &json!({"todos":[{"content":"ship parity"}]}),
2473        )
2474        .await;
2475
2476        let event = rx.recv().await.expect("event");
2477        assert_eq!(event.event_type, "todo.updated");
2478        let todos = event
2479            .properties
2480            .get("todos")
2481            .and_then(|v| v.as_array())
2482            .cloned()
2483            .unwrap_or_default();
2484        assert_eq!(todos.len(), 1);
2485        assert!(todos[0].get("id").and_then(|v| v.as_str()).is_some());
2486        assert_eq!(
2487            todos[0].get("content").and_then(|v| v.as_str()),
2488            Some("ship parity")
2489        );
2490        assert!(todos[0].get("status").and_then(|v| v.as_str()).is_some());
2491    }
2492
2493    #[tokio::test]
2494    async fn question_asked_event_contains_tool_reference() {
2495        let base = std::env::temp_dir().join(format!("engine-loop-test-{}", Uuid::new_v4()));
2496        let storage = std::sync::Arc::new(Storage::new(&base).await.expect("storage"));
2497        let session = tandem_types::Session::new(Some("s".to_string()), Some(".".to_string()));
2498        let session_id = session.id.clone();
2499        storage.save_session(session).await.expect("save session");
2500
2501        let bus = EventBus::new();
2502        let mut rx = bus.subscribe();
2503        emit_tool_side_events(
2504            storage,
2505            &bus,
2506            &session_id,
2507            "msg-1",
2508            "question",
2509            &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
2510            &json!({"questions":[{"header":"Topic","question":"Pick one","options":[{"label":"A","description":"d"}]}]}),
2511        )
2512        .await;
2513
2514        let event = rx.recv().await.expect("event");
2515        assert_eq!(event.event_type, "question.asked");
2516        assert_eq!(
2517            event
2518                .properties
2519                .get("sessionID")
2520                .and_then(|v| v.as_str())
2521                .unwrap_or(""),
2522            session_id
2523        );
2524        let tool = event
2525            .properties
2526            .get("tool")
2527            .cloned()
2528            .unwrap_or_else(|| json!({}));
2529        assert!(tool.get("callID").and_then(|v| v.as_str()).is_some());
2530        assert_eq!(
2531            tool.get("messageID").and_then(|v| v.as_str()),
2532            Some("msg-1")
2533        );
2534    }
2535
2536    #[test]
2537    fn compact_chat_history_keeps_recent_and_inserts_summary() {
2538        let mut messages = Vec::new();
2539        for i in 0..60 {
2540            messages.push(ChatMessage {
2541                role: "user".to_string(),
2542                content: format!("message-{i}"),
2543            });
2544        }
2545        let compacted = compact_chat_history(messages);
2546        assert!(compacted.len() <= 41);
2547        assert_eq!(compacted[0].role, "system");
2548        assert!(compacted[0].content.contains("history compacted"));
2549        assert!(compacted.iter().any(|m| m.content.contains("message-59")));
2550    }
2551
2552    #[test]
2553    fn extracts_todos_from_checklist_and_numbered_lines() {
2554        let input = r#"
2555Plan:
2556- [ ] Audit current implementation
2557- [ ] Add planner fallback
25581. Add regression test coverage
2559"#;
2560        let todos = extract_todo_candidates_from_text(input);
2561        assert_eq!(todos.len(), 3);
2562        assert_eq!(
2563            todos[0].get("content").and_then(|v| v.as_str()),
2564            Some("Audit current implementation")
2565        );
2566    }
2567
2568    #[test]
2569    fn does_not_extract_todos_from_plain_prose_lines() {
2570        let input = r#"
2571I need more information to proceed.
2572Can you tell me the event size and budget?
2573Once I have that, I can provide a detailed plan.
2574"#;
2575        let todos = extract_todo_candidates_from_text(input);
2576        assert!(todos.is_empty());
2577    }
2578
2579    #[test]
2580    fn parses_wrapped_tool_call_from_markdown_response() {
2581        let input = r#"
2582Here is the tool call:
2583```json
2584{"tool_call":{"name":"todo_write","arguments":{"todos":[{"content":"a"}]}}}
2585```
2586"#;
2587        let parsed = parse_tool_invocation_from_response(input).expect("tool call");
2588        assert_eq!(parsed.0, "todo_write");
2589        assert!(parsed.1.get("todos").is_some());
2590    }
2591
2592    #[test]
2593    fn parses_function_style_todowrite_call() {
2594        let input = r#"Status: Completed
2595Call: todowrite(task_id=2, status="completed")"#;
2596        let parsed = parse_tool_invocation_from_response(input).expect("function-style tool call");
2597        assert_eq!(parsed.0, "todo_write");
2598        assert_eq!(parsed.1.get("task_id").and_then(|v| v.as_i64()), Some(2));
2599        assert_eq!(
2600            parsed.1.get("status").and_then(|v| v.as_str()),
2601            Some("completed")
2602        );
2603    }
2604
2605    #[test]
2606    fn parses_multiple_function_style_todowrite_calls() {
2607        let input = r#"
2608Call: todowrite(task_id=2, status="completed")
2609Call: todowrite(task_id=3, status="in_progress")
2610"#;
2611        let parsed = parse_tool_invocations_from_response(input);
2612        assert_eq!(parsed.len(), 2);
2613        assert_eq!(parsed[0].0, "todo_write");
2614        assert_eq!(parsed[0].1.get("task_id").and_then(|v| v.as_i64()), Some(2));
2615        assert_eq!(
2616            parsed[0].1.get("status").and_then(|v| v.as_str()),
2617            Some("completed")
2618        );
2619        assert_eq!(parsed[1].1.get("task_id").and_then(|v| v.as_i64()), Some(3));
2620        assert_eq!(
2621            parsed[1].1.get("status").and_then(|v| v.as_str()),
2622            Some("in_progress")
2623        );
2624    }
2625
2626    #[test]
2627    fn applies_todo_status_update_from_task_id_args() {
2628        let current = vec![
2629            json!({"id":"todo-1","content":"a","status":"pending"}),
2630            json!({"id":"todo-2","content":"b","status":"pending"}),
2631            json!({"id":"todo-3","content":"c","status":"pending"}),
2632        ];
2633        let updated =
2634            apply_todo_updates_from_args(current, &json!({"task_id":2, "status":"completed"}))
2635                .expect("status update");
2636        assert_eq!(
2637            updated[1].get("status").and_then(|v| v.as_str()),
2638            Some("completed")
2639        );
2640    }
2641
2642    #[test]
2643    fn normalizes_todo_write_tasks_alias() {
2644        let normalized = normalize_todo_write_args(
2645            json!({"tasks":[{"title":"Book venue"},{"name":"Send invites"}]}),
2646            "",
2647        );
2648        let todos = normalized
2649            .get("todos")
2650            .and_then(|v| v.as_array())
2651            .cloned()
2652            .unwrap_or_default();
2653        assert_eq!(todos.len(), 2);
2654        assert_eq!(
2655            todos[0].get("content").and_then(|v| v.as_str()),
2656            Some("Book venue")
2657        );
2658        assert_eq!(
2659            todos[1].get("content").and_then(|v| v.as_str()),
2660            Some("Send invites")
2661        );
2662    }
2663
2664    #[test]
2665    fn normalizes_todo_write_from_completion_when_args_empty() {
2666        let completion = "Plan:\n1. Secure venue\n2. Create playlist\n3. Send invites";
2667        let normalized = normalize_todo_write_args(json!({}), completion);
2668        let todos = normalized
2669            .get("todos")
2670            .and_then(|v| v.as_array())
2671            .cloned()
2672            .unwrap_or_default();
2673        assert_eq!(todos.len(), 3);
2674        assert!(!is_empty_todo_write_args(&normalized));
2675    }
2676
2677    #[test]
2678    fn empty_todo_write_args_allows_status_updates() {
2679        let args = json!({"task_id": 2, "status":"completed"});
2680        assert!(!is_empty_todo_write_args(&args));
2681    }
2682
2683    #[test]
2684    fn streamed_websearch_args_fallback_to_query_string() {
2685        let parsed = parse_streamed_tool_args("websearch", "meaning of life");
2686        assert_eq!(
2687            parsed.get("query").and_then(|v| v.as_str()),
2688            Some("meaning of life")
2689        );
2690    }
2691
2692    #[test]
2693    fn streamed_websearch_stringified_json_args_are_unwrapped() {
2694        let parsed = parse_streamed_tool_args("websearch", r#""donkey gestation period""#);
2695        assert_eq!(
2696            parsed.get("query").and_then(|v| v.as_str()),
2697            Some("donkey gestation period")
2698        );
2699    }
2700
2701    #[test]
2702    fn normalize_tool_args_websearch_infers_from_user_text() {
2703        let normalized =
2704            normalize_tool_args("websearch", json!({}), "web search meaning of life", "");
2705        assert_eq!(
2706            normalized.args.get("query").and_then(|v| v.as_str()),
2707            Some("meaning of life")
2708        );
2709        assert_eq!(normalized.args_source, "inferred_from_user");
2710        assert_eq!(normalized.args_integrity, "recovered");
2711    }
2712
2713    #[test]
2714    fn normalize_tool_args_websearch_keeps_existing_query() {
2715        let normalized = normalize_tool_args(
2716            "websearch",
2717            json!({"query":"already set"}),
2718            "web search should not override",
2719            "",
2720        );
2721        assert_eq!(
2722            normalized.args.get("query").and_then(|v| v.as_str()),
2723            Some("already set")
2724        );
2725        assert_eq!(normalized.args_source, "provider_json");
2726        assert_eq!(normalized.args_integrity, "ok");
2727    }
2728
2729    #[test]
2730    fn normalize_tool_args_websearch_fails_when_unrecoverable() {
2731        let normalized = normalize_tool_args("websearch", json!({}), "search", "");
2732        assert!(normalized.query.is_none());
2733        assert!(normalized.missing_terminal);
2734        assert_eq!(normalized.args_source, "missing");
2735        assert_eq!(normalized.args_integrity, "empty");
2736    }
2737}