// tandem_server/lib.rs

1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
12use tandem_orchestrator::MissionState;
13use tandem_types::{
14    EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
15    SendMessageRequest, Session, ShellFamily,
16};
17use tokio::fs;
18use tokio::sync::RwLock;
19
20use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
21use tandem_core::{
22    resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
23    PermissionManager, PluginRegistry, Storage,
24};
25use tandem_providers::ProviderRegistry;
26use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
27use tandem_tools::ToolRegistry;
28
29mod agent_teams;
30mod http;
31pub mod webui;
32
33pub use agent_teams::AgentTeamRuntime;
34pub use http::serve;
35
36#[derive(Debug, Clone, Serialize, Deserialize, Default)]
37pub struct ChannelStatus {
38    pub enabled: bool,
39    pub connected: bool,
40    pub last_error: Option<String>,
41    pub active_sessions: u64,
42    pub meta: Value,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize, Default)]
46pub struct WebUiConfig {
47    #[serde(default)]
48    pub enabled: bool,
49    #[serde(default = "default_web_ui_prefix")]
50    pub path_prefix: String,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, Default)]
54pub struct ChannelsConfigFile {
55    pub telegram: Option<TelegramConfigFile>,
56    pub discord: Option<DiscordConfigFile>,
57    pub slack: Option<SlackConfigFile>,
58    #[serde(default)]
59    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct TelegramConfigFile {
64    pub bot_token: String,
65    #[serde(default = "default_allow_all")]
66    pub allowed_users: Vec<String>,
67    #[serde(default)]
68    pub mention_only: bool,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct DiscordConfigFile {
73    pub bot_token: String,
74    #[serde(default)]
75    pub guild_id: Option<String>,
76    #[serde(default = "default_allow_all")]
77    pub allowed_users: Vec<String>,
78    #[serde(default = "default_discord_mention_only")]
79    pub mention_only: bool,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct SlackConfigFile {
84    pub bot_token: String,
85    pub channel_id: String,
86    #[serde(default = "default_allow_all")]
87    pub allowed_users: Vec<String>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize, Default)]
91struct EffectiveAppConfig {
92    #[serde(default)]
93    pub channels: ChannelsConfigFile,
94    #[serde(default)]
95    pub web_ui: WebUiConfig,
96    #[serde(default)]
97    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
98}
99
100#[derive(Default)]
101pub struct ChannelRuntime {
102    pub listeners: Option<tokio::task::JoinSet<()>>,
103    pub statuses: std::collections::HashMap<String, ChannelStatus>,
104}
105
/// A time-limited lease held by a client on the engine.
#[derive(Clone, Debug)]
pub struct EngineLease {
    /// Identifier of this lease.
    pub lease_id: String,
    /// Identifier of the client holding the lease.
    pub client_id: String,
    /// Kind of client holding the lease.
    pub client_type: String,
    /// Acquisition time, in milliseconds.
    pub acquired_at_ms: u64,
    /// Time of the most recent renewal, in milliseconds.
    pub last_renewed_at_ms: u64,
    /// Lifetime granted per renewal, in milliseconds.
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Returns `true` when strictly more than `ttl_ms` milliseconds separate
    /// `now_ms` from the last renewal.
    ///
    /// The subtraction saturates, so a `now_ms` earlier than the last renewal
    /// counts as zero elapsed time.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let since_renewal = now_ms.saturating_sub(self.last_renewed_at_ms);
        since_renewal > self.ttl_ms
    }
}
121
122#[derive(Debug, Clone, Serialize)]
123pub struct ActiveRun {
124    #[serde(rename = "runID")]
125    pub run_id: String,
126    #[serde(rename = "startedAtMs")]
127    pub started_at_ms: u64,
128    #[serde(rename = "lastActivityAtMs")]
129    pub last_activity_at_ms: u64,
130    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
131    pub client_id: Option<String>,
132    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
133    pub agent_id: Option<String>,
134    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
135    pub agent_profile: Option<String>,
136}
137
138#[derive(Clone, Default)]
139pub struct RunRegistry {
140    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
141}
142
143impl RunRegistry {
144    pub fn new() -> Self {
145        Self::default()
146    }
147
148    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
149        self.active.read().await.get(session_id).cloned()
150    }
151
152    pub async fn acquire(
153        &self,
154        session_id: &str,
155        run_id: String,
156        client_id: Option<String>,
157        agent_id: Option<String>,
158        agent_profile: Option<String>,
159    ) -> std::result::Result<ActiveRun, ActiveRun> {
160        let mut guard = self.active.write().await;
161        if let Some(existing) = guard.get(session_id).cloned() {
162            return Err(existing);
163        }
164        let now = now_ms();
165        let run = ActiveRun {
166            run_id,
167            started_at_ms: now,
168            last_activity_at_ms: now,
169            client_id,
170            agent_id,
171            agent_profile,
172        };
173        guard.insert(session_id.to_string(), run.clone());
174        Ok(run)
175    }
176
177    pub async fn touch(&self, session_id: &str, run_id: &str) {
178        let mut guard = self.active.write().await;
179        if let Some(run) = guard.get_mut(session_id) {
180            if run.run_id == run_id {
181                run.last_activity_at_ms = now_ms();
182            }
183        }
184    }
185
186    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
187        let mut guard = self.active.write().await;
188        if let Some(run) = guard.get(session_id) {
189            if run.run_id == run_id {
190                return guard.remove(session_id);
191            }
192        }
193        None
194    }
195
196    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
197        self.active.write().await.remove(session_id)
198    }
199
200    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
201        let now = now_ms();
202        let mut guard = self.active.write().await;
203        let stale_ids = guard
204            .iter()
205            .filter_map(|(session_id, run)| {
206                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
207                    Some(session_id.clone())
208                } else {
209                    None
210                }
211            })
212            .collect::<Vec<_>>();
213        let mut out = Vec::with_capacity(stale_ids.len());
214        for session_id in stale_ids {
215            if let Some(run) = guard.remove(&session_id) {
216                out.push((session_id, run));
217            }
218        }
219        out
220    }
221}
222
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
229
230pub fn build_id() -> String {
231    if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
232        let trimmed = explicit.trim();
233        if !trimmed.is_empty() {
234            return trimmed.to_string();
235        }
236    }
237    if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
238        let trimmed = git_sha.trim();
239        if !trimmed.is_empty() {
240            return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
241        }
242    }
243    env!("CARGO_PKG_VERSION").to_string()
244}
245
246pub fn detect_host_runtime_context() -> HostRuntimeContext {
247    let os = if cfg!(target_os = "windows") {
248        HostOs::Windows
249    } else if cfg!(target_os = "macos") {
250        HostOs::Macos
251    } else {
252        HostOs::Linux
253    };
254    let (shell_family, path_style) = match os {
255        HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
256        HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
257    };
258    HostRuntimeContext {
259        os,
260        arch: std::env::consts::ARCH.to_string(),
261        shell_family,
262        path_style,
263    }
264}
265
/// Path of the running executable, reported only by builds with debug assertions.
///
/// Builds without `debug_assertions` always return `None`; builds with them
/// return the (lossily converted) path, or `None` when it cannot be determined.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        let exe = std::env::current_exe().ok()?;
        Some(exe.to_string_lossy().into_owned())
    } else {
        None
    }
}
278
279#[derive(Clone)]
280pub struct RuntimeState {
281    pub storage: Arc<Storage>,
282    pub config: ConfigStore,
283    pub event_bus: EventBus,
284    pub providers: ProviderRegistry,
285    pub plugins: PluginRegistry,
286    pub agents: AgentRegistry,
287    pub tools: ToolRegistry,
288    pub permissions: PermissionManager,
289    pub mcp: McpRegistry,
290    pub pty: PtyManager,
291    pub lsp: LspManager,
292    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
293    pub logs: Arc<RwLock<Vec<Value>>>,
294    pub workspace_index: WorkspaceIndex,
295    pub cancellations: CancellationRegistry,
296    pub engine_loop: EngineLoop,
297    pub host_runtime_context: HostRuntimeContext,
298}
299
300#[derive(Debug, Clone)]
301pub struct GovernedMemoryRecord {
302    pub id: String,
303    pub run_id: String,
304    pub partition: MemoryPartition,
305    pub kind: MemoryContentKind,
306    pub content: String,
307    pub artifact_refs: Vec<String>,
308    pub classification: MemoryClassification,
309    pub metadata: Option<Value>,
310    pub source_memory_id: Option<String>,
311    pub created_at_ms: u64,
312}
313
314#[derive(Debug, Clone, Serialize)]
315pub struct MemoryAuditEvent {
316    pub audit_id: String,
317    pub action: String,
318    pub run_id: String,
319    pub memory_id: Option<String>,
320    pub source_memory_id: Option<String>,
321    pub to_tier: Option<GovernedMemoryTier>,
322    pub partition_key: String,
323    pub actor: String,
324    pub status: String,
325    #[serde(skip_serializing_if = "Option::is_none")]
326    pub detail: Option<String>,
327    pub created_at_ms: u64,
328}
329
330#[derive(Debug, Clone, Serialize, Deserialize)]
331pub struct SharedResourceRecord {
332    pub key: String,
333    pub value: Value,
334    pub rev: u64,
335    pub updated_at_ms: u64,
336    pub updated_by: String,
337    #[serde(skip_serializing_if = "Option::is_none")]
338    pub ttl_ms: Option<u64>,
339}
340
341#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
342#[serde(rename_all = "snake_case")]
343pub enum RoutineSchedule {
344    IntervalSeconds { seconds: u64 },
345    Cron { expression: String },
346}
347
348#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
349#[serde(rename_all = "snake_case", tag = "type")]
350pub enum RoutineMisfirePolicy {
351    Skip,
352    RunOnce,
353    CatchUp { max_runs: u32 },
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
357#[serde(rename_all = "snake_case")]
358pub enum RoutineStatus {
359    Active,
360    Paused,
361}
362
363#[derive(Debug, Clone, Serialize, Deserialize)]
364pub struct RoutineSpec {
365    pub routine_id: String,
366    pub name: String,
367    pub status: RoutineStatus,
368    pub schedule: RoutineSchedule,
369    pub timezone: String,
370    pub misfire_policy: RoutineMisfirePolicy,
371    pub entrypoint: String,
372    #[serde(default)]
373    pub args: Value,
374    #[serde(default)]
375    pub allowed_tools: Vec<String>,
376    #[serde(default)]
377    pub output_targets: Vec<String>,
378    pub creator_type: String,
379    pub creator_id: String,
380    pub requires_approval: bool,
381    pub external_integrations_allowed: bool,
382    #[serde(default, skip_serializing_if = "Option::is_none")]
383    pub next_fire_at_ms: Option<u64>,
384    #[serde(default, skip_serializing_if = "Option::is_none")]
385    pub last_fired_at_ms: Option<u64>,
386}
387
388#[derive(Debug, Clone, Serialize, Deserialize)]
389pub struct RoutineHistoryEvent {
390    pub routine_id: String,
391    pub trigger_type: String,
392    pub run_count: u32,
393    pub fired_at_ms: u64,
394    pub status: String,
395    #[serde(default, skip_serializing_if = "Option::is_none")]
396    pub detail: Option<String>,
397}
398
399#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
400#[serde(rename_all = "snake_case")]
401pub enum RoutineRunStatus {
402    Queued,
403    PendingApproval,
404    Running,
405    Paused,
406    BlockedPolicy,
407    Denied,
408    Completed,
409    Failed,
410    Cancelled,
411}
412
413#[derive(Debug, Clone, Serialize, Deserialize)]
414pub struct RoutineRunArtifact {
415    pub artifact_id: String,
416    pub uri: String,
417    pub kind: String,
418    #[serde(default, skip_serializing_if = "Option::is_none")]
419    pub label: Option<String>,
420    pub created_at_ms: u64,
421    #[serde(default, skip_serializing_if = "Option::is_none")]
422    pub metadata: Option<Value>,
423}
424
425#[derive(Debug, Clone, Serialize, Deserialize)]
426pub struct RoutineRunRecord {
427    pub run_id: String,
428    pub routine_id: String,
429    pub trigger_type: String,
430    pub run_count: u32,
431    pub status: RoutineRunStatus,
432    pub created_at_ms: u64,
433    pub updated_at_ms: u64,
434    #[serde(default, skip_serializing_if = "Option::is_none")]
435    pub fired_at_ms: Option<u64>,
436    #[serde(default, skip_serializing_if = "Option::is_none")]
437    pub started_at_ms: Option<u64>,
438    #[serde(default, skip_serializing_if = "Option::is_none")]
439    pub finished_at_ms: Option<u64>,
440    pub requires_approval: bool,
441    #[serde(default, skip_serializing_if = "Option::is_none")]
442    pub approval_reason: Option<String>,
443    #[serde(default, skip_serializing_if = "Option::is_none")]
444    pub denial_reason: Option<String>,
445    #[serde(default, skip_serializing_if = "Option::is_none")]
446    pub paused_reason: Option<String>,
447    #[serde(default, skip_serializing_if = "Option::is_none")]
448    pub detail: Option<String>,
449    pub entrypoint: String,
450    #[serde(default)]
451    pub args: Value,
452    #[serde(default)]
453    pub allowed_tools: Vec<String>,
454    #[serde(default)]
455    pub output_targets: Vec<String>,
456    #[serde(default)]
457    pub artifacts: Vec<RoutineRunArtifact>,
458}
459
/// Tool allow-list attached to a session that was started by a routine run.
#[derive(Clone, Debug)]
pub struct RoutineSessionPolicy {
    /// Session the policy applies to.
    pub session_id: String,
    /// Routine run that owns the session.
    pub run_id: String,
    /// Routine that produced the run.
    pub routine_id: String,
    /// Names of the tools the session may use.
    pub allowed_tools: Vec<String>,
}
467
468#[derive(Debug, Clone, Serialize)]
469pub struct RoutineTriggerPlan {
470    pub routine_id: String,
471    pub run_count: u32,
472    pub scheduled_at_ms: u64,
473    pub next_fire_at_ms: u64,
474}
475
476#[derive(Debug, Clone, Serialize)]
477pub struct ResourceConflict {
478    pub key: String,
479    pub expected_rev: Option<u64>,
480    pub current_rev: Option<u64>,
481}
482
483#[derive(Debug, Clone, Serialize)]
484#[serde(tag = "type", rename_all = "snake_case")]
485pub enum ResourceStoreError {
486    InvalidKey { key: String },
487    RevisionConflict(ResourceConflict),
488    PersistFailed { message: String },
489}
490
491#[derive(Debug, Clone, Serialize)]
492#[serde(tag = "type", rename_all = "snake_case")]
493pub enum RoutineStoreError {
494    InvalidRoutineId { routine_id: String },
495    InvalidSchedule { detail: String },
496    PersistFailed { message: String },
497}
498
/// Coarse startup status of the server.
#[derive(Clone, Debug)]
pub enum StartupStatus {
    /// Startup is in progress.
    Starting,
    /// The runtime has been initialised.
    Ready,
    /// Startup failed.
    Failed,
}
505
506#[derive(Debug, Clone)]
507pub struct StartupState {
508    pub status: StartupStatus,
509    pub phase: String,
510    pub started_at_ms: u64,
511    pub attempt_id: String,
512    pub last_error: Option<String>,
513}
514
515#[derive(Debug, Clone)]
516pub struct StartupSnapshot {
517    pub status: StartupStatus,
518    pub phase: String,
519    pub started_at_ms: u64,
520    pub attempt_id: String,
521    pub last_error: Option<String>,
522    pub elapsed_ms: u64,
523}
524
525#[derive(Clone)]
526pub struct AppState {
527    pub runtime: Arc<OnceLock<RuntimeState>>,
528    pub startup: Arc<RwLock<StartupState>>,
529    pub in_process_mode: Arc<AtomicBool>,
530    pub api_token: Arc<RwLock<Option<String>>>,
531    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
532    pub run_registry: RunRegistry,
533    pub run_stale_ms: u64,
534    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
535    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
536    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
537    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
538    pub shared_resources_path: PathBuf,
539    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
540    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
541    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
542    pub routine_session_policies:
543        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
544    pub routines_path: PathBuf,
545    pub routine_history_path: PathBuf,
546    pub routine_runs_path: PathBuf,
547    pub agent_teams: AgentTeamRuntime,
548    pub web_ui_enabled: Arc<AtomicBool>,
549    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
550    pub server_base_url: Arc<std::sync::RwLock<String>>,
551    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
552    pub host_runtime_context: HostRuntimeContext,
553}
554
555#[derive(Debug, Clone)]
556struct StatusIndexUpdate {
557    key: String,
558    value: Value,
559}
560
561impl AppState {
562    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
563        Self {
564            runtime: Arc::new(OnceLock::new()),
565            startup: Arc::new(RwLock::new(StartupState {
566                status: StartupStatus::Starting,
567                phase: "boot".to_string(),
568                started_at_ms: now_ms(),
569                attempt_id,
570                last_error: None,
571            })),
572            in_process_mode: Arc::new(AtomicBool::new(in_process)),
573            api_token: Arc::new(RwLock::new(None)),
574            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
575            run_registry: RunRegistry::new(),
576            run_stale_ms: resolve_run_stale_ms(),
577            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
578            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
579            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
580            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
581            shared_resources_path: resolve_shared_resources_path(),
582            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
583            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
584            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
585            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
586            routines_path: resolve_routines_path(),
587            routine_history_path: resolve_routine_history_path(),
588            routine_runs_path: resolve_routine_runs_path(),
589            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
590            web_ui_enabled: Arc::new(AtomicBool::new(false)),
591            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
592            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
593            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
594            host_runtime_context: detect_host_runtime_context(),
595        }
596    }
597
598    pub fn is_ready(&self) -> bool {
599        self.runtime.get().is_some()
600    }
601
602    pub fn mode_label(&self) -> &'static str {
603        if self.in_process_mode.load(Ordering::Relaxed) {
604            "in-process"
605        } else {
606            "sidecar"
607        }
608    }
609
610    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
611        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
612        if let Ok(mut guard) = self.web_ui_prefix.write() {
613            *guard = normalize_web_ui_prefix(&prefix);
614        }
615    }
616
617    pub fn web_ui_enabled(&self) -> bool {
618        self.web_ui_enabled.load(Ordering::Relaxed)
619    }
620
621    pub fn web_ui_prefix(&self) -> String {
622        self.web_ui_prefix
623            .read()
624            .map(|v| v.clone())
625            .unwrap_or_else(|_| "/admin".to_string())
626    }
627
628    pub fn set_server_base_url(&self, base_url: String) {
629        if let Ok(mut guard) = self.server_base_url.write() {
630            *guard = base_url;
631        }
632    }
633
634    pub fn server_base_url(&self) -> String {
635        self.server_base_url
636            .read()
637            .map(|v| v.clone())
638            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
639    }
640
641    pub async fn api_token(&self) -> Option<String> {
642        self.api_token.read().await.clone()
643    }
644
645    pub async fn set_api_token(&self, token: Option<String>) {
646        *self.api_token.write().await = token;
647    }
648
649    pub async fn startup_snapshot(&self) -> StartupSnapshot {
650        let state = self.startup.read().await.clone();
651        StartupSnapshot {
652            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
653            status: state.status,
654            phase: state.phase,
655            started_at_ms: state.started_at_ms,
656            attempt_id: state.attempt_id,
657            last_error: state.last_error,
658        }
659    }
660
661    pub fn host_runtime_context(&self) -> HostRuntimeContext {
662        self.runtime
663            .get()
664            .map(|runtime| runtime.host_runtime_context.clone())
665            .unwrap_or_else(|| self.host_runtime_context.clone())
666    }
667
668    pub async fn set_phase(&self, phase: impl Into<String>) {
669        let mut startup = self.startup.write().await;
670        startup.phase = phase.into();
671    }
672
673    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
674        self.runtime
675            .set(runtime)
676            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
677        self.engine_loop
678            .set_spawn_agent_hook(std::sync::Arc::new(
679                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
680            ))
681            .await;
682        self.engine_loop
683            .set_tool_policy_hook(std::sync::Arc::new(
684                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
685            ))
686            .await;
687        let _ = self.load_shared_resources().await;
688        let _ = self.load_routines().await;
689        let _ = self.load_routine_history().await;
690        let _ = self.load_routine_runs().await;
691        let workspace_root = self.workspace_index.snapshot().await.root;
692        let _ = self
693            .agent_teams
694            .ensure_loaded_for_workspace(&workspace_root)
695            .await;
696        let mut startup = self.startup.write().await;
697        startup.status = StartupStatus::Ready;
698        startup.phase = "ready".to_string();
699        startup.last_error = None;
700        Ok(())
701    }
702
703    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
704        let mut startup = self.startup.write().await;
705        startup.status = StartupStatus::Failed;
706        startup.phase = phase.into();
707        startup.last_error = Some(error.into());
708    }
709
710    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
711        let runtime = self.channels_runtime.lock().await;
712        runtime.statuses.clone()
713    }
714
715    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
716        let effective = self.config.get_effective_value().await;
717        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
718        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
719
720        let mut runtime = self.channels_runtime.lock().await;
721        if let Some(listeners) = runtime.listeners.as_mut() {
722            listeners.abort_all();
723        }
724        runtime.listeners = None;
725        runtime.statuses.clear();
726
727        let mut status_map = std::collections::HashMap::new();
728        status_map.insert(
729            "telegram".to_string(),
730            ChannelStatus {
731                enabled: parsed.channels.telegram.is_some(),
732                connected: false,
733                last_error: None,
734                active_sessions: 0,
735                meta: serde_json::json!({}),
736            },
737        );
738        status_map.insert(
739            "discord".to_string(),
740            ChannelStatus {
741                enabled: parsed.channels.discord.is_some(),
742                connected: false,
743                last_error: None,
744                active_sessions: 0,
745                meta: serde_json::json!({}),
746            },
747        );
748        status_map.insert(
749            "slack".to_string(),
750            ChannelStatus {
751                enabled: parsed.channels.slack.is_some(),
752                connected: false,
753                last_error: None,
754                active_sessions: 0,
755                meta: serde_json::json!({}),
756            },
757        );
758
759        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
760            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
761            runtime.listeners = Some(listeners);
762            for status in status_map.values_mut() {
763                if status.enabled {
764                    status.connected = true;
765                }
766            }
767        }
768
769        runtime.statuses = status_map.clone();
770        drop(runtime);
771
772        self.event_bus.publish(EngineEvent::new(
773            "channel.status.changed",
774            serde_json::json!({ "channels": status_map }),
775        ));
776        Ok(())
777    }
778
779    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
780        if !self.shared_resources_path.exists() {
781            return Ok(());
782        }
783        let raw = fs::read_to_string(&self.shared_resources_path).await?;
784        let parsed =
785            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
786                .unwrap_or_default();
787        let mut guard = self.shared_resources.write().await;
788        *guard = parsed;
789        Ok(())
790    }
791
792    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
793        if let Some(parent) = self.shared_resources_path.parent() {
794            fs::create_dir_all(parent).await?;
795        }
796        let payload = {
797            let guard = self.shared_resources.read().await;
798            serde_json::to_string_pretty(&*guard)?
799        };
800        fs::write(&self.shared_resources_path, payload).await?;
801        Ok(())
802    }
803
804    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
805        self.shared_resources.read().await.get(key).cloned()
806    }
807
808    pub async fn list_shared_resources(
809        &self,
810        prefix: Option<&str>,
811        limit: usize,
812    ) -> Vec<SharedResourceRecord> {
813        let limit = limit.clamp(1, 500);
814        let mut rows = self
815            .shared_resources
816            .read()
817            .await
818            .values()
819            .filter(|record| {
820                if let Some(prefix) = prefix {
821                    record.key.starts_with(prefix)
822                } else {
823                    true
824                }
825            })
826            .cloned()
827            .collect::<Vec<_>>();
828        rows.sort_by(|a, b| a.key.cmp(&b.key));
829        rows.truncate(limit);
830        rows
831    }
832
833    pub async fn put_shared_resource(
834        &self,
835        key: String,
836        value: Value,
837        if_match_rev: Option<u64>,
838        updated_by: String,
839        ttl_ms: Option<u64>,
840    ) -> Result<SharedResourceRecord, ResourceStoreError> {
841        if !is_valid_resource_key(&key) {
842            return Err(ResourceStoreError::InvalidKey { key });
843        }
844
845        let now = now_ms();
846        let mut guard = self.shared_resources.write().await;
847        let existing = guard.get(&key).cloned();
848
849        if let Some(expected) = if_match_rev {
850            let current = existing.as_ref().map(|row| row.rev);
851            if current != Some(expected) {
852                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
853                    key,
854                    expected_rev: Some(expected),
855                    current_rev: current,
856                }));
857            }
858        }
859
860        let next_rev = existing
861            .as_ref()
862            .map(|row| row.rev.saturating_add(1))
863            .unwrap_or(1);
864
865        let record = SharedResourceRecord {
866            key: key.clone(),
867            value,
868            rev: next_rev,
869            updated_at_ms: now,
870            updated_by,
871            ttl_ms,
872        };
873
874        let previous = guard.insert(key.clone(), record.clone());
875        drop(guard);
876
877        if let Err(error) = self.persist_shared_resources().await {
878            let mut rollback = self.shared_resources.write().await;
879            if let Some(previous) = previous {
880                rollback.insert(key, previous);
881            } else {
882                rollback.remove(&key);
883            }
884            return Err(ResourceStoreError::PersistFailed {
885                message: error.to_string(),
886            });
887        }
888
889        Ok(record)
890    }
891
892    pub async fn delete_shared_resource(
893        &self,
894        key: &str,
895        if_match_rev: Option<u64>,
896    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
897        if !is_valid_resource_key(key) {
898            return Err(ResourceStoreError::InvalidKey {
899                key: key.to_string(),
900            });
901        }
902
903        let mut guard = self.shared_resources.write().await;
904        let current = guard.get(key).cloned();
905        if let Some(expected) = if_match_rev {
906            let current_rev = current.as_ref().map(|row| row.rev);
907            if current_rev != Some(expected) {
908                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
909                    key: key.to_string(),
910                    expected_rev: Some(expected),
911                    current_rev,
912                }));
913            }
914        }
915
916        let removed = guard.remove(key);
917        drop(guard);
918
919        if let Err(error) = self.persist_shared_resources().await {
920            if let Some(record) = removed.clone() {
921                self.shared_resources
922                    .write()
923                    .await
924                    .insert(record.key.clone(), record);
925            }
926            return Err(ResourceStoreError::PersistFailed {
927                message: error.to_string(),
928            });
929        }
930
931        Ok(removed)
932    }
933
934    pub async fn load_routines(&self) -> anyhow::Result<()> {
935        if !self.routines_path.exists() {
936            return Ok(());
937        }
938        let raw = fs::read_to_string(&self.routines_path).await?;
939        let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
940            .unwrap_or_default();
941        let mut guard = self.routines.write().await;
942        *guard = parsed;
943        Ok(())
944    }
945
946    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
947        if !self.routine_history_path.exists() {
948            return Ok(());
949        }
950        let raw = fs::read_to_string(&self.routine_history_path).await?;
951        let parsed = serde_json::from_str::<
952            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
953        >(&raw)
954        .unwrap_or_default();
955        let mut guard = self.routine_history.write().await;
956        *guard = parsed;
957        Ok(())
958    }
959
960    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
961        if !self.routine_runs_path.exists() {
962            return Ok(());
963        }
964        let raw = fs::read_to_string(&self.routine_runs_path).await?;
965        let parsed =
966            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
967                .unwrap_or_default();
968        let mut guard = self.routine_runs.write().await;
969        *guard = parsed;
970        Ok(())
971    }
972
973    pub async fn persist_routines(&self) -> anyhow::Result<()> {
974        if let Some(parent) = self.routines_path.parent() {
975            fs::create_dir_all(parent).await?;
976        }
977        let payload = {
978            let guard = self.routines.read().await;
979            serde_json::to_string_pretty(&*guard)?
980        };
981        fs::write(&self.routines_path, payload).await?;
982        Ok(())
983    }
984
985    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
986        if let Some(parent) = self.routine_history_path.parent() {
987            fs::create_dir_all(parent).await?;
988        }
989        let payload = {
990            let guard = self.routine_history.read().await;
991            serde_json::to_string_pretty(&*guard)?
992        };
993        fs::write(&self.routine_history_path, payload).await?;
994        Ok(())
995    }
996
997    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
998        if let Some(parent) = self.routine_runs_path.parent() {
999            fs::create_dir_all(parent).await?;
1000        }
1001        let payload = {
1002            let guard = self.routine_runs.read().await;
1003            serde_json::to_string_pretty(&*guard)?
1004        };
1005        fs::write(&self.routine_runs_path, payload).await?;
1006        Ok(())
1007    }
1008
1009    pub async fn put_routine(
1010        &self,
1011        mut routine: RoutineSpec,
1012    ) -> Result<RoutineSpec, RoutineStoreError> {
1013        if routine.routine_id.trim().is_empty() {
1014            return Err(RoutineStoreError::InvalidRoutineId {
1015                routine_id: routine.routine_id,
1016            });
1017        }
1018
1019        routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1020        routine.output_targets = normalize_non_empty_list(routine.output_targets);
1021
1022        let interval = match routine.schedule {
1023            RoutineSchedule::IntervalSeconds { seconds } => {
1024                if seconds == 0 {
1025                    return Err(RoutineStoreError::InvalidSchedule {
1026                        detail: "interval_seconds must be > 0".to_string(),
1027                    });
1028                }
1029                Some(seconds)
1030            }
1031            RoutineSchedule::Cron { .. } => None,
1032        };
1033        if routine.next_fire_at_ms.is_none() {
1034            routine.next_fire_at_ms = Some(now_ms().saturating_add(interval.unwrap_or(60) * 1000));
1035        }
1036
1037        let mut guard = self.routines.write().await;
1038        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1039        drop(guard);
1040
1041        if let Err(error) = self.persist_routines().await {
1042            let mut rollback = self.routines.write().await;
1043            if let Some(previous) = previous {
1044                rollback.insert(previous.routine_id.clone(), previous);
1045            } else {
1046                rollback.remove(&routine.routine_id);
1047            }
1048            return Err(RoutineStoreError::PersistFailed {
1049                message: error.to_string(),
1050            });
1051        }
1052
1053        Ok(routine)
1054    }
1055
1056    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1057        let mut rows = self
1058            .routines
1059            .read()
1060            .await
1061            .values()
1062            .cloned()
1063            .collect::<Vec<_>>();
1064        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1065        rows
1066    }
1067
1068    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1069        self.routines.read().await.get(routine_id).cloned()
1070    }
1071
1072    pub async fn delete_routine(
1073        &self,
1074        routine_id: &str,
1075    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1076        let mut guard = self.routines.write().await;
1077        let removed = guard.remove(routine_id);
1078        drop(guard);
1079
1080        if let Err(error) = self.persist_routines().await {
1081            if let Some(removed) = removed.clone() {
1082                self.routines
1083                    .write()
1084                    .await
1085                    .insert(removed.routine_id.clone(), removed);
1086            }
1087            return Err(RoutineStoreError::PersistFailed {
1088                message: error.to_string(),
1089            });
1090        }
1091        Ok(removed)
1092    }
1093
1094    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1095        let mut plans = Vec::new();
1096        let mut guard = self.routines.write().await;
1097        for routine in guard.values_mut() {
1098            if routine.status != RoutineStatus::Active {
1099                continue;
1100            }
1101            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1102                continue;
1103            };
1104            let Some(interval_ms) = routine_interval_ms(&routine.schedule) else {
1105                continue;
1106            };
1107            if now_ms < next_fire_at_ms {
1108                continue;
1109            }
1110            let (run_count, next_fire_at_ms) = compute_misfire_plan(
1111                now_ms,
1112                next_fire_at_ms,
1113                interval_ms,
1114                &routine.misfire_policy,
1115            );
1116            routine.next_fire_at_ms = Some(next_fire_at_ms);
1117            if run_count == 0 {
1118                continue;
1119            }
1120            plans.push(RoutineTriggerPlan {
1121                routine_id: routine.routine_id.clone(),
1122                run_count,
1123                scheduled_at_ms: now_ms,
1124                next_fire_at_ms,
1125            });
1126        }
1127        drop(guard);
1128        let _ = self.persist_routines().await;
1129        plans
1130    }
1131
1132    pub async fn mark_routine_fired(
1133        &self,
1134        routine_id: &str,
1135        fired_at_ms: u64,
1136    ) -> Option<RoutineSpec> {
1137        let mut guard = self.routines.write().await;
1138        let routine = guard.get_mut(routine_id)?;
1139        routine.last_fired_at_ms = Some(fired_at_ms);
1140        let updated = routine.clone();
1141        drop(guard);
1142        let _ = self.persist_routines().await;
1143        Some(updated)
1144    }
1145
1146    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1147        let mut history = self.routine_history.write().await;
1148        history
1149            .entry(event.routine_id.clone())
1150            .or_default()
1151            .push(event);
1152        drop(history);
1153        let _ = self.persist_routine_history().await;
1154    }
1155
1156    pub async fn list_routine_history(
1157        &self,
1158        routine_id: &str,
1159        limit: usize,
1160    ) -> Vec<RoutineHistoryEvent> {
1161        let limit = limit.clamp(1, 500);
1162        let mut rows = self
1163            .routine_history
1164            .read()
1165            .await
1166            .get(routine_id)
1167            .cloned()
1168            .unwrap_or_default();
1169        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1170        rows.truncate(limit);
1171        rows
1172    }
1173
1174    pub async fn create_routine_run(
1175        &self,
1176        routine: &RoutineSpec,
1177        trigger_type: &str,
1178        run_count: u32,
1179        status: RoutineRunStatus,
1180        detail: Option<String>,
1181    ) -> RoutineRunRecord {
1182        let now = now_ms();
1183        let record = RoutineRunRecord {
1184            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1185            routine_id: routine.routine_id.clone(),
1186            trigger_type: trigger_type.to_string(),
1187            run_count,
1188            status,
1189            created_at_ms: now,
1190            updated_at_ms: now,
1191            fired_at_ms: Some(now),
1192            started_at_ms: None,
1193            finished_at_ms: None,
1194            requires_approval: routine.requires_approval,
1195            approval_reason: None,
1196            denial_reason: None,
1197            paused_reason: None,
1198            detail,
1199            entrypoint: routine.entrypoint.clone(),
1200            args: routine.args.clone(),
1201            allowed_tools: routine.allowed_tools.clone(),
1202            output_targets: routine.output_targets.clone(),
1203            artifacts: Vec::new(),
1204        };
1205        self.routine_runs
1206            .write()
1207            .await
1208            .insert(record.run_id.clone(), record.clone());
1209        let _ = self.persist_routine_runs().await;
1210        record
1211    }
1212
1213    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1214        self.routine_runs.read().await.get(run_id).cloned()
1215    }
1216
1217    pub async fn list_routine_runs(
1218        &self,
1219        routine_id: Option<&str>,
1220        limit: usize,
1221    ) -> Vec<RoutineRunRecord> {
1222        let mut rows = self
1223            .routine_runs
1224            .read()
1225            .await
1226            .values()
1227            .filter(|row| {
1228                if let Some(id) = routine_id {
1229                    row.routine_id == id
1230                } else {
1231                    true
1232                }
1233            })
1234            .cloned()
1235            .collect::<Vec<_>>();
1236        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1237        rows.truncate(limit.clamp(1, 500));
1238        rows
1239    }
1240
1241    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1242        let mut guard = self.routine_runs.write().await;
1243        let next_run_id = guard
1244            .values()
1245            .filter(|row| row.status == RoutineRunStatus::Queued)
1246            .min_by(|a, b| {
1247                a.created_at_ms
1248                    .cmp(&b.created_at_ms)
1249                    .then_with(|| a.run_id.cmp(&b.run_id))
1250            })
1251            .map(|row| row.run_id.clone())?;
1252        let now = now_ms();
1253        let row = guard.get_mut(&next_run_id)?;
1254        row.status = RoutineRunStatus::Running;
1255        row.updated_at_ms = now;
1256        row.started_at_ms = Some(now);
1257        let claimed = row.clone();
1258        drop(guard);
1259        let _ = self.persist_routine_runs().await;
1260        Some(claimed)
1261    }
1262
1263    pub async fn set_routine_session_policy(
1264        &self,
1265        session_id: String,
1266        run_id: String,
1267        routine_id: String,
1268        allowed_tools: Vec<String>,
1269    ) {
1270        let policy = RoutineSessionPolicy {
1271            session_id: session_id.clone(),
1272            run_id,
1273            routine_id,
1274            allowed_tools: normalize_allowed_tools(allowed_tools),
1275        };
1276        self.routine_session_policies
1277            .write()
1278            .await
1279            .insert(session_id, policy);
1280    }
1281
1282    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1283        self.routine_session_policies
1284            .read()
1285            .await
1286            .get(session_id)
1287            .cloned()
1288    }
1289
1290    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1291        self.routine_session_policies
1292            .write()
1293            .await
1294            .remove(session_id);
1295    }
1296
1297    pub async fn update_routine_run_status(
1298        &self,
1299        run_id: &str,
1300        status: RoutineRunStatus,
1301        reason: Option<String>,
1302    ) -> Option<RoutineRunRecord> {
1303        let mut guard = self.routine_runs.write().await;
1304        let row = guard.get_mut(run_id)?;
1305        row.status = status.clone();
1306        row.updated_at_ms = now_ms();
1307        match status {
1308            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1309            RoutineRunStatus::Running => {
1310                row.started_at_ms.get_or_insert_with(now_ms);
1311                if let Some(detail) = reason {
1312                    row.detail = Some(detail);
1313                }
1314            }
1315            RoutineRunStatus::Denied => row.denial_reason = reason,
1316            RoutineRunStatus::Paused => row.paused_reason = reason,
1317            RoutineRunStatus::Completed
1318            | RoutineRunStatus::Failed
1319            | RoutineRunStatus::Cancelled => {
1320                row.finished_at_ms = Some(now_ms());
1321                if let Some(detail) = reason {
1322                    row.detail = Some(detail);
1323                }
1324            }
1325            _ => {
1326                if let Some(detail) = reason {
1327                    row.detail = Some(detail);
1328                }
1329            }
1330        }
1331        let updated = row.clone();
1332        drop(guard);
1333        let _ = self.persist_routine_runs().await;
1334        Some(updated)
1335    }
1336
1337    pub async fn append_routine_run_artifact(
1338        &self,
1339        run_id: &str,
1340        artifact: RoutineRunArtifact,
1341    ) -> Option<RoutineRunRecord> {
1342        let mut guard = self.routine_runs.write().await;
1343        let row = guard.get_mut(run_id)?;
1344        row.updated_at_ms = now_ms();
1345        row.artifacts.push(artifact);
1346        let updated = row.clone();
1347        drop(guard);
1348        let _ = self.persist_routine_runs().await;
1349        Some(updated)
1350    }
1351}
1352
1353async fn build_channels_config(
1354    state: &AppState,
1355    channels: &ChannelsConfigFile,
1356) -> Option<ChannelsConfig> {
1357    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1358        return None;
1359    }
1360    Some(ChannelsConfig {
1361        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1362            bot_token: cfg.bot_token,
1363            allowed_users: cfg.allowed_users,
1364            mention_only: cfg.mention_only,
1365        }),
1366        discord: channels.discord.clone().map(|cfg| DiscordConfig {
1367            bot_token: cfg.bot_token,
1368            guild_id: cfg.guild_id,
1369            allowed_users: cfg.allowed_users,
1370            mention_only: cfg.mention_only,
1371        }),
1372        slack: channels.slack.clone().map(|cfg| SlackConfig {
1373            bot_token: cfg.bot_token,
1374            channel_id: cfg.channel_id,
1375            allowed_users: cfg.allowed_users,
1376        }),
1377        server_base_url: state.server_base_url(),
1378        api_token: state.api_token().await.unwrap_or_default(),
1379        tool_policy: channels.tool_policy.clone(),
1380    })
1381}
1382
/// Normalizes a configured web UI path prefix.
///
/// Surrounding whitespace and trailing slashes are removed and a leading `/`
/// is added when missing. Inputs that contain nothing but whitespace and
/// slashes (`""`, `"/"`, `"//"`, ...) fall back to `/admin`, so the result
/// is never empty.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    const DEFAULT_PREFIX: &str = "/admin";

    // Strip trailing slashes before the emptiness check so that inputs such
    // as "//" cannot collapse into an empty prefix.
    let trimmed = prefix.trim().trim_end_matches('/');
    if trimmed.is_empty() {
        return DEFAULT_PREFIX.to_string();
    }
    if trimmed.starts_with('/') {
        trimmed.to_string()
    } else {
        format!("/{trimmed}")
    }
}
1395
/// Serde default for `WebUiConfig::path_prefix`: the UI is served under `/admin`.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
1399
/// Returns a list holding only the wildcard entry `"*"`.
fn default_allow_all() -> Vec<String> {
    vec![String::from("*")]
}
1403
/// Default for the Discord `mention_only` setting: enabled.
fn default_discord_mention_only() -> bool {
    true
}
1407
1408fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
1409    normalize_non_empty_list(raw)
1410}
1411
/// Trims every entry, drops entries that are empty after trimming, and
/// removes duplicates while keeping the first occurrence's position.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_kept = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for values seen before, which filters them out.
        .filter(|entry| already_kept.insert(entry.clone()))
        .collect()
}
1426
/// Reads the run staleness threshold (milliseconds) from
/// `TANDEM_RUN_STALE_MS`.
///
/// An unset or unparsable value yields 120 000; the result is always clamped
/// to the range 30 000..=600 000.
fn resolve_run_stale_ms() -> u64 {
    const DEFAULT_MS: u64 = 120_000;
    const MIN_MS: u64 = 30_000;
    const MAX_MS: u64 = 600_000;

    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_MS),
        Err(_) => DEFAULT_MS,
    };
    configured.clamp(MIN_MS, MAX_MS)
}
1434
1435fn resolve_shared_resources_path() -> PathBuf {
1436    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
1437        let trimmed = dir.trim();
1438        if !trimmed.is_empty() {
1439            return PathBuf::from(trimmed).join("shared_resources.json");
1440        }
1441    }
1442    default_state_dir().join("shared_resources.json")
1443}
1444
1445fn resolve_routines_path() -> PathBuf {
1446    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
1447        let trimmed = dir.trim();
1448        if !trimmed.is_empty() {
1449            return PathBuf::from(trimmed).join("routines.json");
1450        }
1451    }
1452    default_state_dir().join("routines.json")
1453}
1454
1455fn resolve_routine_history_path() -> PathBuf {
1456    if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
1457        let trimmed = root.trim();
1458        if !trimmed.is_empty() {
1459            return PathBuf::from(trimmed).join("routine_history.json");
1460        }
1461    }
1462    default_state_dir().join("routine_history.json")
1463}
1464
1465fn resolve_routine_runs_path() -> PathBuf {
1466    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
1467        let trimmed = root.trim();
1468        if !trimmed.is_empty() {
1469            return PathBuf::from(trimmed).join("routine_runs.json");
1470        }
1471    }
1472    default_state_dir().join("routine_runs.json")
1473}
1474
1475fn resolve_agent_team_audit_path() -> PathBuf {
1476    if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
1477        let trimmed = base.trim();
1478        if !trimmed.is_empty() {
1479            return PathBuf::from(trimmed)
1480                .join("agent-team")
1481                .join("audit.log.jsonl");
1482        }
1483    }
1484    default_state_dir()
1485        .join("agent-team")
1486        .join("audit.log.jsonl")
1487}
1488
1489fn default_state_dir() -> PathBuf {
1490    if let Ok(paths) = resolve_shared_paths() {
1491        return paths.engine_state_dir;
1492    }
1493    if let Some(data_dir) = dirs::data_dir() {
1494        return data_dir.join("tandem").join("data");
1495    }
1496    dirs::home_dir()
1497        .map(|home| home.join(".tandem").join("data"))
1498        .unwrap_or_else(|| PathBuf::from(".tandem"))
1499}
1500
1501fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
1502    match schedule {
1503        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
1504        RoutineSchedule::Cron { .. } => None,
1505    }
1506}
1507
1508fn compute_misfire_plan(
1509    now_ms: u64,
1510    next_fire_at_ms: u64,
1511    interval_ms: u64,
1512    policy: &RoutineMisfirePolicy,
1513) -> (u32, u64) {
1514    if now_ms < next_fire_at_ms || interval_ms == 0 {
1515        return (0, next_fire_at_ms);
1516    }
1517    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
1518    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
1519    match policy {
1520        RoutineMisfirePolicy::Skip => (0, aligned_next),
1521        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
1522        RoutineMisfirePolicy::CatchUp { max_runs } => {
1523            let count = missed.min(u64::from(*max_runs)) as u32;
1524            (count, aligned_next)
1525        }
1526    }
1527}
1528
/// Outcome of checking whether a routine may execute.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run without further checks.
    Allowed,
    /// The routine may run only after approval; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
1535
1536pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
1537    let entrypoint = routine.entrypoint.to_ascii_lowercase();
1538    if entrypoint.starts_with("connector.")
1539        || entrypoint.starts_with("integration.")
1540        || entrypoint.contains("external")
1541    {
1542        return true;
1543    }
1544    routine
1545        .args
1546        .get("uses_external_integrations")
1547        .and_then(|v| v.as_bool())
1548        .unwrap_or(false)
1549        || routine
1550            .args
1551            .get("connector_id")
1552            .and_then(|v| v.as_str())
1553            .is_some()
1554}
1555
1556pub fn evaluate_routine_execution_policy(
1557    routine: &RoutineSpec,
1558    trigger_type: &str,
1559) -> RoutineExecutionDecision {
1560    if !routine_uses_external_integrations(routine) {
1561        return RoutineExecutionDecision::Allowed;
1562    }
1563    if !routine.external_integrations_allowed {
1564        return RoutineExecutionDecision::Blocked {
1565            reason: "external integrations are disabled by policy".to_string(),
1566        };
1567    }
1568    if routine.requires_approval {
1569        return RoutineExecutionDecision::RequiresApproval {
1570            reason: format!(
1571                "manual approval required before external side effects ({})",
1572                trigger_type
1573            ),
1574        };
1575    }
1576    RoutineExecutionDecision::Allowed
1577}
1578
/// Checks whether `key` is an acceptable shared-resource key.
///
/// After trimming whitespace, the key must be non-empty, start with one of
/// `run/`, `mission/`, `project/` or `team/`, and contain no `//`.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    let candidate = key.trim();
    !candidate.is_empty()
        && ALLOWED_PREFIXES
            .iter()
            .any(|&prefix| candidate.starts_with(prefix))
        && !candidate.contains("//")
}
1593
1594impl Deref for AppState {
1595    type Target = RuntimeState;
1596
1597    fn deref(&self) -> &Self::Target {
1598        self.runtime
1599            .get()
1600            .expect("runtime accessed before startup completion")
1601    }
1602}
1603
1604fn extract_event_session_id(properties: &Value) -> Option<String> {
1605    properties
1606        .get("sessionID")
1607        .or_else(|| properties.get("sessionId"))
1608        .or_else(|| properties.get("id"))
1609        .and_then(|v| v.as_str())
1610        .map(|s| s.to_string())
1611}
1612
1613fn extract_event_run_id(properties: &Value) -> Option<String> {
1614    properties
1615        .get("runID")
1616        .or_else(|| properties.get("run_id"))
1617        .and_then(|v| v.as_str())
1618        .map(|s| s.to_string())
1619}
1620
1621fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1622    let session_id = extract_event_session_id(&event.properties)?;
1623    let run_id = extract_event_run_id(&event.properties);
1624    let key = format!("run/{session_id}/status");
1625
1626    let mut base = serde_json::Map::new();
1627    base.insert("sessionID".to_string(), Value::String(session_id));
1628    if let Some(run_id) = run_id {
1629        base.insert("runID".to_string(), Value::String(run_id));
1630    }
1631
1632    match event.event_type.as_str() {
1633        "session.run.started" => {
1634            base.insert("state".to_string(), Value::String("running".to_string()));
1635            base.insert("phase".to_string(), Value::String("run".to_string()));
1636            base.insert(
1637                "eventType".to_string(),
1638                Value::String("session.run.started".to_string()),
1639            );
1640            Some(StatusIndexUpdate {
1641                key,
1642                value: Value::Object(base),
1643            })
1644        }
1645        "session.run.finished" => {
1646            base.insert("state".to_string(), Value::String("finished".to_string()));
1647            base.insert("phase".to_string(), Value::String("run".to_string()));
1648            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1649                base.insert("result".to_string(), Value::String(status.to_string()));
1650            }
1651            base.insert(
1652                "eventType".to_string(),
1653                Value::String("session.run.finished".to_string()),
1654            );
1655            Some(StatusIndexUpdate {
1656                key,
1657                value: Value::Object(base),
1658            })
1659        }
1660        "message.part.updated" => {
1661            let part_type = event
1662                .properties
1663                .get("part")
1664                .and_then(|v| v.get("type"))
1665                .and_then(|v| v.as_str())?;
1666            let (phase, tool_active) = match part_type {
1667                "tool-invocation" => ("tool", true),
1668                "tool-result" => ("run", false),
1669                _ => return None,
1670            };
1671            base.insert("state".to_string(), Value::String("running".to_string()));
1672            base.insert("phase".to_string(), Value::String(phase.to_string()));
1673            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1674            if let Some(tool_name) = event
1675                .properties
1676                .get("part")
1677                .and_then(|v| v.get("tool"))
1678                .and_then(|v| v.as_str())
1679            {
1680                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1681            }
1682            base.insert(
1683                "eventType".to_string(),
1684                Value::String("message.part.updated".to_string()),
1685            );
1686            Some(StatusIndexUpdate {
1687                key,
1688                value: Value::Object(base),
1689            })
1690        }
1691        _ => None,
1692    }
1693}
1694
1695pub async fn run_status_indexer(state: AppState) {
1696    let mut rx = state.event_bus.subscribe();
1697    loop {
1698        match rx.recv().await {
1699            Ok(event) => {
1700                if let Some(update) = derive_status_index_update(&event) {
1701                    if let Err(error) = state
1702                        .put_shared_resource(
1703                            update.key,
1704                            update.value,
1705                            None,
1706                            "system.status_indexer".to_string(),
1707                            None,
1708                        )
1709                        .await
1710                    {
1711                        tracing::warn!("status indexer failed to persist update: {error:?}");
1712                    }
1713                }
1714            }
1715            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1716            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1717        }
1718    }
1719}
1720
1721pub async fn run_agent_team_supervisor(state: AppState) {
1722    let mut rx = state.event_bus.subscribe();
1723    loop {
1724        match rx.recv().await {
1725            Ok(event) => {
1726                state.agent_teams.handle_engine_event(&state, &event).await;
1727            }
1728            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1729            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1730        }
1731    }
1732}
1733
1734pub async fn run_routine_scheduler(state: AppState) {
1735    loop {
1736        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1737        let now = now_ms();
1738        let plans = state.evaluate_routine_misfires(now).await;
1739        for plan in plans {
1740            let Some(routine) = state.get_routine(&plan.routine_id).await else {
1741                continue;
1742            };
1743            match evaluate_routine_execution_policy(&routine, "scheduled") {
1744                RoutineExecutionDecision::Allowed => {
1745                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
1746                    let run = state
1747                        .create_routine_run(
1748                            &routine,
1749                            "scheduled",
1750                            plan.run_count,
1751                            RoutineRunStatus::Queued,
1752                            None,
1753                        )
1754                        .await;
1755                    state
1756                        .append_routine_history(RoutineHistoryEvent {
1757                            routine_id: plan.routine_id.clone(),
1758                            trigger_type: "scheduled".to_string(),
1759                            run_count: plan.run_count,
1760                            fired_at_ms: now,
1761                            status: "queued".to_string(),
1762                            detail: None,
1763                        })
1764                        .await;
1765                    state.event_bus.publish(EngineEvent::new(
1766                        "routine.fired",
1767                        serde_json::json!({
1768                            "routineID": plan.routine_id,
1769                            "runID": run.run_id,
1770                            "runCount": plan.run_count,
1771                            "scheduledAtMs": plan.scheduled_at_ms,
1772                            "nextFireAtMs": plan.next_fire_at_ms,
1773                        }),
1774                    ));
1775                    state.event_bus.publish(EngineEvent::new(
1776                        "routine.run.created",
1777                        serde_json::json!({
1778                            "run": run,
1779                        }),
1780                    ));
1781                }
1782                RoutineExecutionDecision::RequiresApproval { reason } => {
1783                    let run = state
1784                        .create_routine_run(
1785                            &routine,
1786                            "scheduled",
1787                            plan.run_count,
1788                            RoutineRunStatus::PendingApproval,
1789                            Some(reason.clone()),
1790                        )
1791                        .await;
1792                    state
1793                        .append_routine_history(RoutineHistoryEvent {
1794                            routine_id: plan.routine_id.clone(),
1795                            trigger_type: "scheduled".to_string(),
1796                            run_count: plan.run_count,
1797                            fired_at_ms: now,
1798                            status: "pending_approval".to_string(),
1799                            detail: Some(reason.clone()),
1800                        })
1801                        .await;
1802                    state.event_bus.publish(EngineEvent::new(
1803                        "routine.approval_required",
1804                        serde_json::json!({
1805                            "routineID": plan.routine_id,
1806                            "runID": run.run_id,
1807                            "runCount": plan.run_count,
1808                            "triggerType": "scheduled",
1809                            "reason": reason,
1810                        }),
1811                    ));
1812                    state.event_bus.publish(EngineEvent::new(
1813                        "routine.run.created",
1814                        serde_json::json!({
1815                            "run": run,
1816                        }),
1817                    ));
1818                }
1819                RoutineExecutionDecision::Blocked { reason } => {
1820                    let run = state
1821                        .create_routine_run(
1822                            &routine,
1823                            "scheduled",
1824                            plan.run_count,
1825                            RoutineRunStatus::BlockedPolicy,
1826                            Some(reason.clone()),
1827                        )
1828                        .await;
1829                    state
1830                        .append_routine_history(RoutineHistoryEvent {
1831                            routine_id: plan.routine_id.clone(),
1832                            trigger_type: "scheduled".to_string(),
1833                            run_count: plan.run_count,
1834                            fired_at_ms: now,
1835                            status: "blocked_policy".to_string(),
1836                            detail: Some(reason.clone()),
1837                        })
1838                        .await;
1839                    state.event_bus.publish(EngineEvent::new(
1840                        "routine.blocked",
1841                        serde_json::json!({
1842                            "routineID": plan.routine_id,
1843                            "runID": run.run_id,
1844                            "runCount": plan.run_count,
1845                            "triggerType": "scheduled",
1846                            "reason": reason,
1847                        }),
1848                    ));
1849                    state.event_bus.publish(EngineEvent::new(
1850                        "routine.run.created",
1851                        serde_json::json!({
1852                            "run": run,
1853                        }),
1854                    ));
1855                }
1856            }
1857        }
1858    }
1859}
1860
1861pub async fn run_routine_executor(state: AppState) {
1862    loop {
1863        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1864        let Some(run) = state.claim_next_queued_routine_run().await else {
1865            continue;
1866        };
1867
1868        state.event_bus.publish(EngineEvent::new(
1869            "routine.run.started",
1870            serde_json::json!({
1871                "runID": run.run_id,
1872                "routineID": run.routine_id,
1873                "triggerType": run.trigger_type,
1874                "startedAtMs": now_ms(),
1875            }),
1876        ));
1877
1878        let workspace_root = state.workspace_index.snapshot().await.root;
1879        let mut session = Session::new(
1880            Some(format!("Routine {}", run.routine_id)),
1881            Some(workspace_root.clone()),
1882        );
1883        let session_id = session.id.clone();
1884        session.workspace_root = Some(workspace_root);
1885
1886        if let Err(error) = state.storage.save_session(session).await {
1887            let detail = format!("failed to create routine session: {error}");
1888            let _ = state
1889                .update_routine_run_status(
1890                    &run.run_id,
1891                    RoutineRunStatus::Failed,
1892                    Some(detail.clone()),
1893                )
1894                .await;
1895            state.event_bus.publish(EngineEvent::new(
1896                "routine.run.failed",
1897                serde_json::json!({
1898                    "runID": run.run_id,
1899                    "routineID": run.routine_id,
1900                    "reason": detail,
1901                }),
1902            ));
1903            continue;
1904        }
1905
1906        state
1907            .set_routine_session_policy(
1908                session_id.clone(),
1909                run.run_id.clone(),
1910                run.routine_id.clone(),
1911                run.allowed_tools.clone(),
1912            )
1913            .await;
1914        state
1915            .engine_loop
1916            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
1917            .await;
1918
1919        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
1920        if let Some(spec) = selected_model.as_ref() {
1921            state.event_bus.publish(EngineEvent::new(
1922                "routine.run.model_selected",
1923                serde_json::json!({
1924                    "runID": run.run_id,
1925                    "routineID": run.routine_id,
1926                    "providerID": spec.provider_id,
1927                    "modelID": spec.model_id,
1928                    "source": model_source,
1929                }),
1930            ));
1931        }
1932
1933        let request = SendMessageRequest {
1934            parts: vec![MessagePartInput::Text {
1935                text: build_routine_prompt(&state, &run).await,
1936            }],
1937            model: selected_model,
1938            agent: None,
1939        };
1940
1941        let run_result = state
1942            .engine_loop
1943            .run_prompt_async_with_context(
1944                session_id.clone(),
1945                request,
1946                Some(format!("routine:{}", run.run_id)),
1947            )
1948            .await;
1949
1950        state.clear_routine_session_policy(&session_id).await;
1951        state
1952            .engine_loop
1953            .clear_session_allowed_tools(&session_id)
1954            .await;
1955
1956        match run_result {
1957            Ok(()) => {
1958                append_configured_output_artifacts(&state, &run).await;
1959                let _ = state
1960                    .update_routine_run_status(
1961                        &run.run_id,
1962                        RoutineRunStatus::Completed,
1963                        Some("routine run completed".to_string()),
1964                    )
1965                    .await;
1966                state.event_bus.publish(EngineEvent::new(
1967                    "routine.run.completed",
1968                    serde_json::json!({
1969                        "runID": run.run_id,
1970                        "routineID": run.routine_id,
1971                        "sessionID": session_id,
1972                        "finishedAtMs": now_ms(),
1973                    }),
1974                ));
1975            }
1976            Err(error) => {
1977                let detail = truncate_text(&error.to_string(), 500);
1978                let _ = state
1979                    .update_routine_run_status(
1980                        &run.run_id,
1981                        RoutineRunStatus::Failed,
1982                        Some(detail.clone()),
1983                    )
1984                    .await;
1985                state.event_bus.publish(EngineEvent::new(
1986                    "routine.run.failed",
1987                    serde_json::json!({
1988                        "runID": run.run_id,
1989                        "routineID": run.routine_id,
1990                        "sessionID": session_id,
1991                        "reason": detail,
1992                        "finishedAtMs": now_ms(),
1993                    }),
1994                ));
1995            }
1996        }
1997    }
1998}
1999
2000async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
2001    let normalized_entrypoint = run.entrypoint.trim();
2002    let known_tool = state
2003        .tools
2004        .list()
2005        .await
2006        .into_iter()
2007        .any(|schema| schema.name == normalized_entrypoint);
2008    if known_tool {
2009        let args = if run.args.is_object() {
2010            run.args.clone()
2011        } else {
2012            serde_json::json!({})
2013        };
2014        return format!("/tool {} {}", normalized_entrypoint, args);
2015    }
2016
2017    if let Some(objective) = routine_objective_from_args(run) {
2018        return build_routine_mission_prompt(run, &objective);
2019    }
2020
2021    format!(
2022        "Execute routine '{}' using entrypoint '{}' with args: {}",
2023        run.routine_id, run.entrypoint, run.args
2024    )
2025}
2026
2027fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
2028    run.args
2029        .get("prompt")
2030        .and_then(|v| v.as_str())
2031        .map(str::trim)
2032        .filter(|v| !v.is_empty())
2033        .map(ToString::to_string)
2034}
2035
2036fn routine_mode_from_args(args: &Value) -> &str {
2037    args.get("mode")
2038        .and_then(|v| v.as_str())
2039        .map(str::trim)
2040        .filter(|v| !v.is_empty())
2041        .unwrap_or("standalone")
2042}
2043
2044fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
2045    args.get("success_criteria")
2046        .and_then(|v| v.as_array())
2047        .map(|rows| {
2048            rows.iter()
2049                .filter_map(|row| row.as_str())
2050                .map(str::trim)
2051                .filter(|row| !row.is_empty())
2052                .map(ToString::to_string)
2053                .collect::<Vec<_>>()
2054        })
2055        .unwrap_or_default()
2056}
2057
2058fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
2059    let mode = routine_mode_from_args(&run.args);
2060    let success_criteria = routine_success_criteria_from_args(&run.args);
2061    let orchestrator_only_tool_calls = run
2062        .args
2063        .get("orchestrator_only_tool_calls")
2064        .and_then(|v| v.as_bool())
2065        .unwrap_or(false);
2066
2067    let mut lines = vec![
2068        format!("Automation ID: {}", run.routine_id),
2069        format!("Run ID: {}", run.run_id),
2070        format!("Mode: {}", mode),
2071        format!("Mission Objective: {}", objective),
2072    ];
2073
2074    if !success_criteria.is_empty() {
2075        lines.push("Success Criteria:".to_string());
2076        for criterion in success_criteria {
2077            lines.push(format!("- {}", criterion));
2078        }
2079    }
2080
2081    if run.allowed_tools.is_empty() {
2082        lines.push("Allowed Tools: all available by current policy".to_string());
2083    } else {
2084        lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
2085    }
2086
2087    if run.output_targets.is_empty() {
2088        lines.push("Output Targets: none configured".to_string());
2089    } else {
2090        lines.push("Output Targets:".to_string());
2091        for target in &run.output_targets {
2092            lines.push(format!("- {}", target));
2093        }
2094    }
2095
2096    if mode.eq_ignore_ascii_case("orchestrated") {
2097        lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
2098        lines
2099            .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
2100        if orchestrator_only_tool_calls {
2101            lines.push(
2102                "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
2103                    .to_string(),
2104            );
2105        }
2106    } else {
2107        lines.push("Execution Pattern: Standalone mission run".to_string());
2108    }
2109
2110    lines.push(
2111        "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
2112            .to_string(),
2113    );
2114
2115    lines.join("\n")
2116}
2117
/// Shortens `input` to at most `max_len` bytes and appends `...<truncated>`.
///
/// Input that already fits (`input.len() <= max_len`) is returned unchanged.
/// `max_len` is measured in bytes and does not include the suffix. When
/// `max_len` lands in the middle of a multi-byte UTF-8 character, the cut is
/// moved back to the previous character boundary, so the function never
/// panics on non-ASCII text and the kept prefix may be slightly shorter than
/// `max_len`.
fn truncate_text(input: &str, max_len: usize) -> String {
    const SUFFIX: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // Slicing a `str` off a char boundary panics; walk back to a safe cut.
    // Index 0 is always a boundary, so this loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }

    let mut out = String::with_capacity(cut + SUFFIX.len());
    out.push_str(&input[..cut]);
    out.push_str(SUFFIX);
    out
}
2126
2127async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
2128    if run.output_targets.is_empty() {
2129        return;
2130    }
2131    for target in &run.output_targets {
2132        let artifact = RoutineRunArtifact {
2133            artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
2134            uri: target.clone(),
2135            kind: "output_target".to_string(),
2136            label: Some("configured output target".to_string()),
2137            created_at_ms: now_ms(),
2138            metadata: Some(serde_json::json!({
2139                "source": "routine.output_targets",
2140                "runID": run.run_id,
2141                "routineID": run.routine_id,
2142            })),
2143        };
2144        let _ = state
2145            .append_routine_run_artifact(&run.run_id, artifact.clone())
2146            .await;
2147        state.event_bus.publish(EngineEvent::new(
2148            "routine.run.artifact_added",
2149            serde_json::json!({
2150                "runID": run.run_id,
2151                "routineID": run.routine_id,
2152                "artifact": artifact,
2153            }),
2154        ));
2155    }
2156}
2157
2158fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
2159    let obj = value.as_object()?;
2160    let provider_id = obj.get("provider_id")?.as_str()?.trim();
2161    let model_id = obj.get("model_id")?.as_str()?.trim();
2162    if provider_id.is_empty() || model_id.is_empty() {
2163        return None;
2164    }
2165    Some(ModelSpec {
2166        provider_id: provider_id.to_string(),
2167        model_id: model_id.to_string(),
2168    })
2169}
2170
2171fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
2172    args.get("model_policy")
2173        .and_then(|v| v.get("role_models"))
2174        .and_then(|v| v.get(role))
2175        .and_then(parse_model_spec)
2176}
2177
2178fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
2179    args.get("model_policy")
2180        .and_then(|v| v.get("default_model"))
2181        .and_then(parse_model_spec)
2182}
2183
2184fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
2185    providers.iter().any(|provider| {
2186        provider.id == spec.provider_id
2187            && provider
2188                .models
2189                .iter()
2190                .any(|model| model.id == spec.model_id)
2191    })
2192}
2193
2194async fn resolve_routine_model_spec_for_run(
2195    state: &AppState,
2196    run: &RoutineRunRecord,
2197) -> (Option<ModelSpec>, String) {
2198    let providers = state.providers.list().await;
2199    let mode = routine_mode_from_args(&run.args);
2200    let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
2201
2202    if mode.eq_ignore_ascii_case("orchestrated") {
2203        if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
2204            requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
2205        }
2206    }
2207    if let Some(default_model) = default_model_spec_from_args(&run.args) {
2208        requested.push((default_model, "args.model_policy.default_model"));
2209    }
2210
2211    for (candidate, source) in requested {
2212        if provider_catalog_has_model(&providers, &candidate) {
2213            return (Some(candidate), source.to_string());
2214        }
2215    }
2216
2217    let fallback = providers
2218        .into_iter()
2219        .find(|provider| !provider.models.is_empty())
2220        .and_then(|provider| {
2221            let model = provider.models.first()?;
2222            Some(ModelSpec {
2223                provider_id: provider.id,
2224                model_id: model.id.clone(),
2225            })
2226        });
2227
2228    (fallback, "provider_catalog_fallback".to_string())
2229}
2230
2231#[cfg(test)]
2232mod tests {
2233    use super::*;
2234
2235    fn test_state_with_path(path: PathBuf) -> AppState {
2236        let mut state = AppState::new_starting("test-attempt".to_string(), true);
2237        state.shared_resources_path = path;
2238        state.routines_path = tmp_routines_file("shared-state");
2239        state.routine_history_path = tmp_routines_file("routine-history");
2240        state.routine_runs_path = tmp_routines_file("routine-runs");
2241        state
2242    }
2243
2244    fn tmp_resource_file(name: &str) -> PathBuf {
2245        std::env::temp_dir().join(format!(
2246            "tandem-server-{name}-{}.json",
2247            uuid::Uuid::new_v4()
2248        ))
2249    }
2250
2251    fn tmp_routines_file(name: &str) -> PathBuf {
2252        std::env::temp_dir().join(format!(
2253            "tandem-server-routines-{name}-{}.json",
2254            uuid::Uuid::new_v4()
2255        ))
2256    }
2257
2258    #[tokio::test]
2259    async fn shared_resource_put_increments_revision() {
2260        let path = tmp_resource_file("shared-resource-put");
2261        let state = test_state_with_path(path.clone());
2262
2263        let first = state
2264            .put_shared_resource(
2265                "project/demo/board".to_string(),
2266                serde_json::json!({"status":"todo"}),
2267                None,
2268                "agent-1".to_string(),
2269                None,
2270            )
2271            .await
2272            .expect("first put");
2273        assert_eq!(first.rev, 1);
2274
2275        let second = state
2276            .put_shared_resource(
2277                "project/demo/board".to_string(),
2278                serde_json::json!({"status":"doing"}),
2279                Some(1),
2280                "agent-2".to_string(),
2281                Some(60_000),
2282            )
2283            .await
2284            .expect("second put");
2285        assert_eq!(second.rev, 2);
2286        assert_eq!(second.updated_by, "agent-2");
2287        assert_eq!(second.ttl_ms, Some(60_000));
2288
2289        let raw = tokio::fs::read_to_string(path.clone())
2290            .await
2291            .expect("persisted");
2292        assert!(raw.contains("\"rev\": 2"));
2293        let _ = tokio::fs::remove_file(path).await;
2294    }
2295
2296    #[tokio::test]
2297    async fn shared_resource_put_detects_revision_conflict() {
2298        let path = tmp_resource_file("shared-resource-conflict");
2299        let state = test_state_with_path(path.clone());
2300
2301        let _ = state
2302            .put_shared_resource(
2303                "mission/demo/card-1".to_string(),
2304                serde_json::json!({"title":"Card 1"}),
2305                None,
2306                "agent-1".to_string(),
2307                None,
2308            )
2309            .await
2310            .expect("seed put");
2311
2312        let conflict = state
2313            .put_shared_resource(
2314                "mission/demo/card-1".to_string(),
2315                serde_json::json!({"title":"Card 1 edited"}),
2316                Some(99),
2317                "agent-2".to_string(),
2318                None,
2319            )
2320            .await
2321            .expect_err("expected conflict");
2322
2323        match conflict {
2324            ResourceStoreError::RevisionConflict(conflict) => {
2325                assert_eq!(conflict.expected_rev, Some(99));
2326                assert_eq!(conflict.current_rev, Some(1));
2327            }
2328            other => panic!("unexpected error: {other:?}"),
2329        }
2330
2331        let _ = tokio::fs::remove_file(path).await;
2332    }
2333
2334    #[tokio::test]
2335    async fn shared_resource_rejects_invalid_namespace_key() {
2336        let path = tmp_resource_file("shared-resource-invalid-key");
2337        let state = test_state_with_path(path.clone());
2338
2339        let error = state
2340            .put_shared_resource(
2341                "global/demo/key".to_string(),
2342                serde_json::json!({"x":1}),
2343                None,
2344                "agent-1".to_string(),
2345                None,
2346            )
2347            .await
2348            .expect_err("invalid key should fail");
2349
2350        match error {
2351            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
2352            other => panic!("unexpected error: {other:?}"),
2353        }
2354
2355        assert!(!path.exists());
2356    }
2357
2358    #[test]
2359    fn derive_status_index_update_for_run_started() {
2360        let event = EngineEvent::new(
2361            "session.run.started",
2362            serde_json::json!({
2363                "sessionID": "s-1",
2364                "runID": "r-1"
2365            }),
2366        );
2367        let update = derive_status_index_update(&event).expect("update");
2368        assert_eq!(update.key, "run/s-1/status");
2369        assert_eq!(
2370            update.value.get("state").and_then(|v| v.as_str()),
2371            Some("running")
2372        );
2373        assert_eq!(
2374            update.value.get("phase").and_then(|v| v.as_str()),
2375            Some("run")
2376        );
2377    }
2378
2379    #[test]
2380    fn derive_status_index_update_for_tool_invocation() {
2381        let event = EngineEvent::new(
2382            "message.part.updated",
2383            serde_json::json!({
2384                "sessionID": "s-2",
2385                "runID": "r-2",
2386                "part": { "type": "tool-invocation", "tool": "todo_write" }
2387            }),
2388        );
2389        let update = derive_status_index_update(&event).expect("update");
2390        assert_eq!(update.key, "run/s-2/status");
2391        assert_eq!(
2392            update.value.get("phase").and_then(|v| v.as_str()),
2393            Some("tool")
2394        );
2395        assert_eq!(
2396            update.value.get("toolActive").and_then(|v| v.as_bool()),
2397            Some(true)
2398        );
2399        assert_eq!(
2400            update.value.get("tool").and_then(|v| v.as_str()),
2401            Some("todo_write")
2402        );
2403    }
2404
2405    #[test]
2406    fn misfire_skip_drops_runs_and_advances_next_fire() {
2407        let (count, next_fire) =
2408            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
2409        assert_eq!(count, 0);
2410        assert_eq!(next_fire, 11_000);
2411    }
2412
2413    #[test]
2414    fn misfire_run_once_emits_single_trigger() {
2415        let (count, next_fire) =
2416            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
2417        assert_eq!(count, 1);
2418        assert_eq!(next_fire, 11_000);
2419    }
2420
2421    #[test]
2422    fn misfire_catch_up_caps_trigger_count() {
2423        let (count, next_fire) = compute_misfire_plan(
2424            25_000,
2425            5_000,
2426            1_000,
2427            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
2428        );
2429        assert_eq!(count, 3);
2430        assert_eq!(next_fire, 26_000);
2431    }
2432
2433    #[tokio::test]
2434    async fn routine_put_persists_and_loads() {
2435        let routines_path = tmp_routines_file("persist-load");
2436        let mut state = AppState::new_starting("routines-put".to_string(), true);
2437        state.routines_path = routines_path.clone();
2438
2439        let routine = RoutineSpec {
2440            routine_id: "routine-1".to_string(),
2441            name: "Digest".to_string(),
2442            status: RoutineStatus::Active,
2443            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2444            timezone: "UTC".to_string(),
2445            misfire_policy: RoutineMisfirePolicy::RunOnce,
2446            entrypoint: "mission.default".to_string(),
2447            args: serde_json::json!({"topic":"status"}),
2448            allowed_tools: vec![],
2449            output_targets: vec![],
2450            creator_type: "user".to_string(),
2451            creator_id: "user-1".to_string(),
2452            requires_approval: true,
2453            external_integrations_allowed: false,
2454            next_fire_at_ms: Some(5_000),
2455            last_fired_at_ms: None,
2456        };
2457
2458        state.put_routine(routine).await.expect("store routine");
2459
2460        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
2461        reloaded.routines_path = routines_path.clone();
2462        reloaded.load_routines().await.expect("load routines");
2463        let list = reloaded.list_routines().await;
2464        assert_eq!(list.len(), 1);
2465        assert_eq!(list[0].routine_id, "routine-1");
2466
2467        let _ = tokio::fs::remove_file(routines_path).await;
2468    }
2469
2470    #[tokio::test]
2471    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
2472        let routines_path = tmp_routines_file("misfire-eval");
2473        let mut state = AppState::new_starting("routines-eval".to_string(), true);
2474        state.routines_path = routines_path.clone();
2475
2476        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
2477            routine_id: id.to_string(),
2478            name: id.to_string(),
2479            status: RoutineStatus::Active,
2480            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
2481            timezone: "UTC".to_string(),
2482            misfire_policy: policy,
2483            entrypoint: "mission.default".to_string(),
2484            args: serde_json::json!({}),
2485            allowed_tools: vec![],
2486            output_targets: vec![],
2487            creator_type: "user".to_string(),
2488            creator_id: "u-1".to_string(),
2489            requires_approval: false,
2490            external_integrations_allowed: false,
2491            next_fire_at_ms: Some(5_000),
2492            last_fired_at_ms: None,
2493        };
2494
2495        state
2496            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
2497            .await
2498            .expect("put skip");
2499        state
2500            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
2501            .await
2502            .expect("put once");
2503        state
2504            .put_routine(base(
2505                "routine-catch",
2506                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
2507            ))
2508            .await
2509            .expect("put catch");
2510
2511        let plans = state.evaluate_routine_misfires(10_500).await;
2512        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
2513        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
2514        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
2515
2516        assert!(plan_skip.is_none());
2517        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
2518        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
2519
2520        let stored = state.list_routines().await;
2521        let skip_next = stored
2522            .iter()
2523            .find(|r| r.routine_id == "routine-skip")
2524            .and_then(|r| r.next_fire_at_ms)
2525            .expect("skip next");
2526        assert!(skip_next > 10_500);
2527
2528        let _ = tokio::fs::remove_file(routines_path).await;
2529    }
2530
2531    #[test]
2532    fn routine_policy_blocks_external_side_effects_by_default() {
2533        let routine = RoutineSpec {
2534            routine_id: "routine-policy-1".to_string(),
2535            name: "Connector routine".to_string(),
2536            status: RoutineStatus::Active,
2537            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2538            timezone: "UTC".to_string(),
2539            misfire_policy: RoutineMisfirePolicy::RunOnce,
2540            entrypoint: "connector.email.reply".to_string(),
2541            args: serde_json::json!({}),
2542            allowed_tools: vec![],
2543            output_targets: vec![],
2544            creator_type: "user".to_string(),
2545            creator_id: "u-1".to_string(),
2546            requires_approval: true,
2547            external_integrations_allowed: false,
2548            next_fire_at_ms: None,
2549            last_fired_at_ms: None,
2550        };
2551
2552        let decision = evaluate_routine_execution_policy(&routine, "manual");
2553        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
2554    }
2555
2556    #[test]
2557    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
2558        let routine = RoutineSpec {
2559            routine_id: "routine-policy-2".to_string(),
2560            name: "Connector routine".to_string(),
2561            status: RoutineStatus::Active,
2562            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2563            timezone: "UTC".to_string(),
2564            misfire_policy: RoutineMisfirePolicy::RunOnce,
2565            entrypoint: "connector.email.reply".to_string(),
2566            args: serde_json::json!({}),
2567            allowed_tools: vec![],
2568            output_targets: vec![],
2569            creator_type: "user".to_string(),
2570            creator_id: "u-1".to_string(),
2571            requires_approval: true,
2572            external_integrations_allowed: true,
2573            next_fire_at_ms: None,
2574            last_fired_at_ms: None,
2575        };
2576
2577        let decision = evaluate_routine_execution_policy(&routine, "manual");
2578        assert!(matches!(
2579            decision,
2580            RoutineExecutionDecision::RequiresApproval { .. }
2581        ));
2582    }
2583
2584    #[test]
2585    fn routine_policy_allows_non_external_entrypoints() {
2586        let routine = RoutineSpec {
2587            routine_id: "routine-policy-3".to_string(),
2588            name: "Internal mission routine".to_string(),
2589            status: RoutineStatus::Active,
2590            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2591            timezone: "UTC".to_string(),
2592            misfire_policy: RoutineMisfirePolicy::RunOnce,
2593            entrypoint: "mission.default".to_string(),
2594            args: serde_json::json!({}),
2595            allowed_tools: vec![],
2596            output_targets: vec![],
2597            creator_type: "user".to_string(),
2598            creator_id: "u-1".to_string(),
2599            requires_approval: true,
2600            external_integrations_allowed: false,
2601            next_fire_at_ms: None,
2602            last_fired_at_ms: None,
2603        };
2604
2605        let decision = evaluate_routine_execution_policy(&routine, "manual");
2606        assert_eq!(decision, RoutineExecutionDecision::Allowed);
2607    }
2608
2609    #[tokio::test]
2610    async fn claim_next_queued_routine_run_marks_oldest_running() {
2611        let mut state = AppState::new_starting("routine-claim".to_string(), true);
2612        state.routine_runs_path = tmp_routines_file("routine-claim-runs");
2613
2614        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
2615            run_id: run_id.to_string(),
2616            routine_id: "routine-claim".to_string(),
2617            trigger_type: "manual".to_string(),
2618            run_count: 1,
2619            status: RoutineRunStatus::Queued,
2620            created_at_ms,
2621            updated_at_ms: created_at_ms,
2622            fired_at_ms: Some(created_at_ms),
2623            started_at_ms: None,
2624            finished_at_ms: None,
2625            requires_approval: false,
2626            approval_reason: None,
2627            denial_reason: None,
2628            paused_reason: None,
2629            detail: None,
2630            entrypoint: "mission.default".to_string(),
2631            args: serde_json::json!({}),
2632            allowed_tools: vec![],
2633            output_targets: vec![],
2634            artifacts: vec![],
2635        };
2636
2637        {
2638            let mut guard = state.routine_runs.write().await;
2639            guard.insert("run-late".to_string(), mk("run-late", 2_000));
2640            guard.insert("run-early".to_string(), mk("run-early", 1_000));
2641        }
2642        state.persist_routine_runs().await.expect("persist");
2643
2644        let claimed = state
2645            .claim_next_queued_routine_run()
2646            .await
2647            .expect("claimed run");
2648        assert_eq!(claimed.run_id, "run-early");
2649        assert_eq!(claimed.status, RoutineRunStatus::Running);
2650        assert!(claimed.started_at_ms.is_some());
2651    }
2652
2653    #[tokio::test]
2654    async fn routine_session_policy_roundtrip_normalizes_tools() {
2655        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
2656        state
2657            .set_routine_session_policy(
2658                "session-routine-1".to_string(),
2659                "run-1".to_string(),
2660                "routine-1".to_string(),
2661                vec![
2662                    "read".to_string(),
2663                    " mcp.arcade.search ".to_string(),
2664                    "read".to_string(),
2665                    "".to_string(),
2666                ],
2667            )
2668            .await;
2669
2670        let policy = state
2671            .routine_session_policy("session-routine-1")
2672            .await
2673            .expect("policy");
2674        assert_eq!(
2675            policy.allowed_tools,
2676            vec!["read".to_string(), "mcp.arcade.search".to_string()]
2677        );
2678    }
2679
2680    #[test]
2681    fn routine_mission_prompt_includes_orchestrated_contract() {
2682        let run = RoutineRunRecord {
2683            run_id: "run-orchestrated-1".to_string(),
2684            routine_id: "automation-orchestrated".to_string(),
2685            trigger_type: "manual".to_string(),
2686            run_count: 1,
2687            status: RoutineRunStatus::Queued,
2688            created_at_ms: 1_000,
2689            updated_at_ms: 1_000,
2690            fired_at_ms: Some(1_000),
2691            started_at_ms: None,
2692            finished_at_ms: None,
2693            requires_approval: true,
2694            approval_reason: None,
2695            denial_reason: None,
2696            paused_reason: None,
2697            detail: None,
2698            entrypoint: "mission.default".to_string(),
2699            args: serde_json::json!({
2700                "prompt": "Coordinate a multi-step release readiness check.",
2701                "mode": "orchestrated",
2702                "success_criteria": ["All blockers listed", "Output artifact written"],
2703                "orchestrator_only_tool_calls": true
2704            }),
2705            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
2706            output_targets: vec!["file://reports/release-readiness.md".to_string()],
2707            artifacts: vec![],
2708        };
2709
2710        let objective = routine_objective_from_args(&run).expect("objective");
2711        let prompt = build_routine_mission_prompt(&run, &objective);
2712
2713        assert!(prompt.contains("Mode: orchestrated"));
2714        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
2715        assert!(prompt.contains("only the orchestrator may execute tools"));
2716        assert!(prompt.contains("Allowed Tools: read, webfetch"));
2717        assert!(prompt.contains("file://reports/release-readiness.md"));
2718    }
2719
2720    #[test]
2721    fn routine_mission_prompt_includes_standalone_defaults() {
2722        let run = RoutineRunRecord {
2723            run_id: "run-standalone-1".to_string(),
2724            routine_id: "automation-standalone".to_string(),
2725            trigger_type: "manual".to_string(),
2726            run_count: 1,
2727            status: RoutineRunStatus::Queued,
2728            created_at_ms: 2_000,
2729            updated_at_ms: 2_000,
2730            fired_at_ms: Some(2_000),
2731            started_at_ms: None,
2732            finished_at_ms: None,
2733            requires_approval: false,
2734            approval_reason: None,
2735            denial_reason: None,
2736            paused_reason: None,
2737            detail: None,
2738            entrypoint: "mission.default".to_string(),
2739            args: serde_json::json!({
2740                "prompt": "Summarize top engineering updates.",
2741                "success_criteria": ["Three bullet summary"]
2742            }),
2743            allowed_tools: vec![],
2744            output_targets: vec![],
2745            artifacts: vec![],
2746        };
2747
2748        let objective = routine_objective_from_args(&run).expect("objective");
2749        let prompt = build_routine_mission_prompt(&run, &objective);
2750
2751        assert!(prompt.contains("Mode: standalone"));
2752        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
2753        assert!(prompt.contains("Allowed Tools: all available by current policy"));
2754        assert!(prompt.contains("Output Targets: none configured"));
2755    }
2756}