Skip to main content

tandem_server/
lib.rs

1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use futures::future::BoxFuture;
10use serde::{Deserialize, Serialize};
11use serde_json::{json, Value};
12use sha2::{Digest, Sha256};
13use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
14use tandem_orchestrator::MissionState;
15use tandem_types::{
16    EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
17    SendMessageRequest, Session, ShellFamily,
18};
19use tokio::fs;
20use tokio::sync::RwLock;
21
22use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
23use tandem_core::{
24    resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
25    PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
26};
27use tandem_memory::db::MemoryDatabase;
28use tandem_providers::ChatMessage;
29use tandem_providers::ProviderRegistry;
30use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
31use tandem_tools::ToolRegistry;
32
33mod agent_teams;
34mod http;
35pub mod webui;
36
37pub use agent_teams::AgentTeamRuntime;
38pub use http::serve;
39
40#[derive(Debug, Clone, Serialize, Deserialize, Default)]
41pub struct ChannelStatus {
42    pub enabled: bool,
43    pub connected: bool,
44    pub last_error: Option<String>,
45    pub active_sessions: u64,
46    pub meta: Value,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, Default)]
50pub struct WebUiConfig {
51    #[serde(default)]
52    pub enabled: bool,
53    #[serde(default = "default_web_ui_prefix")]
54    pub path_prefix: String,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, Default)]
58pub struct ChannelsConfigFile {
59    pub telegram: Option<TelegramConfigFile>,
60    pub discord: Option<DiscordConfigFile>,
61    pub slack: Option<SlackConfigFile>,
62    #[serde(default)]
63    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct TelegramConfigFile {
68    pub bot_token: String,
69    #[serde(default = "default_allow_all")]
70    pub allowed_users: Vec<String>,
71    #[serde(default)]
72    pub mention_only: bool,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct DiscordConfigFile {
77    pub bot_token: String,
78    #[serde(default)]
79    pub guild_id: Option<String>,
80    #[serde(default = "default_allow_all")]
81    pub allowed_users: Vec<String>,
82    #[serde(default = "default_discord_mention_only")]
83    pub mention_only: bool,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct SlackConfigFile {
88    pub bot_token: String,
89    pub channel_id: String,
90    #[serde(default = "default_allow_all")]
91    pub allowed_users: Vec<String>,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize, Default)]
95struct EffectiveAppConfig {
96    #[serde(default)]
97    pub channels: ChannelsConfigFile,
98    #[serde(default)]
99    pub web_ui: WebUiConfig,
100    #[serde(default)]
101    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
102}
103
104#[derive(Default)]
105pub struct ChannelRuntime {
106    pub listeners: Option<tokio::task::JoinSet<()>>,
107    pub statuses: std::collections::HashMap<String, ChannelStatus>,
108}
109
110#[derive(Debug, Clone)]
111pub struct EngineLease {
112    pub lease_id: String,
113    pub client_id: String,
114    pub client_type: String,
115    pub acquired_at_ms: u64,
116    pub last_renewed_at_ms: u64,
117    pub ttl_ms: u64,
118}
119
120impl EngineLease {
121    pub fn is_expired(&self, now_ms: u64) -> bool {
122        now_ms.saturating_sub(self.last_renewed_at_ms) > self.ttl_ms
123    }
124}
125
126#[derive(Debug, Clone, Serialize)]
127pub struct ActiveRun {
128    #[serde(rename = "runID")]
129    pub run_id: String,
130    #[serde(rename = "startedAtMs")]
131    pub started_at_ms: u64,
132    #[serde(rename = "lastActivityAtMs")]
133    pub last_activity_at_ms: u64,
134    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
135    pub client_id: Option<String>,
136    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
137    pub agent_id: Option<String>,
138    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
139    pub agent_profile: Option<String>,
140}
141
142#[derive(Clone, Default)]
143pub struct RunRegistry {
144    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
145}
146
147impl RunRegistry {
148    pub fn new() -> Self {
149        Self::default()
150    }
151
152    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
153        self.active.read().await.get(session_id).cloned()
154    }
155
156    pub async fn acquire(
157        &self,
158        session_id: &str,
159        run_id: String,
160        client_id: Option<String>,
161        agent_id: Option<String>,
162        agent_profile: Option<String>,
163    ) -> std::result::Result<ActiveRun, ActiveRun> {
164        let mut guard = self.active.write().await;
165        if let Some(existing) = guard.get(session_id).cloned() {
166            return Err(existing);
167        }
168        let now = now_ms();
169        let run = ActiveRun {
170            run_id,
171            started_at_ms: now,
172            last_activity_at_ms: now,
173            client_id,
174            agent_id,
175            agent_profile,
176        };
177        guard.insert(session_id.to_string(), run.clone());
178        Ok(run)
179    }
180
181    pub async fn touch(&self, session_id: &str, run_id: &str) {
182        let mut guard = self.active.write().await;
183        if let Some(run) = guard.get_mut(session_id) {
184            if run.run_id == run_id {
185                run.last_activity_at_ms = now_ms();
186            }
187        }
188    }
189
190    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
191        let mut guard = self.active.write().await;
192        if let Some(run) = guard.get(session_id) {
193            if run.run_id == run_id {
194                return guard.remove(session_id);
195            }
196        }
197        None
198    }
199
200    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
201        self.active.write().await.remove(session_id)
202    }
203
204    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
205        let now = now_ms();
206        let mut guard = self.active.write().await;
207        let stale_ids = guard
208            .iter()
209            .filter_map(|(session_id, run)| {
210                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
211                    Some(session_id.clone())
212                } else {
213                    None
214                }
215            })
216            .collect::<Vec<_>>();
217        let mut out = Vec::with_capacity(stale_ids.len());
218        for session_id in stale_ids {
219            if let Some(run) = guard.remove(&session_id) {
220                out.push((session_id, run));
221            }
222        }
223        out
224    }
225}
226
227pub fn now_ms() -> u64 {
228    SystemTime::now()
229        .duration_since(UNIX_EPOCH)
230        .map(|d| d.as_millis() as u64)
231        .unwrap_or(0)
232}
233
234pub fn build_id() -> String {
235    if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
236        let trimmed = explicit.trim();
237        if !trimmed.is_empty() {
238            return trimmed.to_string();
239        }
240    }
241    if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
242        let trimmed = git_sha.trim();
243        if !trimmed.is_empty() {
244            return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
245        }
246    }
247    env!("CARGO_PKG_VERSION").to_string()
248}
249
250pub fn detect_host_runtime_context() -> HostRuntimeContext {
251    let os = if cfg!(target_os = "windows") {
252        HostOs::Windows
253    } else if cfg!(target_os = "macos") {
254        HostOs::Macos
255    } else {
256        HostOs::Linux
257    };
258    let (shell_family, path_style) = match os {
259        HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
260        HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
261    };
262    HostRuntimeContext {
263        os,
264        arch: std::env::consts::ARCH.to_string(),
265        shell_family,
266        path_style,
267    }
268}
269
270pub fn binary_path_for_health() -> Option<String> {
271    #[cfg(debug_assertions)]
272    {
273        std::env::current_exe()
274            .ok()
275            .map(|p| p.to_string_lossy().to_string())
276    }
277    #[cfg(not(debug_assertions))]
278    {
279        None
280    }
281}
282
283#[derive(Clone)]
284pub struct RuntimeState {
285    pub storage: Arc<Storage>,
286    pub config: ConfigStore,
287    pub event_bus: EventBus,
288    pub providers: ProviderRegistry,
289    pub plugins: PluginRegistry,
290    pub agents: AgentRegistry,
291    pub tools: ToolRegistry,
292    pub permissions: PermissionManager,
293    pub mcp: McpRegistry,
294    pub pty: PtyManager,
295    pub lsp: LspManager,
296    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
297    pub logs: Arc<RwLock<Vec<Value>>>,
298    pub workspace_index: WorkspaceIndex,
299    pub cancellations: CancellationRegistry,
300    pub engine_loop: EngineLoop,
301    pub host_runtime_context: HostRuntimeContext,
302}
303
304#[derive(Debug, Clone)]
305pub struct GovernedMemoryRecord {
306    pub id: String,
307    pub run_id: String,
308    pub partition: MemoryPartition,
309    pub kind: MemoryContentKind,
310    pub content: String,
311    pub artifact_refs: Vec<String>,
312    pub classification: MemoryClassification,
313    pub metadata: Option<Value>,
314    pub source_memory_id: Option<String>,
315    pub created_at_ms: u64,
316}
317
318#[derive(Debug, Clone, Serialize)]
319pub struct MemoryAuditEvent {
320    pub audit_id: String,
321    pub action: String,
322    pub run_id: String,
323    pub memory_id: Option<String>,
324    pub source_memory_id: Option<String>,
325    pub to_tier: Option<GovernedMemoryTier>,
326    pub partition_key: String,
327    pub actor: String,
328    pub status: String,
329    #[serde(skip_serializing_if = "Option::is_none")]
330    pub detail: Option<String>,
331    pub created_at_ms: u64,
332}
333
334#[derive(Debug, Clone, Serialize, Deserialize)]
335pub struct SharedResourceRecord {
336    pub key: String,
337    pub value: Value,
338    pub rev: u64,
339    pub updated_at_ms: u64,
340    pub updated_by: String,
341    #[serde(skip_serializing_if = "Option::is_none")]
342    pub ttl_ms: Option<u64>,
343}
344
345#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
346#[serde(rename_all = "snake_case")]
347pub enum RoutineSchedule {
348    IntervalSeconds { seconds: u64 },
349    Cron { expression: String },
350}
351
352#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
353#[serde(rename_all = "snake_case", tag = "type")]
354pub enum RoutineMisfirePolicy {
355    Skip,
356    RunOnce,
357    CatchUp { max_runs: u32 },
358}
359
360#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
361#[serde(rename_all = "snake_case")]
362pub enum RoutineStatus {
363    Active,
364    Paused,
365}
366
367#[derive(Debug, Clone, Serialize, Deserialize)]
368pub struct RoutineSpec {
369    pub routine_id: String,
370    pub name: String,
371    pub status: RoutineStatus,
372    pub schedule: RoutineSchedule,
373    pub timezone: String,
374    pub misfire_policy: RoutineMisfirePolicy,
375    pub entrypoint: String,
376    #[serde(default)]
377    pub args: Value,
378    #[serde(default)]
379    pub allowed_tools: Vec<String>,
380    #[serde(default)]
381    pub output_targets: Vec<String>,
382    pub creator_type: String,
383    pub creator_id: String,
384    pub requires_approval: bool,
385    pub external_integrations_allowed: bool,
386    #[serde(default, skip_serializing_if = "Option::is_none")]
387    pub next_fire_at_ms: Option<u64>,
388    #[serde(default, skip_serializing_if = "Option::is_none")]
389    pub last_fired_at_ms: Option<u64>,
390}
391
392#[derive(Debug, Clone, Serialize, Deserialize)]
393pub struct RoutineHistoryEvent {
394    pub routine_id: String,
395    pub trigger_type: String,
396    pub run_count: u32,
397    pub fired_at_ms: u64,
398    pub status: String,
399    #[serde(default, skip_serializing_if = "Option::is_none")]
400    pub detail: Option<String>,
401}
402
403#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
404#[serde(rename_all = "snake_case")]
405pub enum RoutineRunStatus {
406    Queued,
407    PendingApproval,
408    Running,
409    Paused,
410    BlockedPolicy,
411    Denied,
412    Completed,
413    Failed,
414    Cancelled,
415}
416
417#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct RoutineRunArtifact {
419    pub artifact_id: String,
420    pub uri: String,
421    pub kind: String,
422    #[serde(default, skip_serializing_if = "Option::is_none")]
423    pub label: Option<String>,
424    pub created_at_ms: u64,
425    #[serde(default, skip_serializing_if = "Option::is_none")]
426    pub metadata: Option<Value>,
427}
428
429#[derive(Debug, Clone, Serialize, Deserialize)]
430pub struct RoutineRunRecord {
431    pub run_id: String,
432    pub routine_id: String,
433    pub trigger_type: String,
434    pub run_count: u32,
435    pub status: RoutineRunStatus,
436    pub created_at_ms: u64,
437    pub updated_at_ms: u64,
438    #[serde(default, skip_serializing_if = "Option::is_none")]
439    pub fired_at_ms: Option<u64>,
440    #[serde(default, skip_serializing_if = "Option::is_none")]
441    pub started_at_ms: Option<u64>,
442    #[serde(default, skip_serializing_if = "Option::is_none")]
443    pub finished_at_ms: Option<u64>,
444    pub requires_approval: bool,
445    #[serde(default, skip_serializing_if = "Option::is_none")]
446    pub approval_reason: Option<String>,
447    #[serde(default, skip_serializing_if = "Option::is_none")]
448    pub denial_reason: Option<String>,
449    #[serde(default, skip_serializing_if = "Option::is_none")]
450    pub paused_reason: Option<String>,
451    #[serde(default, skip_serializing_if = "Option::is_none")]
452    pub detail: Option<String>,
453    pub entrypoint: String,
454    #[serde(default)]
455    pub args: Value,
456    #[serde(default)]
457    pub allowed_tools: Vec<String>,
458    #[serde(default)]
459    pub output_targets: Vec<String>,
460    #[serde(default)]
461    pub artifacts: Vec<RoutineRunArtifact>,
462}
463
464#[derive(Debug, Clone)]
465pub struct RoutineSessionPolicy {
466    pub session_id: String,
467    pub run_id: String,
468    pub routine_id: String,
469    pub allowed_tools: Vec<String>,
470}
471
472#[derive(Debug, Clone, Serialize)]
473pub struct RoutineTriggerPlan {
474    pub routine_id: String,
475    pub run_count: u32,
476    pub scheduled_at_ms: u64,
477    pub next_fire_at_ms: u64,
478}
479
480#[derive(Debug, Clone, Serialize)]
481pub struct ResourceConflict {
482    pub key: String,
483    pub expected_rev: Option<u64>,
484    pub current_rev: Option<u64>,
485}
486
487#[derive(Debug, Clone, Serialize)]
488#[serde(tag = "type", rename_all = "snake_case")]
489pub enum ResourceStoreError {
490    InvalidKey { key: String },
491    RevisionConflict(ResourceConflict),
492    PersistFailed { message: String },
493}
494
495#[derive(Debug, Clone, Serialize)]
496#[serde(tag = "type", rename_all = "snake_case")]
497pub enum RoutineStoreError {
498    InvalidRoutineId { routine_id: String },
499    InvalidSchedule { detail: String },
500    PersistFailed { message: String },
501}
502
503#[derive(Debug, Clone)]
504pub enum StartupStatus {
505    Starting,
506    Ready,
507    Failed,
508}
509
510#[derive(Debug, Clone)]
511pub struct StartupState {
512    pub status: StartupStatus,
513    pub phase: String,
514    pub started_at_ms: u64,
515    pub attempt_id: String,
516    pub last_error: Option<String>,
517}
518
519#[derive(Debug, Clone)]
520pub struct StartupSnapshot {
521    pub status: StartupStatus,
522    pub phase: String,
523    pub started_at_ms: u64,
524    pub attempt_id: String,
525    pub last_error: Option<String>,
526    pub elapsed_ms: u64,
527}
528
529#[derive(Clone)]
530pub struct AppState {
531    pub runtime: Arc<OnceLock<RuntimeState>>,
532    pub startup: Arc<RwLock<StartupState>>,
533    pub in_process_mode: Arc<AtomicBool>,
534    pub api_token: Arc<RwLock<Option<String>>>,
535    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
536    pub run_registry: RunRegistry,
537    pub run_stale_ms: u64,
538    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
539    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
540    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
541    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
542    pub shared_resources_path: PathBuf,
543    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
544    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
545    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
546    pub routine_session_policies:
547        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
548    pub routines_path: PathBuf,
549    pub routine_history_path: PathBuf,
550    pub routine_runs_path: PathBuf,
551    pub agent_teams: AgentTeamRuntime,
552    pub web_ui_enabled: Arc<AtomicBool>,
553    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
554    pub server_base_url: Arc<std::sync::RwLock<String>>,
555    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
556    pub host_runtime_context: HostRuntimeContext,
557}
558
559#[derive(Debug, Clone)]
560struct StatusIndexUpdate {
561    key: String,
562    value: Value,
563}
564
565impl AppState {
566    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
567        Self {
568            runtime: Arc::new(OnceLock::new()),
569            startup: Arc::new(RwLock::new(StartupState {
570                status: StartupStatus::Starting,
571                phase: "boot".to_string(),
572                started_at_ms: now_ms(),
573                attempt_id,
574                last_error: None,
575            })),
576            in_process_mode: Arc::new(AtomicBool::new(in_process)),
577            api_token: Arc::new(RwLock::new(None)),
578            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
579            run_registry: RunRegistry::new(),
580            run_stale_ms: resolve_run_stale_ms(),
581            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
582            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
583            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
584            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
585            shared_resources_path: resolve_shared_resources_path(),
586            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
587            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
588            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
589            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
590            routines_path: resolve_routines_path(),
591            routine_history_path: resolve_routine_history_path(),
592            routine_runs_path: resolve_routine_runs_path(),
593            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
594            web_ui_enabled: Arc::new(AtomicBool::new(false)),
595            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
596            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
597            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
598            host_runtime_context: detect_host_runtime_context(),
599        }
600    }
601
602    pub fn is_ready(&self) -> bool {
603        self.runtime.get().is_some()
604    }
605
606    pub fn mode_label(&self) -> &'static str {
607        if self.in_process_mode.load(Ordering::Relaxed) {
608            "in-process"
609        } else {
610            "sidecar"
611        }
612    }
613
614    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
615        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
616        if let Ok(mut guard) = self.web_ui_prefix.write() {
617            *guard = normalize_web_ui_prefix(&prefix);
618        }
619    }
620
621    pub fn web_ui_enabled(&self) -> bool {
622        self.web_ui_enabled.load(Ordering::Relaxed)
623    }
624
625    pub fn web_ui_prefix(&self) -> String {
626        self.web_ui_prefix
627            .read()
628            .map(|v| v.clone())
629            .unwrap_or_else(|_| "/admin".to_string())
630    }
631
632    pub fn set_server_base_url(&self, base_url: String) {
633        if let Ok(mut guard) = self.server_base_url.write() {
634            *guard = base_url;
635        }
636    }
637
638    pub fn server_base_url(&self) -> String {
639        self.server_base_url
640            .read()
641            .map(|v| v.clone())
642            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
643    }
644
645    pub async fn api_token(&self) -> Option<String> {
646        self.api_token.read().await.clone()
647    }
648
649    pub async fn set_api_token(&self, token: Option<String>) {
650        *self.api_token.write().await = token;
651    }
652
653    pub async fn startup_snapshot(&self) -> StartupSnapshot {
654        let state = self.startup.read().await.clone();
655        StartupSnapshot {
656            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
657            status: state.status,
658            phase: state.phase,
659            started_at_ms: state.started_at_ms,
660            attempt_id: state.attempt_id,
661            last_error: state.last_error,
662        }
663    }
664
665    pub fn host_runtime_context(&self) -> HostRuntimeContext {
666        self.runtime
667            .get()
668            .map(|runtime| runtime.host_runtime_context.clone())
669            .unwrap_or_else(|| self.host_runtime_context.clone())
670    }
671
672    pub async fn set_phase(&self, phase: impl Into<String>) {
673        let mut startup = self.startup.write().await;
674        startup.phase = phase.into();
675    }
676
677    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
678        self.runtime
679            .set(runtime)
680            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
681        self.engine_loop
682            .set_spawn_agent_hook(std::sync::Arc::new(
683                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
684            ))
685            .await;
686        self.engine_loop
687            .set_tool_policy_hook(std::sync::Arc::new(
688                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
689            ))
690            .await;
691        self.engine_loop
692            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
693                self.clone(),
694            )))
695            .await;
696        let _ = self.load_shared_resources().await;
697        let _ = self.load_routines().await;
698        let _ = self.load_routine_history().await;
699        let _ = self.load_routine_runs().await;
700        let workspace_root = self.workspace_index.snapshot().await.root;
701        let _ = self
702            .agent_teams
703            .ensure_loaded_for_workspace(&workspace_root)
704            .await;
705        let mut startup = self.startup.write().await;
706        startup.status = StartupStatus::Ready;
707        startup.phase = "ready".to_string();
708        startup.last_error = None;
709        Ok(())
710    }
711
712    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
713        let mut startup = self.startup.write().await;
714        startup.status = StartupStatus::Failed;
715        startup.phase = phase.into();
716        startup.last_error = Some(error.into());
717    }
718
719    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
720        let runtime = self.channels_runtime.lock().await;
721        runtime.statuses.clone()
722    }
723
724    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
725        let effective = self.config.get_effective_value().await;
726        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
727        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
728
729        let mut runtime = self.channels_runtime.lock().await;
730        if let Some(listeners) = runtime.listeners.as_mut() {
731            listeners.abort_all();
732        }
733        runtime.listeners = None;
734        runtime.statuses.clear();
735
736        let mut status_map = std::collections::HashMap::new();
737        status_map.insert(
738            "telegram".to_string(),
739            ChannelStatus {
740                enabled: parsed.channels.telegram.is_some(),
741                connected: false,
742                last_error: None,
743                active_sessions: 0,
744                meta: serde_json::json!({}),
745            },
746        );
747        status_map.insert(
748            "discord".to_string(),
749            ChannelStatus {
750                enabled: parsed.channels.discord.is_some(),
751                connected: false,
752                last_error: None,
753                active_sessions: 0,
754                meta: serde_json::json!({}),
755            },
756        );
757        status_map.insert(
758            "slack".to_string(),
759            ChannelStatus {
760                enabled: parsed.channels.slack.is_some(),
761                connected: false,
762                last_error: None,
763                active_sessions: 0,
764                meta: serde_json::json!({}),
765            },
766        );
767
768        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
769            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
770            runtime.listeners = Some(listeners);
771            for status in status_map.values_mut() {
772                if status.enabled {
773                    status.connected = true;
774                }
775            }
776        }
777
778        runtime.statuses = status_map.clone();
779        drop(runtime);
780
781        self.event_bus.publish(EngineEvent::new(
782            "channel.status.changed",
783            serde_json::json!({ "channels": status_map }),
784        ));
785        Ok(())
786    }
787
788    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
789        if !self.shared_resources_path.exists() {
790            return Ok(());
791        }
792        let raw = fs::read_to_string(&self.shared_resources_path).await?;
793        let parsed =
794            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
795                .unwrap_or_default();
796        let mut guard = self.shared_resources.write().await;
797        *guard = parsed;
798        Ok(())
799    }
800
801    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
802        if let Some(parent) = self.shared_resources_path.parent() {
803            fs::create_dir_all(parent).await?;
804        }
805        let payload = {
806            let guard = self.shared_resources.read().await;
807            serde_json::to_string_pretty(&*guard)?
808        };
809        fs::write(&self.shared_resources_path, payload).await?;
810        Ok(())
811    }
812
813    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
814        self.shared_resources.read().await.get(key).cloned()
815    }
816
817    pub async fn list_shared_resources(
818        &self,
819        prefix: Option<&str>,
820        limit: usize,
821    ) -> Vec<SharedResourceRecord> {
822        let limit = limit.clamp(1, 500);
823        let mut rows = self
824            .shared_resources
825            .read()
826            .await
827            .values()
828            .filter(|record| {
829                if let Some(prefix) = prefix {
830                    record.key.starts_with(prefix)
831                } else {
832                    true
833                }
834            })
835            .cloned()
836            .collect::<Vec<_>>();
837        rows.sort_by(|a, b| a.key.cmp(&b.key));
838        rows.truncate(limit);
839        rows
840    }
841
842    pub async fn put_shared_resource(
843        &self,
844        key: String,
845        value: Value,
846        if_match_rev: Option<u64>,
847        updated_by: String,
848        ttl_ms: Option<u64>,
849    ) -> Result<SharedResourceRecord, ResourceStoreError> {
850        if !is_valid_resource_key(&key) {
851            return Err(ResourceStoreError::InvalidKey { key });
852        }
853
854        let now = now_ms();
855        let mut guard = self.shared_resources.write().await;
856        let existing = guard.get(&key).cloned();
857
858        if let Some(expected) = if_match_rev {
859            let current = existing.as_ref().map(|row| row.rev);
860            if current != Some(expected) {
861                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
862                    key,
863                    expected_rev: Some(expected),
864                    current_rev: current,
865                }));
866            }
867        }
868
869        let next_rev = existing
870            .as_ref()
871            .map(|row| row.rev.saturating_add(1))
872            .unwrap_or(1);
873
874        let record = SharedResourceRecord {
875            key: key.clone(),
876            value,
877            rev: next_rev,
878            updated_at_ms: now,
879            updated_by,
880            ttl_ms,
881        };
882
883        let previous = guard.insert(key.clone(), record.clone());
884        drop(guard);
885
886        if let Err(error) = self.persist_shared_resources().await {
887            let mut rollback = self.shared_resources.write().await;
888            if let Some(previous) = previous {
889                rollback.insert(key, previous);
890            } else {
891                rollback.remove(&key);
892            }
893            return Err(ResourceStoreError::PersistFailed {
894                message: error.to_string(),
895            });
896        }
897
898        Ok(record)
899    }
900
901    pub async fn delete_shared_resource(
902        &self,
903        key: &str,
904        if_match_rev: Option<u64>,
905    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
906        if !is_valid_resource_key(key) {
907            return Err(ResourceStoreError::InvalidKey {
908                key: key.to_string(),
909            });
910        }
911
912        let mut guard = self.shared_resources.write().await;
913        let current = guard.get(key).cloned();
914        if let Some(expected) = if_match_rev {
915            let current_rev = current.as_ref().map(|row| row.rev);
916            if current_rev != Some(expected) {
917                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
918                    key: key.to_string(),
919                    expected_rev: Some(expected),
920                    current_rev,
921                }));
922            }
923        }
924
925        let removed = guard.remove(key);
926        drop(guard);
927
928        if let Err(error) = self.persist_shared_resources().await {
929            if let Some(record) = removed.clone() {
930                self.shared_resources
931                    .write()
932                    .await
933                    .insert(record.key.clone(), record);
934            }
935            return Err(ResourceStoreError::PersistFailed {
936                message: error.to_string(),
937            });
938        }
939
940        Ok(removed)
941    }
942
943    pub async fn load_routines(&self) -> anyhow::Result<()> {
944        if !self.routines_path.exists() {
945            return Ok(());
946        }
947        let raw = fs::read_to_string(&self.routines_path).await?;
948        let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
949            .unwrap_or_default();
950        let mut guard = self.routines.write().await;
951        *guard = parsed;
952        Ok(())
953    }
954
955    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
956        if !self.routine_history_path.exists() {
957            return Ok(());
958        }
959        let raw = fs::read_to_string(&self.routine_history_path).await?;
960        let parsed = serde_json::from_str::<
961            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
962        >(&raw)
963        .unwrap_or_default();
964        let mut guard = self.routine_history.write().await;
965        *guard = parsed;
966        Ok(())
967    }
968
969    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
970        if !self.routine_runs_path.exists() {
971            return Ok(());
972        }
973        let raw = fs::read_to_string(&self.routine_runs_path).await?;
974        let parsed =
975            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
976                .unwrap_or_default();
977        let mut guard = self.routine_runs.write().await;
978        *guard = parsed;
979        Ok(())
980    }
981
982    pub async fn persist_routines(&self) -> anyhow::Result<()> {
983        if let Some(parent) = self.routines_path.parent() {
984            fs::create_dir_all(parent).await?;
985        }
986        let payload = {
987            let guard = self.routines.read().await;
988            serde_json::to_string_pretty(&*guard)?
989        };
990        fs::write(&self.routines_path, payload).await?;
991        Ok(())
992    }
993
994    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
995        if let Some(parent) = self.routine_history_path.parent() {
996            fs::create_dir_all(parent).await?;
997        }
998        let payload = {
999            let guard = self.routine_history.read().await;
1000            serde_json::to_string_pretty(&*guard)?
1001        };
1002        fs::write(&self.routine_history_path, payload).await?;
1003        Ok(())
1004    }
1005
1006    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1007        if let Some(parent) = self.routine_runs_path.parent() {
1008            fs::create_dir_all(parent).await?;
1009        }
1010        let payload = {
1011            let guard = self.routine_runs.read().await;
1012            serde_json::to_string_pretty(&*guard)?
1013        };
1014        fs::write(&self.routine_runs_path, payload).await?;
1015        Ok(())
1016    }
1017
1018    pub async fn put_routine(
1019        &self,
1020        mut routine: RoutineSpec,
1021    ) -> Result<RoutineSpec, RoutineStoreError> {
1022        if routine.routine_id.trim().is_empty() {
1023            return Err(RoutineStoreError::InvalidRoutineId {
1024                routine_id: routine.routine_id,
1025            });
1026        }
1027
1028        routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1029        routine.output_targets = normalize_non_empty_list(routine.output_targets);
1030
1031        let interval = match routine.schedule {
1032            RoutineSchedule::IntervalSeconds { seconds } => {
1033                if seconds == 0 {
1034                    return Err(RoutineStoreError::InvalidSchedule {
1035                        detail: "interval_seconds must be > 0".to_string(),
1036                    });
1037                }
1038                Some(seconds)
1039            }
1040            RoutineSchedule::Cron { .. } => None,
1041        };
1042        if routine.next_fire_at_ms.is_none() {
1043            routine.next_fire_at_ms = Some(now_ms().saturating_add(interval.unwrap_or(60) * 1000));
1044        }
1045
1046        let mut guard = self.routines.write().await;
1047        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1048        drop(guard);
1049
1050        if let Err(error) = self.persist_routines().await {
1051            let mut rollback = self.routines.write().await;
1052            if let Some(previous) = previous {
1053                rollback.insert(previous.routine_id.clone(), previous);
1054            } else {
1055                rollback.remove(&routine.routine_id);
1056            }
1057            return Err(RoutineStoreError::PersistFailed {
1058                message: error.to_string(),
1059            });
1060        }
1061
1062        Ok(routine)
1063    }
1064
1065    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1066        let mut rows = self
1067            .routines
1068            .read()
1069            .await
1070            .values()
1071            .cloned()
1072            .collect::<Vec<_>>();
1073        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1074        rows
1075    }
1076
1077    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1078        self.routines.read().await.get(routine_id).cloned()
1079    }
1080
1081    pub async fn delete_routine(
1082        &self,
1083        routine_id: &str,
1084    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1085        let mut guard = self.routines.write().await;
1086        let removed = guard.remove(routine_id);
1087        drop(guard);
1088
1089        if let Err(error) = self.persist_routines().await {
1090            if let Some(removed) = removed.clone() {
1091                self.routines
1092                    .write()
1093                    .await
1094                    .insert(removed.routine_id.clone(), removed);
1095            }
1096            return Err(RoutineStoreError::PersistFailed {
1097                message: error.to_string(),
1098            });
1099        }
1100        Ok(removed)
1101    }
1102
1103    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1104        let mut plans = Vec::new();
1105        let mut guard = self.routines.write().await;
1106        for routine in guard.values_mut() {
1107            if routine.status != RoutineStatus::Active {
1108                continue;
1109            }
1110            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1111                continue;
1112            };
1113            let Some(interval_ms) = routine_interval_ms(&routine.schedule) else {
1114                continue;
1115            };
1116            if now_ms < next_fire_at_ms {
1117                continue;
1118            }
1119            let (run_count, next_fire_at_ms) = compute_misfire_plan(
1120                now_ms,
1121                next_fire_at_ms,
1122                interval_ms,
1123                &routine.misfire_policy,
1124            );
1125            routine.next_fire_at_ms = Some(next_fire_at_ms);
1126            if run_count == 0 {
1127                continue;
1128            }
1129            plans.push(RoutineTriggerPlan {
1130                routine_id: routine.routine_id.clone(),
1131                run_count,
1132                scheduled_at_ms: now_ms,
1133                next_fire_at_ms,
1134            });
1135        }
1136        drop(guard);
1137        let _ = self.persist_routines().await;
1138        plans
1139    }
1140
1141    pub async fn mark_routine_fired(
1142        &self,
1143        routine_id: &str,
1144        fired_at_ms: u64,
1145    ) -> Option<RoutineSpec> {
1146        let mut guard = self.routines.write().await;
1147        let routine = guard.get_mut(routine_id)?;
1148        routine.last_fired_at_ms = Some(fired_at_ms);
1149        let updated = routine.clone();
1150        drop(guard);
1151        let _ = self.persist_routines().await;
1152        Some(updated)
1153    }
1154
1155    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1156        let mut history = self.routine_history.write().await;
1157        history
1158            .entry(event.routine_id.clone())
1159            .or_default()
1160            .push(event);
1161        drop(history);
1162        let _ = self.persist_routine_history().await;
1163    }
1164
1165    pub async fn list_routine_history(
1166        &self,
1167        routine_id: &str,
1168        limit: usize,
1169    ) -> Vec<RoutineHistoryEvent> {
1170        let limit = limit.clamp(1, 500);
1171        let mut rows = self
1172            .routine_history
1173            .read()
1174            .await
1175            .get(routine_id)
1176            .cloned()
1177            .unwrap_or_default();
1178        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1179        rows.truncate(limit);
1180        rows
1181    }
1182
1183    pub async fn create_routine_run(
1184        &self,
1185        routine: &RoutineSpec,
1186        trigger_type: &str,
1187        run_count: u32,
1188        status: RoutineRunStatus,
1189        detail: Option<String>,
1190    ) -> RoutineRunRecord {
1191        let now = now_ms();
1192        let record = RoutineRunRecord {
1193            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1194            routine_id: routine.routine_id.clone(),
1195            trigger_type: trigger_type.to_string(),
1196            run_count,
1197            status,
1198            created_at_ms: now,
1199            updated_at_ms: now,
1200            fired_at_ms: Some(now),
1201            started_at_ms: None,
1202            finished_at_ms: None,
1203            requires_approval: routine.requires_approval,
1204            approval_reason: None,
1205            denial_reason: None,
1206            paused_reason: None,
1207            detail,
1208            entrypoint: routine.entrypoint.clone(),
1209            args: routine.args.clone(),
1210            allowed_tools: routine.allowed_tools.clone(),
1211            output_targets: routine.output_targets.clone(),
1212            artifacts: Vec::new(),
1213        };
1214        self.routine_runs
1215            .write()
1216            .await
1217            .insert(record.run_id.clone(), record.clone());
1218        let _ = self.persist_routine_runs().await;
1219        record
1220    }
1221
1222    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1223        self.routine_runs.read().await.get(run_id).cloned()
1224    }
1225
1226    pub async fn list_routine_runs(
1227        &self,
1228        routine_id: Option<&str>,
1229        limit: usize,
1230    ) -> Vec<RoutineRunRecord> {
1231        let mut rows = self
1232            .routine_runs
1233            .read()
1234            .await
1235            .values()
1236            .filter(|row| {
1237                if let Some(id) = routine_id {
1238                    row.routine_id == id
1239                } else {
1240                    true
1241                }
1242            })
1243            .cloned()
1244            .collect::<Vec<_>>();
1245        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1246        rows.truncate(limit.clamp(1, 500));
1247        rows
1248    }
1249
1250    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1251        let mut guard = self.routine_runs.write().await;
1252        let next_run_id = guard
1253            .values()
1254            .filter(|row| row.status == RoutineRunStatus::Queued)
1255            .min_by(|a, b| {
1256                a.created_at_ms
1257                    .cmp(&b.created_at_ms)
1258                    .then_with(|| a.run_id.cmp(&b.run_id))
1259            })
1260            .map(|row| row.run_id.clone())?;
1261        let now = now_ms();
1262        let row = guard.get_mut(&next_run_id)?;
1263        row.status = RoutineRunStatus::Running;
1264        row.updated_at_ms = now;
1265        row.started_at_ms = Some(now);
1266        let claimed = row.clone();
1267        drop(guard);
1268        let _ = self.persist_routine_runs().await;
1269        Some(claimed)
1270    }
1271
1272    pub async fn set_routine_session_policy(
1273        &self,
1274        session_id: String,
1275        run_id: String,
1276        routine_id: String,
1277        allowed_tools: Vec<String>,
1278    ) {
1279        let policy = RoutineSessionPolicy {
1280            session_id: session_id.clone(),
1281            run_id,
1282            routine_id,
1283            allowed_tools: normalize_allowed_tools(allowed_tools),
1284        };
1285        self.routine_session_policies
1286            .write()
1287            .await
1288            .insert(session_id, policy);
1289    }
1290
1291    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1292        self.routine_session_policies
1293            .read()
1294            .await
1295            .get(session_id)
1296            .cloned()
1297    }
1298
1299    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1300        self.routine_session_policies
1301            .write()
1302            .await
1303            .remove(session_id);
1304    }
1305
1306    pub async fn update_routine_run_status(
1307        &self,
1308        run_id: &str,
1309        status: RoutineRunStatus,
1310        reason: Option<String>,
1311    ) -> Option<RoutineRunRecord> {
1312        let mut guard = self.routine_runs.write().await;
1313        let row = guard.get_mut(run_id)?;
1314        row.status = status.clone();
1315        row.updated_at_ms = now_ms();
1316        match status {
1317            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1318            RoutineRunStatus::Running => {
1319                row.started_at_ms.get_or_insert_with(now_ms);
1320                if let Some(detail) = reason {
1321                    row.detail = Some(detail);
1322                }
1323            }
1324            RoutineRunStatus::Denied => row.denial_reason = reason,
1325            RoutineRunStatus::Paused => row.paused_reason = reason,
1326            RoutineRunStatus::Completed
1327            | RoutineRunStatus::Failed
1328            | RoutineRunStatus::Cancelled => {
1329                row.finished_at_ms = Some(now_ms());
1330                if let Some(detail) = reason {
1331                    row.detail = Some(detail);
1332                }
1333            }
1334            _ => {
1335                if let Some(detail) = reason {
1336                    row.detail = Some(detail);
1337                }
1338            }
1339        }
1340        let updated = row.clone();
1341        drop(guard);
1342        let _ = self.persist_routine_runs().await;
1343        Some(updated)
1344    }
1345
1346    pub async fn append_routine_run_artifact(
1347        &self,
1348        run_id: &str,
1349        artifact: RoutineRunArtifact,
1350    ) -> Option<RoutineRunRecord> {
1351        let mut guard = self.routine_runs.write().await;
1352        let row = guard.get_mut(run_id)?;
1353        row.updated_at_ms = now_ms();
1354        row.artifacts.push(artifact);
1355        let updated = row.clone();
1356        drop(guard);
1357        let _ = self.persist_routine_runs().await;
1358        Some(updated)
1359    }
1360}
1361
1362async fn build_channels_config(
1363    state: &AppState,
1364    channels: &ChannelsConfigFile,
1365) -> Option<ChannelsConfig> {
1366    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1367        return None;
1368    }
1369    Some(ChannelsConfig {
1370        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1371            bot_token: cfg.bot_token,
1372            allowed_users: cfg.allowed_users,
1373            mention_only: cfg.mention_only,
1374        }),
1375        discord: channels.discord.clone().map(|cfg| DiscordConfig {
1376            bot_token: cfg.bot_token,
1377            guild_id: cfg.guild_id,
1378            allowed_users: cfg.allowed_users,
1379            mention_only: cfg.mention_only,
1380        }),
1381        slack: channels.slack.clone().map(|cfg| SlackConfig {
1382            bot_token: cfg.bot_token,
1383            channel_id: cfg.channel_id,
1384            allowed_users: cfg.allowed_users,
1385        }),
1386        server_base_url: state.server_base_url(),
1387        api_token: state.api_token().await.unwrap_or_default(),
1388        tool_policy: channels.tool_policy.clone(),
1389    })
1390}
1391
1392fn normalize_web_ui_prefix(prefix: &str) -> String {
1393    let trimmed = prefix.trim();
1394    if trimmed.is_empty() || trimmed == "/" {
1395        return "/admin".to_string();
1396    }
1397    let with_leading = if trimmed.starts_with('/') {
1398        trimmed.to_string()
1399    } else {
1400        format!("/{trimmed}")
1401    };
1402    with_leading.trim_end_matches('/').to_string()
1403}
1404
1405fn default_web_ui_prefix() -> String {
1406    "/admin".to_string()
1407}
1408
1409fn default_allow_all() -> Vec<String> {
1410    vec!["*".to_string()]
1411}
1412
1413fn default_discord_mention_only() -> bool {
1414    true
1415}
1416
1417fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
1418    normalize_non_empty_list(raw)
1419}
1420
1421fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
1422    let mut out = Vec::new();
1423    let mut seen = std::collections::HashSet::new();
1424    for item in raw {
1425        let normalized = item.trim().to_string();
1426        if normalized.is_empty() {
1427            continue;
1428        }
1429        if seen.insert(normalized.clone()) {
1430            out.push(normalized);
1431        }
1432    }
1433    out
1434}
1435
1436fn resolve_run_stale_ms() -> u64 {
1437    std::env::var("TANDEM_RUN_STALE_MS")
1438        .ok()
1439        .and_then(|v| v.trim().parse::<u64>().ok())
1440        .unwrap_or(120_000)
1441        .clamp(30_000, 600_000)
1442}
1443
1444fn resolve_shared_resources_path() -> PathBuf {
1445    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
1446        let trimmed = dir.trim();
1447        if !trimmed.is_empty() {
1448            return PathBuf::from(trimmed).join("shared_resources.json");
1449        }
1450    }
1451    default_state_dir().join("shared_resources.json")
1452}
1453
1454fn resolve_routines_path() -> PathBuf {
1455    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
1456        let trimmed = dir.trim();
1457        if !trimmed.is_empty() {
1458            return PathBuf::from(trimmed).join("routines.json");
1459        }
1460    }
1461    default_state_dir().join("routines.json")
1462}
1463
1464fn resolve_routine_history_path() -> PathBuf {
1465    if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
1466        let trimmed = root.trim();
1467        if !trimmed.is_empty() {
1468            return PathBuf::from(trimmed).join("routine_history.json");
1469        }
1470    }
1471    default_state_dir().join("routine_history.json")
1472}
1473
1474fn resolve_routine_runs_path() -> PathBuf {
1475    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
1476        let trimmed = root.trim();
1477        if !trimmed.is_empty() {
1478            return PathBuf::from(trimmed).join("routine_runs.json");
1479        }
1480    }
1481    default_state_dir().join("routine_runs.json")
1482}
1483
1484fn resolve_agent_team_audit_path() -> PathBuf {
1485    if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
1486        let trimmed = base.trim();
1487        if !trimmed.is_empty() {
1488            return PathBuf::from(trimmed)
1489                .join("agent-team")
1490                .join("audit.log.jsonl");
1491        }
1492    }
1493    default_state_dir()
1494        .join("agent-team")
1495        .join("audit.log.jsonl")
1496}
1497
1498fn default_state_dir() -> PathBuf {
1499    if let Ok(paths) = resolve_shared_paths() {
1500        return paths.engine_state_dir;
1501    }
1502    if let Some(data_dir) = dirs::data_dir() {
1503        return data_dir.join("tandem").join("data");
1504    }
1505    dirs::home_dir()
1506        .map(|home| home.join(".tandem").join("data"))
1507        .unwrap_or_else(|| PathBuf::from(".tandem"))
1508}
1509
1510fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
1511    match schedule {
1512        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
1513        RoutineSchedule::Cron { .. } => None,
1514    }
1515}
1516
1517fn compute_misfire_plan(
1518    now_ms: u64,
1519    next_fire_at_ms: u64,
1520    interval_ms: u64,
1521    policy: &RoutineMisfirePolicy,
1522) -> (u32, u64) {
1523    if now_ms < next_fire_at_ms || interval_ms == 0 {
1524        return (0, next_fire_at_ms);
1525    }
1526    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
1527    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
1528    match policy {
1529        RoutineMisfirePolicy::Skip => (0, aligned_next),
1530        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
1531        RoutineMisfirePolicy::CatchUp { max_runs } => {
1532            let count = missed.min(u64::from(*max_runs)) as u32;
1533            (count, aligned_next)
1534        }
1535    }
1536}
1537
1538#[derive(Debug, Clone, PartialEq, Eq)]
1539pub enum RoutineExecutionDecision {
1540    Allowed,
1541    RequiresApproval { reason: String },
1542    Blocked { reason: String },
1543}
1544
1545pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
1546    let entrypoint = routine.entrypoint.to_ascii_lowercase();
1547    if entrypoint.starts_with("connector.")
1548        || entrypoint.starts_with("integration.")
1549        || entrypoint.contains("external")
1550    {
1551        return true;
1552    }
1553    routine
1554        .args
1555        .get("uses_external_integrations")
1556        .and_then(|v| v.as_bool())
1557        .unwrap_or(false)
1558        || routine
1559            .args
1560            .get("connector_id")
1561            .and_then(|v| v.as_str())
1562            .is_some()
1563}
1564
1565pub fn evaluate_routine_execution_policy(
1566    routine: &RoutineSpec,
1567    trigger_type: &str,
1568) -> RoutineExecutionDecision {
1569    if !routine_uses_external_integrations(routine) {
1570        return RoutineExecutionDecision::Allowed;
1571    }
1572    if !routine.external_integrations_allowed {
1573        return RoutineExecutionDecision::Blocked {
1574            reason: "external integrations are disabled by policy".to_string(),
1575        };
1576    }
1577    if routine.requires_approval {
1578        return RoutineExecutionDecision::RequiresApproval {
1579            reason: format!(
1580                "manual approval required before external side effects ({})",
1581                trigger_type
1582            ),
1583        };
1584    }
1585    RoutineExecutionDecision::Allowed
1586}
1587
1588fn is_valid_resource_key(key: &str) -> bool {
1589    let trimmed = key.trim();
1590    if trimmed.is_empty() {
1591        return false;
1592    }
1593    let allowed_prefix = ["run/", "mission/", "project/", "team/"];
1594    if !allowed_prefix
1595        .iter()
1596        .any(|prefix| trimmed.starts_with(prefix))
1597    {
1598        return false;
1599    }
1600    !trimmed.contains("//")
1601}
1602
1603impl Deref for AppState {
1604    type Target = RuntimeState;
1605
1606    fn deref(&self) -> &Self::Target {
1607        self.runtime
1608            .get()
1609            .expect("runtime accessed before startup completion")
1610    }
1611}
1612
1613#[derive(Clone)]
1614struct ServerPromptContextHook {
1615    state: AppState,
1616}
1617
1618impl ServerPromptContextHook {
1619    fn new(state: AppState) -> Self {
1620        Self { state }
1621    }
1622
1623    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
1624        let paths = resolve_shared_paths().ok()?;
1625        MemoryDatabase::new(&paths.memory_db_path).await.ok()
1626    }
1627
1628    fn hash_query(input: &str) -> String {
1629        let mut hasher = Sha256::new();
1630        hasher.update(input.as_bytes());
1631        format!("{:x}", hasher.finalize())
1632    }
1633
1634    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
1635        let mut out = vec!["<memory_context>".to_string()];
1636        let mut used = 0usize;
1637        for hit in hits {
1638            let text = hit
1639                .record
1640                .content
1641                .split_whitespace()
1642                .take(60)
1643                .collect::<Vec<_>>()
1644                .join(" ");
1645            let line = format!(
1646                "- [{:.3}] {} (source={}, run={})",
1647                hit.score, text, hit.record.source_type, hit.record.run_id
1648            );
1649            used = used.saturating_add(line.len());
1650            if used > 2200 {
1651                break;
1652            }
1653            out.push(line);
1654        }
1655        out.push("</memory_context>".to_string());
1656        out.join("\n")
1657    }
1658}
1659
1660impl PromptContextHook for ServerPromptContextHook {
1661    fn augment_provider_messages(
1662        &self,
1663        ctx: PromptContextHookContext,
1664        mut messages: Vec<ChatMessage>,
1665    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
1666        let this = self.clone();
1667        Box::pin(async move {
1668            let run = this.state.run_registry.get(&ctx.session_id).await;
1669            let Some(run) = run else {
1670                return Ok(messages);
1671            };
1672            let run_id = run.run_id;
1673            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1674            let query = messages
1675                .iter()
1676                .rev()
1677                .find(|m| m.role == "user")
1678                .map(|m| m.content.clone())
1679                .unwrap_or_default();
1680            if query.trim().is_empty() {
1681                return Ok(messages);
1682            }
1683
1684            let Some(db) = this.open_memory_db().await else {
1685                return Ok(messages);
1686            };
1687            let started = now_ms();
1688            let hits = db
1689                .search_global_memory(&user_id, &query, 8, None, None, None)
1690                .await
1691                .unwrap_or_default();
1692            let latency_ms = now_ms().saturating_sub(started);
1693            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1694            this.state.event_bus.publish(EngineEvent::new(
1695                "memory.search.performed",
1696                json!({
1697                    "runID": run_id,
1698                    "sessionID": ctx.session_id,
1699                    "messageID": ctx.message_id,
1700                    "providerID": ctx.provider_id,
1701                    "modelID": ctx.model_id,
1702                    "iteration": ctx.iteration,
1703                    "queryHash": Self::hash_query(&query),
1704                    "resultCount": hits.len(),
1705                    "scoreMin": scores.iter().copied().reduce(f64::min),
1706                    "scoreMax": scores.iter().copied().reduce(f64::max),
1707                    "scores": scores,
1708                    "latencyMs": latency_ms,
1709                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1710                }),
1711            ));
1712
1713            if hits.is_empty() {
1714                return Ok(messages);
1715            }
1716
1717            let memory_block = Self::build_memory_block(&hits);
1718            messages.push(ChatMessage {
1719                role: "system".to_string(),
1720                content: memory_block.clone(),
1721            });
1722            this.state.event_bus.publish(EngineEvent::new(
1723                "memory.context.injected",
1724                json!({
1725                    "runID": run_id,
1726                    "sessionID": ctx.session_id,
1727                    "messageID": ctx.message_id,
1728                    "iteration": ctx.iteration,
1729                    "count": hits.len(),
1730                    "tokenSizeApprox": memory_block.split_whitespace().count(),
1731                }),
1732            ));
1733            Ok(messages)
1734        })
1735    }
1736}
1737
1738fn extract_event_session_id(properties: &Value) -> Option<String> {
1739    properties
1740        .get("sessionID")
1741        .or_else(|| properties.get("sessionId"))
1742        .or_else(|| properties.get("id"))
1743        .and_then(|v| v.as_str())
1744        .map(|s| s.to_string())
1745}
1746
1747fn extract_event_run_id(properties: &Value) -> Option<String> {
1748    properties
1749        .get("runID")
1750        .or_else(|| properties.get("run_id"))
1751        .and_then(|v| v.as_str())
1752        .map(|s| s.to_string())
1753}
1754
1755fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1756    let session_id = extract_event_session_id(&event.properties)?;
1757    let run_id = extract_event_run_id(&event.properties);
1758    let key = format!("run/{session_id}/status");
1759
1760    let mut base = serde_json::Map::new();
1761    base.insert("sessionID".to_string(), Value::String(session_id));
1762    if let Some(run_id) = run_id {
1763        base.insert("runID".to_string(), Value::String(run_id));
1764    }
1765
1766    match event.event_type.as_str() {
1767        "session.run.started" => {
1768            base.insert("state".to_string(), Value::String("running".to_string()));
1769            base.insert("phase".to_string(), Value::String("run".to_string()));
1770            base.insert(
1771                "eventType".to_string(),
1772                Value::String("session.run.started".to_string()),
1773            );
1774            Some(StatusIndexUpdate {
1775                key,
1776                value: Value::Object(base),
1777            })
1778        }
1779        "session.run.finished" => {
1780            base.insert("state".to_string(), Value::String("finished".to_string()));
1781            base.insert("phase".to_string(), Value::String("run".to_string()));
1782            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1783                base.insert("result".to_string(), Value::String(status.to_string()));
1784            }
1785            base.insert(
1786                "eventType".to_string(),
1787                Value::String("session.run.finished".to_string()),
1788            );
1789            Some(StatusIndexUpdate {
1790                key,
1791                value: Value::Object(base),
1792            })
1793        }
1794        "message.part.updated" => {
1795            let part_type = event
1796                .properties
1797                .get("part")
1798                .and_then(|v| v.get("type"))
1799                .and_then(|v| v.as_str())?;
1800            let (phase, tool_active) = match part_type {
1801                "tool-invocation" => ("tool", true),
1802                "tool-result" => ("run", false),
1803                _ => return None,
1804            };
1805            base.insert("state".to_string(), Value::String("running".to_string()));
1806            base.insert("phase".to_string(), Value::String(phase.to_string()));
1807            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1808            if let Some(tool_name) = event
1809                .properties
1810                .get("part")
1811                .and_then(|v| v.get("tool"))
1812                .and_then(|v| v.as_str())
1813            {
1814                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1815            }
1816            base.insert(
1817                "eventType".to_string(),
1818                Value::String("message.part.updated".to_string()),
1819            );
1820            Some(StatusIndexUpdate {
1821                key,
1822                value: Value::Object(base),
1823            })
1824        }
1825        _ => None,
1826    }
1827}
1828
1829pub async fn run_status_indexer(state: AppState) {
1830    let mut rx = state.event_bus.subscribe();
1831    loop {
1832        match rx.recv().await {
1833            Ok(event) => {
1834                if let Some(update) = derive_status_index_update(&event) {
1835                    if let Err(error) = state
1836                        .put_shared_resource(
1837                            update.key,
1838                            update.value,
1839                            None,
1840                            "system.status_indexer".to_string(),
1841                            None,
1842                        )
1843                        .await
1844                    {
1845                        tracing::warn!("status indexer failed to persist update: {error:?}");
1846                    }
1847                }
1848            }
1849            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1850            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1851        }
1852    }
1853}
1854
1855pub async fn run_agent_team_supervisor(state: AppState) {
1856    let mut rx = state.event_bus.subscribe();
1857    loop {
1858        match rx.recv().await {
1859            Ok(event) => {
1860                state.agent_teams.handle_engine_event(&state, &event).await;
1861            }
1862            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1863            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1864        }
1865    }
1866}
1867
1868pub async fn run_routine_scheduler(state: AppState) {
1869    loop {
1870        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1871        let now = now_ms();
1872        let plans = state.evaluate_routine_misfires(now).await;
1873        for plan in plans {
1874            let Some(routine) = state.get_routine(&plan.routine_id).await else {
1875                continue;
1876            };
1877            match evaluate_routine_execution_policy(&routine, "scheduled") {
1878                RoutineExecutionDecision::Allowed => {
1879                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
1880                    let run = state
1881                        .create_routine_run(
1882                            &routine,
1883                            "scheduled",
1884                            plan.run_count,
1885                            RoutineRunStatus::Queued,
1886                            None,
1887                        )
1888                        .await;
1889                    state
1890                        .append_routine_history(RoutineHistoryEvent {
1891                            routine_id: plan.routine_id.clone(),
1892                            trigger_type: "scheduled".to_string(),
1893                            run_count: plan.run_count,
1894                            fired_at_ms: now,
1895                            status: "queued".to_string(),
1896                            detail: None,
1897                        })
1898                        .await;
1899                    state.event_bus.publish(EngineEvent::new(
1900                        "routine.fired",
1901                        serde_json::json!({
1902                            "routineID": plan.routine_id,
1903                            "runID": run.run_id,
1904                            "runCount": plan.run_count,
1905                            "scheduledAtMs": plan.scheduled_at_ms,
1906                            "nextFireAtMs": plan.next_fire_at_ms,
1907                        }),
1908                    ));
1909                    state.event_bus.publish(EngineEvent::new(
1910                        "routine.run.created",
1911                        serde_json::json!({
1912                            "run": run,
1913                        }),
1914                    ));
1915                }
1916                RoutineExecutionDecision::RequiresApproval { reason } => {
1917                    let run = state
1918                        .create_routine_run(
1919                            &routine,
1920                            "scheduled",
1921                            plan.run_count,
1922                            RoutineRunStatus::PendingApproval,
1923                            Some(reason.clone()),
1924                        )
1925                        .await;
1926                    state
1927                        .append_routine_history(RoutineHistoryEvent {
1928                            routine_id: plan.routine_id.clone(),
1929                            trigger_type: "scheduled".to_string(),
1930                            run_count: plan.run_count,
1931                            fired_at_ms: now,
1932                            status: "pending_approval".to_string(),
1933                            detail: Some(reason.clone()),
1934                        })
1935                        .await;
1936                    state.event_bus.publish(EngineEvent::new(
1937                        "routine.approval_required",
1938                        serde_json::json!({
1939                            "routineID": plan.routine_id,
1940                            "runID": run.run_id,
1941                            "runCount": plan.run_count,
1942                            "triggerType": "scheduled",
1943                            "reason": reason,
1944                        }),
1945                    ));
1946                    state.event_bus.publish(EngineEvent::new(
1947                        "routine.run.created",
1948                        serde_json::json!({
1949                            "run": run,
1950                        }),
1951                    ));
1952                }
1953                RoutineExecutionDecision::Blocked { reason } => {
1954                    let run = state
1955                        .create_routine_run(
1956                            &routine,
1957                            "scheduled",
1958                            plan.run_count,
1959                            RoutineRunStatus::BlockedPolicy,
1960                            Some(reason.clone()),
1961                        )
1962                        .await;
1963                    state
1964                        .append_routine_history(RoutineHistoryEvent {
1965                            routine_id: plan.routine_id.clone(),
1966                            trigger_type: "scheduled".to_string(),
1967                            run_count: plan.run_count,
1968                            fired_at_ms: now,
1969                            status: "blocked_policy".to_string(),
1970                            detail: Some(reason.clone()),
1971                        })
1972                        .await;
1973                    state.event_bus.publish(EngineEvent::new(
1974                        "routine.blocked",
1975                        serde_json::json!({
1976                            "routineID": plan.routine_id,
1977                            "runID": run.run_id,
1978                            "runCount": plan.run_count,
1979                            "triggerType": "scheduled",
1980                            "reason": reason,
1981                        }),
1982                    ));
1983                    state.event_bus.publish(EngineEvent::new(
1984                        "routine.run.created",
1985                        serde_json::json!({
1986                            "run": run,
1987                        }),
1988                    ));
1989                }
1990            }
1991        }
1992    }
1993}
1994
1995pub async fn run_routine_executor(state: AppState) {
1996    loop {
1997        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1998        let Some(run) = state.claim_next_queued_routine_run().await else {
1999            continue;
2000        };
2001
2002        state.event_bus.publish(EngineEvent::new(
2003            "routine.run.started",
2004            serde_json::json!({
2005                "runID": run.run_id,
2006                "routineID": run.routine_id,
2007                "triggerType": run.trigger_type,
2008                "startedAtMs": now_ms(),
2009            }),
2010        ));
2011
2012        let workspace_root = state.workspace_index.snapshot().await.root;
2013        let mut session = Session::new(
2014            Some(format!("Routine {}", run.routine_id)),
2015            Some(workspace_root.clone()),
2016        );
2017        let session_id = session.id.clone();
2018        session.workspace_root = Some(workspace_root);
2019
2020        if let Err(error) = state.storage.save_session(session).await {
2021            let detail = format!("failed to create routine session: {error}");
2022            let _ = state
2023                .update_routine_run_status(
2024                    &run.run_id,
2025                    RoutineRunStatus::Failed,
2026                    Some(detail.clone()),
2027                )
2028                .await;
2029            state.event_bus.publish(EngineEvent::new(
2030                "routine.run.failed",
2031                serde_json::json!({
2032                    "runID": run.run_id,
2033                    "routineID": run.routine_id,
2034                    "reason": detail,
2035                }),
2036            ));
2037            continue;
2038        }
2039
2040        state
2041            .set_routine_session_policy(
2042                session_id.clone(),
2043                run.run_id.clone(),
2044                run.routine_id.clone(),
2045                run.allowed_tools.clone(),
2046            )
2047            .await;
2048        state
2049            .engine_loop
2050            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
2051            .await;
2052
2053        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
2054        if let Some(spec) = selected_model.as_ref() {
2055            state.event_bus.publish(EngineEvent::new(
2056                "routine.run.model_selected",
2057                serde_json::json!({
2058                    "runID": run.run_id,
2059                    "routineID": run.routine_id,
2060                    "providerID": spec.provider_id,
2061                    "modelID": spec.model_id,
2062                    "source": model_source,
2063                }),
2064            ));
2065        }
2066
2067        let request = SendMessageRequest {
2068            parts: vec![MessagePartInput::Text {
2069                text: build_routine_prompt(&state, &run).await,
2070            }],
2071            model: selected_model,
2072            agent: None,
2073        };
2074
2075        let run_result = state
2076            .engine_loop
2077            .run_prompt_async_with_context(
2078                session_id.clone(),
2079                request,
2080                Some(format!("routine:{}", run.run_id)),
2081            )
2082            .await;
2083
2084        state.clear_routine_session_policy(&session_id).await;
2085        state
2086            .engine_loop
2087            .clear_session_allowed_tools(&session_id)
2088            .await;
2089
2090        match run_result {
2091            Ok(()) => {
2092                append_configured_output_artifacts(&state, &run).await;
2093                let _ = state
2094                    .update_routine_run_status(
2095                        &run.run_id,
2096                        RoutineRunStatus::Completed,
2097                        Some("routine run completed".to_string()),
2098                    )
2099                    .await;
2100                state.event_bus.publish(EngineEvent::new(
2101                    "routine.run.completed",
2102                    serde_json::json!({
2103                        "runID": run.run_id,
2104                        "routineID": run.routine_id,
2105                        "sessionID": session_id,
2106                        "finishedAtMs": now_ms(),
2107                    }),
2108                ));
2109            }
2110            Err(error) => {
2111                let detail = truncate_text(&error.to_string(), 500);
2112                let _ = state
2113                    .update_routine_run_status(
2114                        &run.run_id,
2115                        RoutineRunStatus::Failed,
2116                        Some(detail.clone()),
2117                    )
2118                    .await;
2119                state.event_bus.publish(EngineEvent::new(
2120                    "routine.run.failed",
2121                    serde_json::json!({
2122                        "runID": run.run_id,
2123                        "routineID": run.routine_id,
2124                        "sessionID": session_id,
2125                        "reason": detail,
2126                        "finishedAtMs": now_ms(),
2127                    }),
2128                ));
2129            }
2130        }
2131    }
2132}
2133
2134async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
2135    let normalized_entrypoint = run.entrypoint.trim();
2136    let known_tool = state
2137        .tools
2138        .list()
2139        .await
2140        .into_iter()
2141        .any(|schema| schema.name == normalized_entrypoint);
2142    if known_tool {
2143        let args = if run.args.is_object() {
2144            run.args.clone()
2145        } else {
2146            serde_json::json!({})
2147        };
2148        return format!("/tool {} {}", normalized_entrypoint, args);
2149    }
2150
2151    if let Some(objective) = routine_objective_from_args(run) {
2152        return build_routine_mission_prompt(run, &objective);
2153    }
2154
2155    format!(
2156        "Execute routine '{}' using entrypoint '{}' with args: {}",
2157        run.routine_id, run.entrypoint, run.args
2158    )
2159}
2160
2161fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
2162    run.args
2163        .get("prompt")
2164        .and_then(|v| v.as_str())
2165        .map(str::trim)
2166        .filter(|v| !v.is_empty())
2167        .map(ToString::to_string)
2168}
2169
2170fn routine_mode_from_args(args: &Value) -> &str {
2171    args.get("mode")
2172        .and_then(|v| v.as_str())
2173        .map(str::trim)
2174        .filter(|v| !v.is_empty())
2175        .unwrap_or("standalone")
2176}
2177
2178fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
2179    args.get("success_criteria")
2180        .and_then(|v| v.as_array())
2181        .map(|rows| {
2182            rows.iter()
2183                .filter_map(|row| row.as_str())
2184                .map(str::trim)
2185                .filter(|row| !row.is_empty())
2186                .map(ToString::to_string)
2187                .collect::<Vec<_>>()
2188        })
2189        .unwrap_or_default()
2190}
2191
2192fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
2193    let mode = routine_mode_from_args(&run.args);
2194    let success_criteria = routine_success_criteria_from_args(&run.args);
2195    let orchestrator_only_tool_calls = run
2196        .args
2197        .get("orchestrator_only_tool_calls")
2198        .and_then(|v| v.as_bool())
2199        .unwrap_or(false);
2200
2201    let mut lines = vec![
2202        format!("Automation ID: {}", run.routine_id),
2203        format!("Run ID: {}", run.run_id),
2204        format!("Mode: {}", mode),
2205        format!("Mission Objective: {}", objective),
2206    ];
2207
2208    if !success_criteria.is_empty() {
2209        lines.push("Success Criteria:".to_string());
2210        for criterion in success_criteria {
2211            lines.push(format!("- {}", criterion));
2212        }
2213    }
2214
2215    if run.allowed_tools.is_empty() {
2216        lines.push("Allowed Tools: all available by current policy".to_string());
2217    } else {
2218        lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
2219    }
2220
2221    if run.output_targets.is_empty() {
2222        lines.push("Output Targets: none configured".to_string());
2223    } else {
2224        lines.push("Output Targets:".to_string());
2225        for target in &run.output_targets {
2226            lines.push(format!("- {}", target));
2227        }
2228    }
2229
2230    if mode.eq_ignore_ascii_case("orchestrated") {
2231        lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
2232        lines
2233            .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
2234        if orchestrator_only_tool_calls {
2235            lines.push(
2236                "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
2237                    .to_string(),
2238            );
2239        }
2240    } else {
2241        lines.push("Execution Pattern: Standalone mission run".to_string());
2242    }
2243
2244    lines.push(
2245        "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
2246            .to_string(),
2247    );
2248
2249    lines.join("\n")
2250}
2251
2252fn truncate_text(input: &str, max_len: usize) -> String {
2253    if input.len() <= max_len {
2254        return input.to_string();
2255    }
2256    let mut out = input[..max_len].to_string();
2257    out.push_str("...<truncated>");
2258    out
2259}
2260
2261async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
2262    if run.output_targets.is_empty() {
2263        return;
2264    }
2265    for target in &run.output_targets {
2266        let artifact = RoutineRunArtifact {
2267            artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
2268            uri: target.clone(),
2269            kind: "output_target".to_string(),
2270            label: Some("configured output target".to_string()),
2271            created_at_ms: now_ms(),
2272            metadata: Some(serde_json::json!({
2273                "source": "routine.output_targets",
2274                "runID": run.run_id,
2275                "routineID": run.routine_id,
2276            })),
2277        };
2278        let _ = state
2279            .append_routine_run_artifact(&run.run_id, artifact.clone())
2280            .await;
2281        state.event_bus.publish(EngineEvent::new(
2282            "routine.run.artifact_added",
2283            serde_json::json!({
2284                "runID": run.run_id,
2285                "routineID": run.routine_id,
2286                "artifact": artifact,
2287            }),
2288        ));
2289    }
2290}
2291
2292fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
2293    let obj = value.as_object()?;
2294    let provider_id = obj.get("provider_id")?.as_str()?.trim();
2295    let model_id = obj.get("model_id")?.as_str()?.trim();
2296    if provider_id.is_empty() || model_id.is_empty() {
2297        return None;
2298    }
2299    Some(ModelSpec {
2300        provider_id: provider_id.to_string(),
2301        model_id: model_id.to_string(),
2302    })
2303}
2304
2305fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
2306    args.get("model_policy")
2307        .and_then(|v| v.get("role_models"))
2308        .and_then(|v| v.get(role))
2309        .and_then(parse_model_spec)
2310}
2311
2312fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
2313    args.get("model_policy")
2314        .and_then(|v| v.get("default_model"))
2315        .and_then(parse_model_spec)
2316}
2317
2318fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
2319    providers.iter().any(|provider| {
2320        provider.id == spec.provider_id
2321            && provider
2322                .models
2323                .iter()
2324                .any(|model| model.id == spec.model_id)
2325    })
2326}
2327
2328async fn resolve_routine_model_spec_for_run(
2329    state: &AppState,
2330    run: &RoutineRunRecord,
2331) -> (Option<ModelSpec>, String) {
2332    let providers = state.providers.list().await;
2333    let mode = routine_mode_from_args(&run.args);
2334    let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
2335
2336    if mode.eq_ignore_ascii_case("orchestrated") {
2337        if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
2338            requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
2339        }
2340    }
2341    if let Some(default_model) = default_model_spec_from_args(&run.args) {
2342        requested.push((default_model, "args.model_policy.default_model"));
2343    }
2344
2345    for (candidate, source) in requested {
2346        if provider_catalog_has_model(&providers, &candidate) {
2347            return (Some(candidate), source.to_string());
2348        }
2349    }
2350
2351    let fallback = providers
2352        .into_iter()
2353        .find(|provider| !provider.models.is_empty())
2354        .and_then(|provider| {
2355            let model = provider.models.first()?;
2356            Some(ModelSpec {
2357                provider_id: provider.id,
2358                model_id: model.id.clone(),
2359            })
2360        });
2361
2362    (fallback, "provider_catalog_fallback".to_string())
2363}
2364
2365#[cfg(test)]
2366mod tests {
2367    use super::*;
2368
2369    fn test_state_with_path(path: PathBuf) -> AppState {
2370        let mut state = AppState::new_starting("test-attempt".to_string(), true);
2371        state.shared_resources_path = path;
2372        state.routines_path = tmp_routines_file("shared-state");
2373        state.routine_history_path = tmp_routines_file("routine-history");
2374        state.routine_runs_path = tmp_routines_file("routine-runs");
2375        state
2376    }
2377
2378    fn tmp_resource_file(name: &str) -> PathBuf {
2379        std::env::temp_dir().join(format!(
2380            "tandem-server-{name}-{}.json",
2381            uuid::Uuid::new_v4()
2382        ))
2383    }
2384
2385    fn tmp_routines_file(name: &str) -> PathBuf {
2386        std::env::temp_dir().join(format!(
2387            "tandem-server-routines-{name}-{}.json",
2388            uuid::Uuid::new_v4()
2389        ))
2390    }
2391
2392    #[tokio::test]
2393    async fn shared_resource_put_increments_revision() {
2394        let path = tmp_resource_file("shared-resource-put");
2395        let state = test_state_with_path(path.clone());
2396
2397        let first = state
2398            .put_shared_resource(
2399                "project/demo/board".to_string(),
2400                serde_json::json!({"status":"todo"}),
2401                None,
2402                "agent-1".to_string(),
2403                None,
2404            )
2405            .await
2406            .expect("first put");
2407        assert_eq!(first.rev, 1);
2408
2409        let second = state
2410            .put_shared_resource(
2411                "project/demo/board".to_string(),
2412                serde_json::json!({"status":"doing"}),
2413                Some(1),
2414                "agent-2".to_string(),
2415                Some(60_000),
2416            )
2417            .await
2418            .expect("second put");
2419        assert_eq!(second.rev, 2);
2420        assert_eq!(second.updated_by, "agent-2");
2421        assert_eq!(second.ttl_ms, Some(60_000));
2422
2423        let raw = tokio::fs::read_to_string(path.clone())
2424            .await
2425            .expect("persisted");
2426        assert!(raw.contains("\"rev\": 2"));
2427        let _ = tokio::fs::remove_file(path).await;
2428    }
2429
2430    #[tokio::test]
2431    async fn shared_resource_put_detects_revision_conflict() {
2432        let path = tmp_resource_file("shared-resource-conflict");
2433        let state = test_state_with_path(path.clone());
2434
2435        let _ = state
2436            .put_shared_resource(
2437                "mission/demo/card-1".to_string(),
2438                serde_json::json!({"title":"Card 1"}),
2439                None,
2440                "agent-1".to_string(),
2441                None,
2442            )
2443            .await
2444            .expect("seed put");
2445
2446        let conflict = state
2447            .put_shared_resource(
2448                "mission/demo/card-1".to_string(),
2449                serde_json::json!({"title":"Card 1 edited"}),
2450                Some(99),
2451                "agent-2".to_string(),
2452                None,
2453            )
2454            .await
2455            .expect_err("expected conflict");
2456
2457        match conflict {
2458            ResourceStoreError::RevisionConflict(conflict) => {
2459                assert_eq!(conflict.expected_rev, Some(99));
2460                assert_eq!(conflict.current_rev, Some(1));
2461            }
2462            other => panic!("unexpected error: {other:?}"),
2463        }
2464
2465        let _ = tokio::fs::remove_file(path).await;
2466    }
2467
2468    #[tokio::test]
2469    async fn shared_resource_rejects_invalid_namespace_key() {
2470        let path = tmp_resource_file("shared-resource-invalid-key");
2471        let state = test_state_with_path(path.clone());
2472
2473        let error = state
2474            .put_shared_resource(
2475                "global/demo/key".to_string(),
2476                serde_json::json!({"x":1}),
2477                None,
2478                "agent-1".to_string(),
2479                None,
2480            )
2481            .await
2482            .expect_err("invalid key should fail");
2483
2484        match error {
2485            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
2486            other => panic!("unexpected error: {other:?}"),
2487        }
2488
2489        assert!(!path.exists());
2490    }
2491
2492    #[test]
2493    fn derive_status_index_update_for_run_started() {
2494        let event = EngineEvent::new(
2495            "session.run.started",
2496            serde_json::json!({
2497                "sessionID": "s-1",
2498                "runID": "r-1"
2499            }),
2500        );
2501        let update = derive_status_index_update(&event).expect("update");
2502        assert_eq!(update.key, "run/s-1/status");
2503        assert_eq!(
2504            update.value.get("state").and_then(|v| v.as_str()),
2505            Some("running")
2506        );
2507        assert_eq!(
2508            update.value.get("phase").and_then(|v| v.as_str()),
2509            Some("run")
2510        );
2511    }
2512
2513    #[test]
2514    fn derive_status_index_update_for_tool_invocation() {
2515        let event = EngineEvent::new(
2516            "message.part.updated",
2517            serde_json::json!({
2518                "sessionID": "s-2",
2519                "runID": "r-2",
2520                "part": { "type": "tool-invocation", "tool": "todo_write" }
2521            }),
2522        );
2523        let update = derive_status_index_update(&event).expect("update");
2524        assert_eq!(update.key, "run/s-2/status");
2525        assert_eq!(
2526            update.value.get("phase").and_then(|v| v.as_str()),
2527            Some("tool")
2528        );
2529        assert_eq!(
2530            update.value.get("toolActive").and_then(|v| v.as_bool()),
2531            Some(true)
2532        );
2533        assert_eq!(
2534            update.value.get("tool").and_then(|v| v.as_str()),
2535            Some("todo_write")
2536        );
2537    }
2538
2539    #[test]
2540    fn misfire_skip_drops_runs_and_advances_next_fire() {
2541        let (count, next_fire) =
2542            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
2543        assert_eq!(count, 0);
2544        assert_eq!(next_fire, 11_000);
2545    }
2546
2547    #[test]
2548    fn misfire_run_once_emits_single_trigger() {
2549        let (count, next_fire) =
2550            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
2551        assert_eq!(count, 1);
2552        assert_eq!(next_fire, 11_000);
2553    }
2554
2555    #[test]
2556    fn misfire_catch_up_caps_trigger_count() {
2557        let (count, next_fire) = compute_misfire_plan(
2558            25_000,
2559            5_000,
2560            1_000,
2561            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
2562        );
2563        assert_eq!(count, 3);
2564        assert_eq!(next_fire, 26_000);
2565    }
2566
2567    #[tokio::test]
2568    async fn routine_put_persists_and_loads() {
2569        let routines_path = tmp_routines_file("persist-load");
2570        let mut state = AppState::new_starting("routines-put".to_string(), true);
2571        state.routines_path = routines_path.clone();
2572
2573        let routine = RoutineSpec {
2574            routine_id: "routine-1".to_string(),
2575            name: "Digest".to_string(),
2576            status: RoutineStatus::Active,
2577            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2578            timezone: "UTC".to_string(),
2579            misfire_policy: RoutineMisfirePolicy::RunOnce,
2580            entrypoint: "mission.default".to_string(),
2581            args: serde_json::json!({"topic":"status"}),
2582            allowed_tools: vec![],
2583            output_targets: vec![],
2584            creator_type: "user".to_string(),
2585            creator_id: "user-1".to_string(),
2586            requires_approval: true,
2587            external_integrations_allowed: false,
2588            next_fire_at_ms: Some(5_000),
2589            last_fired_at_ms: None,
2590        };
2591
2592        state.put_routine(routine).await.expect("store routine");
2593
2594        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
2595        reloaded.routines_path = routines_path.clone();
2596        reloaded.load_routines().await.expect("load routines");
2597        let list = reloaded.list_routines().await;
2598        assert_eq!(list.len(), 1);
2599        assert_eq!(list[0].routine_id, "routine-1");
2600
2601        let _ = tokio::fs::remove_file(routines_path).await;
2602    }
2603
2604    #[tokio::test]
2605    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
2606        let routines_path = tmp_routines_file("misfire-eval");
2607        let mut state = AppState::new_starting("routines-eval".to_string(), true);
2608        state.routines_path = routines_path.clone();
2609
2610        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
2611            routine_id: id.to_string(),
2612            name: id.to_string(),
2613            status: RoutineStatus::Active,
2614            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
2615            timezone: "UTC".to_string(),
2616            misfire_policy: policy,
2617            entrypoint: "mission.default".to_string(),
2618            args: serde_json::json!({}),
2619            allowed_tools: vec![],
2620            output_targets: vec![],
2621            creator_type: "user".to_string(),
2622            creator_id: "u-1".to_string(),
2623            requires_approval: false,
2624            external_integrations_allowed: false,
2625            next_fire_at_ms: Some(5_000),
2626            last_fired_at_ms: None,
2627        };
2628
2629        state
2630            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
2631            .await
2632            .expect("put skip");
2633        state
2634            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
2635            .await
2636            .expect("put once");
2637        state
2638            .put_routine(base(
2639                "routine-catch",
2640                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
2641            ))
2642            .await
2643            .expect("put catch");
2644
2645        let plans = state.evaluate_routine_misfires(10_500).await;
2646        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
2647        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
2648        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
2649
2650        assert!(plan_skip.is_none());
2651        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
2652        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
2653
2654        let stored = state.list_routines().await;
2655        let skip_next = stored
2656            .iter()
2657            .find(|r| r.routine_id == "routine-skip")
2658            .and_then(|r| r.next_fire_at_ms)
2659            .expect("skip next");
2660        assert!(skip_next > 10_500);
2661
2662        let _ = tokio::fs::remove_file(routines_path).await;
2663    }
2664
2665    #[test]
2666    fn routine_policy_blocks_external_side_effects_by_default() {
2667        let routine = RoutineSpec {
2668            routine_id: "routine-policy-1".to_string(),
2669            name: "Connector routine".to_string(),
2670            status: RoutineStatus::Active,
2671            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2672            timezone: "UTC".to_string(),
2673            misfire_policy: RoutineMisfirePolicy::RunOnce,
2674            entrypoint: "connector.email.reply".to_string(),
2675            args: serde_json::json!({}),
2676            allowed_tools: vec![],
2677            output_targets: vec![],
2678            creator_type: "user".to_string(),
2679            creator_id: "u-1".to_string(),
2680            requires_approval: true,
2681            external_integrations_allowed: false,
2682            next_fire_at_ms: None,
2683            last_fired_at_ms: None,
2684        };
2685
2686        let decision = evaluate_routine_execution_policy(&routine, "manual");
2687        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
2688    }
2689
2690    #[test]
2691    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
2692        let routine = RoutineSpec {
2693            routine_id: "routine-policy-2".to_string(),
2694            name: "Connector routine".to_string(),
2695            status: RoutineStatus::Active,
2696            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2697            timezone: "UTC".to_string(),
2698            misfire_policy: RoutineMisfirePolicy::RunOnce,
2699            entrypoint: "connector.email.reply".to_string(),
2700            args: serde_json::json!({}),
2701            allowed_tools: vec![],
2702            output_targets: vec![],
2703            creator_type: "user".to_string(),
2704            creator_id: "u-1".to_string(),
2705            requires_approval: true,
2706            external_integrations_allowed: true,
2707            next_fire_at_ms: None,
2708            last_fired_at_ms: None,
2709        };
2710
2711        let decision = evaluate_routine_execution_policy(&routine, "manual");
2712        assert!(matches!(
2713            decision,
2714            RoutineExecutionDecision::RequiresApproval { .. }
2715        ));
2716    }
2717
2718    #[test]
2719    fn routine_policy_allows_non_external_entrypoints() {
2720        let routine = RoutineSpec {
2721            routine_id: "routine-policy-3".to_string(),
2722            name: "Internal mission routine".to_string(),
2723            status: RoutineStatus::Active,
2724            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2725            timezone: "UTC".to_string(),
2726            misfire_policy: RoutineMisfirePolicy::RunOnce,
2727            entrypoint: "mission.default".to_string(),
2728            args: serde_json::json!({}),
2729            allowed_tools: vec![],
2730            output_targets: vec![],
2731            creator_type: "user".to_string(),
2732            creator_id: "u-1".to_string(),
2733            requires_approval: true,
2734            external_integrations_allowed: false,
2735            next_fire_at_ms: None,
2736            last_fired_at_ms: None,
2737        };
2738
2739        let decision = evaluate_routine_execution_policy(&routine, "manual");
2740        assert_eq!(decision, RoutineExecutionDecision::Allowed);
2741    }
2742
2743    #[tokio::test]
2744    async fn claim_next_queued_routine_run_marks_oldest_running() {
2745        let mut state = AppState::new_starting("routine-claim".to_string(), true);
2746        state.routine_runs_path = tmp_routines_file("routine-claim-runs");
2747
2748        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
2749            run_id: run_id.to_string(),
2750            routine_id: "routine-claim".to_string(),
2751            trigger_type: "manual".to_string(),
2752            run_count: 1,
2753            status: RoutineRunStatus::Queued,
2754            created_at_ms,
2755            updated_at_ms: created_at_ms,
2756            fired_at_ms: Some(created_at_ms),
2757            started_at_ms: None,
2758            finished_at_ms: None,
2759            requires_approval: false,
2760            approval_reason: None,
2761            denial_reason: None,
2762            paused_reason: None,
2763            detail: None,
2764            entrypoint: "mission.default".to_string(),
2765            args: serde_json::json!({}),
2766            allowed_tools: vec![],
2767            output_targets: vec![],
2768            artifacts: vec![],
2769        };
2770
2771        {
2772            let mut guard = state.routine_runs.write().await;
2773            guard.insert("run-late".to_string(), mk("run-late", 2_000));
2774            guard.insert("run-early".to_string(), mk("run-early", 1_000));
2775        }
2776        state.persist_routine_runs().await.expect("persist");
2777
2778        let claimed = state
2779            .claim_next_queued_routine_run()
2780            .await
2781            .expect("claimed run");
2782        assert_eq!(claimed.run_id, "run-early");
2783        assert_eq!(claimed.status, RoutineRunStatus::Running);
2784        assert!(claimed.started_at_ms.is_some());
2785    }
2786
2787    #[tokio::test]
2788    async fn routine_session_policy_roundtrip_normalizes_tools() {
2789        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
2790        state
2791            .set_routine_session_policy(
2792                "session-routine-1".to_string(),
2793                "run-1".to_string(),
2794                "routine-1".to_string(),
2795                vec![
2796                    "read".to_string(),
2797                    " mcp.arcade.search ".to_string(),
2798                    "read".to_string(),
2799                    "".to_string(),
2800                ],
2801            )
2802            .await;
2803
2804        let policy = state
2805            .routine_session_policy("session-routine-1")
2806            .await
2807            .expect("policy");
2808        assert_eq!(
2809            policy.allowed_tools,
2810            vec!["read".to_string(), "mcp.arcade.search".to_string()]
2811        );
2812    }
2813
2814    #[test]
2815    fn routine_mission_prompt_includes_orchestrated_contract() {
2816        let run = RoutineRunRecord {
2817            run_id: "run-orchestrated-1".to_string(),
2818            routine_id: "automation-orchestrated".to_string(),
2819            trigger_type: "manual".to_string(),
2820            run_count: 1,
2821            status: RoutineRunStatus::Queued,
2822            created_at_ms: 1_000,
2823            updated_at_ms: 1_000,
2824            fired_at_ms: Some(1_000),
2825            started_at_ms: None,
2826            finished_at_ms: None,
2827            requires_approval: true,
2828            approval_reason: None,
2829            denial_reason: None,
2830            paused_reason: None,
2831            detail: None,
2832            entrypoint: "mission.default".to_string(),
2833            args: serde_json::json!({
2834                "prompt": "Coordinate a multi-step release readiness check.",
2835                "mode": "orchestrated",
2836                "success_criteria": ["All blockers listed", "Output artifact written"],
2837                "orchestrator_only_tool_calls": true
2838            }),
2839            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
2840            output_targets: vec!["file://reports/release-readiness.md".to_string()],
2841            artifacts: vec![],
2842        };
2843
2844        let objective = routine_objective_from_args(&run).expect("objective");
2845        let prompt = build_routine_mission_prompt(&run, &objective);
2846
2847        assert!(prompt.contains("Mode: orchestrated"));
2848        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
2849        assert!(prompt.contains("only the orchestrator may execute tools"));
2850        assert!(prompt.contains("Allowed Tools: read, webfetch"));
2851        assert!(prompt.contains("file://reports/release-readiness.md"));
2852    }
2853
2854    #[test]
2855    fn routine_mission_prompt_includes_standalone_defaults() {
2856        let run = RoutineRunRecord {
2857            run_id: "run-standalone-1".to_string(),
2858            routine_id: "automation-standalone".to_string(),
2859            trigger_type: "manual".to_string(),
2860            run_count: 1,
2861            status: RoutineRunStatus::Queued,
2862            created_at_ms: 2_000,
2863            updated_at_ms: 2_000,
2864            fired_at_ms: Some(2_000),
2865            started_at_ms: None,
2866            finished_at_ms: None,
2867            requires_approval: false,
2868            approval_reason: None,
2869            denial_reason: None,
2870            paused_reason: None,
2871            detail: None,
2872            entrypoint: "mission.default".to_string(),
2873            args: serde_json::json!({
2874                "prompt": "Summarize top engineering updates.",
2875                "success_criteria": ["Three bullet summary"]
2876            }),
2877            allowed_tools: vec![],
2878            output_targets: vec![],
2879            artifacts: vec![],
2880        };
2881
2882        let objective = routine_objective_from_args(&run).expect("objective");
2883        let prompt = build_routine_mission_prompt(&run, &objective);
2884
2885        assert!(prompt.contains("Mode: standalone"));
2886        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
2887        assert!(prompt.contains("Allowed Tools: all available by current policy"));
2888        assert!(prompt.contains("Output Targets: none configured"));
2889    }
2890}