Skip to main content

tandem_server/
lib.rs

1#![recursion_limit = "256"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
12use tandem_orchestrator::MissionState;
13use tandem_types::{
14    EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
15    SendMessageRequest, Session, ShellFamily,
16};
17use tokio::fs;
18use tokio::sync::RwLock;
19
20use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
21use tandem_core::{
22    AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus, PermissionManager,
23    PluginRegistry, Storage,
24};
25use tandem_providers::ProviderRegistry;
26use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
27use tandem_tools::ToolRegistry;
28
29mod agent_teams;
30mod http;
31pub mod webui;
32
33pub use agent_teams::AgentTeamRuntime;
34pub use http::serve;
35
/// Status snapshot for a single messaging channel.
///
/// One instance per channel name is kept in `ChannelRuntime::statuses`; the map is rebuilt
/// by `AppState::restart_channel_listeners`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// `restart_channel_listeners` sets this from whether the channel's config section exists.
    pub enabled: bool,
    /// `restart_channel_listeners` sets this to `true` for enabled channels once listeners start.
    pub connected: bool,
    /// Optional error text; `restart_channel_listeners` resets it to `None`.
    pub last_error: Option<String>,
    /// Session counter; `restart_channel_listeners` resets it to `0`.
    pub active_sessions: u64,
    /// Free-form JSON metadata; `restart_channel_listeners` initialises it to `{}`.
    pub meta: Value,
}
44
/// Web UI section of the effective application config.
///
/// NOTE(review): the derived `Default` yields an empty `path_prefix`, whereas a missing
/// `path_prefix` key during deserialization uses `default_web_ui_prefix()`. The two defaults
/// may differ — confirm `normalize_web_ui_prefix` handles an empty string as intended.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WebUiConfig {
    /// Whether the web UI is enabled; `false` when the key is absent.
    #[serde(default)]
    pub enabled: bool,
    /// Path prefix for the web UI; falls back to `default_web_ui_prefix()` when the key is absent.
    #[serde(default = "default_web_ui_prefix")]
    pub path_prefix: String,
}
52
/// Channel section of the effective application config.
///
/// Each channel is optional; `AppState::restart_channel_listeners` reports a channel as
/// enabled exactly when its section is present.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    /// Telegram settings, if configured.
    pub telegram: Option<TelegramConfigFile>,
    /// Discord settings, if configured.
    pub discord: Option<DiscordConfigFile>,
    /// Slack settings, if configured.
    pub slack: Option<SlackConfigFile>,
    /// Tool policy for channels; uses the policy type's `Default` when the key is absent.
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}
61
/// Telegram channel settings as written in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    /// Bot token (required).
    pub bot_token: String,
    /// Allowed user list; produced by `default_allow_all()` when the key is absent.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Mention-only flag; `false` when the key is absent.
    #[serde(default)]
    pub mention_only: bool,
}
70
/// Discord channel settings as written in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    /// Bot token (required).
    pub bot_token: String,
    /// Optional guild id; `None` when the key is absent.
    #[serde(default)]
    pub guild_id: Option<String>,
    /// Allowed user list; produced by `default_allow_all()` when the key is absent.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Mention-only flag; produced by `default_discord_mention_only()` when the key is absent.
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}
81
/// Slack channel settings as written in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    /// Bot token (required).
    pub bot_token: String,
    /// Channel id (required).
    pub channel_id: String,
    /// Allowed user list; produced by `default_allow_all()` when the key is absent.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
}
89
/// Subset of the effective config value that this module deserializes.
///
/// Every section falls back to its `Default` when missing, and
/// `AppState::restart_channel_listeners` falls back to the whole struct's `Default` when
/// deserialization fails.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    /// Channel configuration.
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    /// Web UI configuration.
    #[serde(default)]
    pub web_ui: WebUiConfig,
    /// Memory consolidation configuration (type owned by `tandem_providers`).
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
99
/// Mutable channel state guarded by `AppState::channels_runtime`.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Task set returned by `tandem_channels::start_channel_listeners`, if listeners are running.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Latest status per channel name.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
105
/// A time-limited lease held by a client on the engine.
///
/// All timestamps and durations are expressed in milliseconds.
#[derive(Debug, Clone)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Reports whether more than `ttl_ms` has elapsed since the last renewal.
    ///
    /// A `now_ms` earlier than `last_renewed_at_ms` counts as zero elapsed time, so the
    /// lease is never considered expired because of a clock that moved backwards.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let since_renewal = now_ms.saturating_sub(self.last_renewed_at_ms);
        since_renewal > self.ttl_ms
    }
}
121
/// The run currently registered for a session in `RunRegistry`.
///
/// Serialized with explicit key names (`runID`, `startedAtMs`, …); optional identifiers are
/// omitted from the output when `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    /// Identifier of the run.
    #[serde(rename = "runID")]
    pub run_id: String,
    /// `now_ms()` at the time the run was acquired.
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// `now_ms()` at acquisition, refreshed by `RunRegistry::touch`.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    /// Optional client identifier supplied at acquisition.
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    /// Optional agent identifier supplied at acquisition.
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    /// Optional agent profile supplied at acquisition.
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
137
/// Registry of active runs keyed by session id (at most one run per session).
///
/// The map lives behind an `Arc`, so clones of the registry share the same entries.
#[derive(Clone, Default)]
pub struct RunRegistry {
    /// Session id → active run.
    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
}
142
143impl RunRegistry {
144    pub fn new() -> Self {
145        Self::default()
146    }
147
148    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
149        self.active.read().await.get(session_id).cloned()
150    }
151
152    pub async fn acquire(
153        &self,
154        session_id: &str,
155        run_id: String,
156        client_id: Option<String>,
157        agent_id: Option<String>,
158        agent_profile: Option<String>,
159    ) -> std::result::Result<ActiveRun, ActiveRun> {
160        let mut guard = self.active.write().await;
161        if let Some(existing) = guard.get(session_id).cloned() {
162            return Err(existing);
163        }
164        let now = now_ms();
165        let run = ActiveRun {
166            run_id,
167            started_at_ms: now,
168            last_activity_at_ms: now,
169            client_id,
170            agent_id,
171            agent_profile,
172        };
173        guard.insert(session_id.to_string(), run.clone());
174        Ok(run)
175    }
176
177    pub async fn touch(&self, session_id: &str, run_id: &str) {
178        let mut guard = self.active.write().await;
179        if let Some(run) = guard.get_mut(session_id) {
180            if run.run_id == run_id {
181                run.last_activity_at_ms = now_ms();
182            }
183        }
184    }
185
186    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
187        let mut guard = self.active.write().await;
188        if let Some(run) = guard.get(session_id) {
189            if run.run_id == run_id {
190                return guard.remove(session_id);
191            }
192        }
193        None
194    }
195
196    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
197        self.active.write().await.remove(session_id)
198    }
199
200    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
201        let now = now_ms();
202        let mut guard = self.active.write().await;
203        let stale_ids = guard
204            .iter()
205            .filter_map(|(session_id, run)| {
206                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
207                    Some(session_id.clone())
208                } else {
209                    None
210                }
211            })
212            .collect::<Vec<_>>();
213        let mut out = Vec::with_capacity(stale_ids.len());
214        for session_id in stale_ids {
215            if let Some(run) = guard.remove(&session_id) {
216                out.push((session_id, run));
217            }
218        }
219        out
220    }
221}
222
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch. The millisecond count
/// is clamped to `u64::MAX` instead of being truncated by a lossy `as` cast.
pub fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_millis` is a `u128`; clamp first so the narrowing cast cannot wrap.
        .map(|elapsed| elapsed.as_millis().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
229
230pub fn build_id() -> String {
231    if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
232        let trimmed = explicit.trim();
233        if !trimmed.is_empty() {
234            return trimmed.to_string();
235        }
236    }
237    if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
238        let trimmed = git_sha.trim();
239        if !trimmed.is_empty() {
240            return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
241        }
242    }
243    env!("CARGO_PKG_VERSION").to_string()
244}
245
246pub fn detect_host_runtime_context() -> HostRuntimeContext {
247    let os = if cfg!(target_os = "windows") {
248        HostOs::Windows
249    } else if cfg!(target_os = "macos") {
250        HostOs::Macos
251    } else {
252        HostOs::Linux
253    };
254    let (shell_family, path_style) = match os {
255        HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
256        HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
257    };
258    HostRuntimeContext {
259        os,
260        arch: std::env::consts::ARCH.to_string(),
261        shell_family,
262        path_style,
263    }
264}
265
/// Returns the path of the running executable for health output.
///
/// Only builds with debug assertions enabled report a path (when it can be determined);
/// all other builds return `None`.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        let exe = std::env::current_exe().ok()?;
        Some(exe.to_string_lossy().into_owned())
    } else {
        None
    }
}
278
/// Bundle of engine subsystems that becomes available once startup completes.
///
/// A value of this type is stored exactly once in `AppState::runtime` by
/// `AppState::mark_ready`.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    /// String-to-string map behind an async lock.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// JSON log entries behind an async lock.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    /// Preferred over `AppState::host_runtime_context` once the runtime is set.
    pub host_runtime_context: HostRuntimeContext,
}
299
/// A governed memory entry; instances are kept in `AppState::memory_records`.
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    /// Optional free-form JSON metadata.
    pub metadata: Option<Value>,
    /// Optional id of another memory record this one refers to as its source.
    pub source_memory_id: Option<String>,
    pub created_at_ms: u64,
}
313
/// An audit entry for a memory operation; instances are kept in `AppState::memory_audit_log`.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    /// Optional detail text; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub created_at_ms: u64,
}
329
/// A versioned key/value entry in the shared resource store.
///
/// Records are created and updated by `AppState::put_shared_resource` and persisted as a
/// JSON map at `AppState::shared_resources_path`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    /// Resource key; also the key of the in-memory map.
    pub key: String,
    /// Stored JSON value.
    pub value: Value,
    /// Revision number: `1` on first write, then incremented (saturating) on each update.
    pub rev: u64,
    /// `now_ms()` at the time of the last write.
    pub updated_at_ms: u64,
    /// Caller-supplied identity of the last writer.
    pub updated_by: String,
    /// Optional caller-supplied TTL in milliseconds; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
340
/// When a routine fires.
///
/// Variant names are serialized in `snake_case` (`interval_seconds`, `cron`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    /// Interval schedule carrying a number of seconds.
    IntervalSeconds { seconds: u64 },
    /// Cron schedule carrying the expression text.
    Cron { expression: String },
}
347
/// Policy applied when a routine misses scheduled firings.
///
/// Serialized as an object whose `type` field holds the `snake_case` variant name
/// (`skip`, `run_once`, `catch_up`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    Skip,
    RunOnce,
    /// Carries a `max_runs` limit.
    CatchUp { max_runs: u32 },
}
355
/// Status of a routine definition; serialized as `active` or `paused`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
362
/// Definition of a routine.
///
/// Specs are kept in `AppState::routines` and loaded from a JSON map at
/// `AppState::routines_path` by `AppState::load_routines`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    pub entrypoint: String,
    /// JSON arguments; `Value::default()` (null) when the key is absent.
    #[serde(default)]
    pub args: Value,
    /// Empty when the key is absent.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// Empty when the key is absent.
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    /// Optional next firing time in ms; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    /// Optional last firing time in ms; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
387
/// One history entry for a routine.
///
/// Entries are kept per routine id in `AppState::routine_history` and loaded by
/// `AppState::load_routine_history`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub fired_at_ms: u64,
    pub status: String,
    /// Optional detail text; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
398
/// Status of a routine run.
///
/// Variant names are serialized in `snake_case` (e.g. `pending_approval`, `blocked_policy`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}
412
/// An artifact attached to a routine run (see `RoutineRunRecord::artifacts`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    pub uri: String,
    pub kind: String,
    /// Optional label; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    pub created_at_ms: u64,
    /// Optional free-form JSON metadata; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
424
/// Record of a single routine run.
///
/// Records are kept in `AppState::routine_runs` and loaded from a JSON map at
/// `AppState::routine_runs_path` by `AppState::load_routine_runs`. All optional fields are
/// omitted from serialized output when `None` and default to `None` when absent on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub entrypoint: String,
    /// JSON arguments; `Value::default()` (null) when the key is absent.
    #[serde(default)]
    pub args: Value,
    /// Empty when the key is absent.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// Empty when the key is absent.
    #[serde(default)]
    pub output_targets: Vec<String>,
    /// Empty when the key is absent.
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
}
459
/// Associates a session with a routine run and its tool allow-list.
///
/// Instances are kept in `AppState::routine_session_policies`.
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
467
/// Serializable description of a planned routine trigger.
// NOTE(review): no producer or consumer of this type is visible in this part of the file.
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    pub run_count: u32,
    pub scheduled_at_ms: u64,
    pub next_fire_at_ms: u64,
}
475
/// Details of a failed revision check on a shared resource.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    /// Key the operation targeted.
    pub key: String,
    /// Revision the caller required.
    pub expected_rev: Option<u64>,
    /// Revision actually stored, or `None` when the key does not exist.
    pub current_rev: Option<u64>,
}
482
/// Errors returned by the shared resource store operations on `AppState`.
///
/// Serialized as an object whose `type` field holds the `snake_case` variant name.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    /// The key was rejected by `is_valid_resource_key`.
    InvalidKey { key: String },
    /// The caller's `if_match_rev` did not equal the stored revision.
    RevisionConflict(ResourceConflict),
    /// Writing the store to disk failed; the in-memory change was rolled back.
    PersistFailed { message: String },
}
490
/// Errors for routine store operations.
///
/// Serialized as an object whose `type` field holds the `snake_case` variant name.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    InvalidRoutineId { routine_id: String },
    InvalidSchedule { detail: String },
    PersistFailed { message: String },
}
498
/// Coarse startup status of the server.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    /// Initial status set by `AppState::new_starting`.
    Starting,
    /// Set by `AppState::mark_ready`.
    Ready,
    /// Set by `AppState::mark_failed`.
    Failed,
}
505
/// Mutable startup progress guarded by `AppState::startup`.
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Current phase label (`"boot"` initially, `"ready"` after `mark_ready`).
    pub phase: String,
    /// `now_ms()` captured when the state was created.
    pub started_at_ms: u64,
    /// Caller-supplied identifier of this startup attempt.
    pub attempt_id: String,
    /// Error text recorded by `mark_failed`; cleared by `mark_ready`.
    pub last_error: Option<String>,
}
514
/// Copy of `StartupState` plus the elapsed time, as produced by `AppState::startup_snapshot`.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    /// `now_ms()` at snapshot time minus `started_at_ms` (saturating at zero).
    pub elapsed_ms: u64,
}
524
/// Shared server state.
///
/// Mutable members sit behind `Arc`, so clones of `AppState` observe the same data. The
/// `PathBuf` fields and `run_stale_ms` are plain values fixed at construction.
#[derive(Clone)]
pub struct AppState {
    /// Engine runtime; empty until `mark_ready` stores it (it can be set only once).
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup progress.
    pub startup: Arc<RwLock<StartupState>>,
    /// Selects the label returned by `mode_label` (`"in-process"` vs `"sidecar"`).
    pub in_process_mode: Arc<AtomicBool>,
    /// Optional API token.
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub run_registry: RunRegistry,
    /// Value produced by `resolve_run_stale_ms()` at construction.
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// In-memory shared resource store, keyed by resource key.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    /// File the shared resource store is loaded from and persisted to.
    pub shared_resources_path: PathBuf,
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    /// File `load_routines` reads from.
    pub routines_path: PathBuf,
    /// File `load_routine_history` reads from.
    pub routine_history_path: PathBuf,
    /// File `load_routine_runs` reads from.
    pub routine_runs_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    /// Web UI enabled flag; `false` initially.
    pub web_ui_enabled: Arc<AtomicBool>,
    /// Web UI path prefix; `"/admin"` initially. Uses a synchronous `std` lock.
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    /// Server base URL; `"http://127.0.0.1:39731"` initially. Uses a synchronous `std` lock.
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    /// Channel listener tasks and their statuses.
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host description detected at construction; used until the runtime provides its own.
    pub host_runtime_context: HostRuntimeContext,
}
554
/// A key/value pair describing one status index update.
// NOTE(review): no use of this private type is visible in this part of the file.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
560
561impl AppState {
562    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
563        Self {
564            runtime: Arc::new(OnceLock::new()),
565            startup: Arc::new(RwLock::new(StartupState {
566                status: StartupStatus::Starting,
567                phase: "boot".to_string(),
568                started_at_ms: now_ms(),
569                attempt_id,
570                last_error: None,
571            })),
572            in_process_mode: Arc::new(AtomicBool::new(in_process)),
573            api_token: Arc::new(RwLock::new(None)),
574            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
575            run_registry: RunRegistry::new(),
576            run_stale_ms: resolve_run_stale_ms(),
577            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
578            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
579            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
580            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
581            shared_resources_path: resolve_shared_resources_path(),
582            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
583            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
584            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
585            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
586            routines_path: resolve_routines_path(),
587            routine_history_path: resolve_routine_history_path(),
588            routine_runs_path: resolve_routine_runs_path(),
589            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
590            web_ui_enabled: Arc::new(AtomicBool::new(false)),
591            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
592            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
593            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
594            host_runtime_context: detect_host_runtime_context(),
595        }
596    }
597
598    pub fn is_ready(&self) -> bool {
599        self.runtime.get().is_some()
600    }
601
602    pub fn mode_label(&self) -> &'static str {
603        if self.in_process_mode.load(Ordering::Relaxed) {
604            "in-process"
605        } else {
606            "sidecar"
607        }
608    }
609
610    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
611        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
612        if let Ok(mut guard) = self.web_ui_prefix.write() {
613            *guard = normalize_web_ui_prefix(&prefix);
614        }
615    }
616
617    pub fn web_ui_enabled(&self) -> bool {
618        self.web_ui_enabled.load(Ordering::Relaxed)
619    }
620
621    pub fn web_ui_prefix(&self) -> String {
622        self.web_ui_prefix
623            .read()
624            .map(|v| v.clone())
625            .unwrap_or_else(|_| "/admin".to_string())
626    }
627
628    pub fn set_server_base_url(&self, base_url: String) {
629        if let Ok(mut guard) = self.server_base_url.write() {
630            *guard = base_url;
631        }
632    }
633
634    pub fn server_base_url(&self) -> String {
635        self.server_base_url
636            .read()
637            .map(|v| v.clone())
638            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
639    }
640
641    pub async fn api_token(&self) -> Option<String> {
642        self.api_token.read().await.clone()
643    }
644
645    pub async fn set_api_token(&self, token: Option<String>) {
646        *self.api_token.write().await = token;
647    }
648
649    pub async fn startup_snapshot(&self) -> StartupSnapshot {
650        let state = self.startup.read().await.clone();
651        StartupSnapshot {
652            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
653            status: state.status,
654            phase: state.phase,
655            started_at_ms: state.started_at_ms,
656            attempt_id: state.attempt_id,
657            last_error: state.last_error,
658        }
659    }
660
661    pub fn host_runtime_context(&self) -> HostRuntimeContext {
662        self.runtime
663            .get()
664            .map(|runtime| runtime.host_runtime_context.clone())
665            .unwrap_or_else(|| self.host_runtime_context.clone())
666    }
667
668    pub async fn set_phase(&self, phase: impl Into<String>) {
669        let mut startup = self.startup.write().await;
670        startup.phase = phase.into();
671    }
672
    /// Stores the built runtime, wires server hooks into the engine loop, loads persisted
    /// state, and finally marks startup as `Ready`.
    ///
    /// # Errors
    /// Returns an error if a runtime has already been stored on this state.
    ///
    /// Failures while loading shared resources, routines, routine history, routine runs, or
    /// agent team data are ignored; startup still proceeds to `Ready`.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        // NOTE(review): `engine_loop` (and `workspace_index` below) are not fields of
        // `AppState`; presumably they resolve to the stored `RuntimeState` through a `Deref`
        // impl outside this view — confirm.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        // Best-effort loads: errors are discarded on purpose.
        let _ = self.load_shared_resources().await;
        let _ = self.load_routines().await;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Flip the status last, after hooks and persisted state are in place.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
702
703    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
704        let mut startup = self.startup.write().await;
705        startup.status = StartupStatus::Failed;
706        startup.phase = phase.into();
707        startup.last_error = Some(error.into());
708    }
709
710    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
711        let runtime = self.channels_runtime.lock().await;
712        runtime.statuses.clone()
713    }
714
715    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
716        let effective = self.config.get_effective_value().await;
717        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
718        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
719
720        let mut runtime = self.channels_runtime.lock().await;
721        if let Some(listeners) = runtime.listeners.as_mut() {
722            listeners.abort_all();
723        }
724        runtime.listeners = None;
725        runtime.statuses.clear();
726
727        let mut status_map = std::collections::HashMap::new();
728        status_map.insert(
729            "telegram".to_string(),
730            ChannelStatus {
731                enabled: parsed.channels.telegram.is_some(),
732                connected: false,
733                last_error: None,
734                active_sessions: 0,
735                meta: serde_json::json!({}),
736            },
737        );
738        status_map.insert(
739            "discord".to_string(),
740            ChannelStatus {
741                enabled: parsed.channels.discord.is_some(),
742                connected: false,
743                last_error: None,
744                active_sessions: 0,
745                meta: serde_json::json!({}),
746            },
747        );
748        status_map.insert(
749            "slack".to_string(),
750            ChannelStatus {
751                enabled: parsed.channels.slack.is_some(),
752                connected: false,
753                last_error: None,
754                active_sessions: 0,
755                meta: serde_json::json!({}),
756            },
757        );
758
759        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
760            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
761            runtime.listeners = Some(listeners);
762            for status in status_map.values_mut() {
763                if status.enabled {
764                    status.connected = true;
765                }
766            }
767        }
768
769        runtime.statuses = status_map.clone();
770        drop(runtime);
771
772        self.event_bus.publish(EngineEvent::new(
773            "channel.status.changed",
774            serde_json::json!({ "channels": status_map }),
775        ));
776        Ok(())
777    }
778
779    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
780        if !self.shared_resources_path.exists() {
781            return Ok(());
782        }
783        let raw = fs::read_to_string(&self.shared_resources_path).await?;
784        let parsed =
785            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
786                .unwrap_or_default();
787        let mut guard = self.shared_resources.write().await;
788        *guard = parsed;
789        Ok(())
790    }
791
792    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
793        if let Some(parent) = self.shared_resources_path.parent() {
794            fs::create_dir_all(parent).await?;
795        }
796        let payload = {
797            let guard = self.shared_resources.read().await;
798            serde_json::to_string_pretty(&*guard)?
799        };
800        fs::write(&self.shared_resources_path, payload).await?;
801        Ok(())
802    }
803
804    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
805        self.shared_resources.read().await.get(key).cloned()
806    }
807
808    pub async fn list_shared_resources(
809        &self,
810        prefix: Option<&str>,
811        limit: usize,
812    ) -> Vec<SharedResourceRecord> {
813        let limit = limit.clamp(1, 500);
814        let mut rows = self
815            .shared_resources
816            .read()
817            .await
818            .values()
819            .filter(|record| {
820                if let Some(prefix) = prefix {
821                    record.key.starts_with(prefix)
822                } else {
823                    true
824                }
825            })
826            .cloned()
827            .collect::<Vec<_>>();
828        rows.sort_by(|a, b| a.key.cmp(&b.key));
829        rows.truncate(limit);
830        rows
831    }
832
833    pub async fn put_shared_resource(
834        &self,
835        key: String,
836        value: Value,
837        if_match_rev: Option<u64>,
838        updated_by: String,
839        ttl_ms: Option<u64>,
840    ) -> Result<SharedResourceRecord, ResourceStoreError> {
841        if !is_valid_resource_key(&key) {
842            return Err(ResourceStoreError::InvalidKey { key });
843        }
844
845        let now = now_ms();
846        let mut guard = self.shared_resources.write().await;
847        let existing = guard.get(&key).cloned();
848
849        if let Some(expected) = if_match_rev {
850            let current = existing.as_ref().map(|row| row.rev);
851            if current != Some(expected) {
852                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
853                    key,
854                    expected_rev: Some(expected),
855                    current_rev: current,
856                }));
857            }
858        }
859
860        let next_rev = existing
861            .as_ref()
862            .map(|row| row.rev.saturating_add(1))
863            .unwrap_or(1);
864
865        let record = SharedResourceRecord {
866            key: key.clone(),
867            value,
868            rev: next_rev,
869            updated_at_ms: now,
870            updated_by,
871            ttl_ms,
872        };
873
874        let previous = guard.insert(key.clone(), record.clone());
875        drop(guard);
876
877        if let Err(error) = self.persist_shared_resources().await {
878            let mut rollback = self.shared_resources.write().await;
879            if let Some(previous) = previous {
880                rollback.insert(key, previous);
881            } else {
882                rollback.remove(&key);
883            }
884            return Err(ResourceStoreError::PersistFailed {
885                message: error.to_string(),
886            });
887        }
888
889        Ok(record)
890    }
891
892    pub async fn delete_shared_resource(
893        &self,
894        key: &str,
895        if_match_rev: Option<u64>,
896    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
897        if !is_valid_resource_key(key) {
898            return Err(ResourceStoreError::InvalidKey {
899                key: key.to_string(),
900            });
901        }
902
903        let mut guard = self.shared_resources.write().await;
904        let current = guard.get(key).cloned();
905        if let Some(expected) = if_match_rev {
906            let current_rev = current.as_ref().map(|row| row.rev);
907            if current_rev != Some(expected) {
908                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
909                    key: key.to_string(),
910                    expected_rev: Some(expected),
911                    current_rev,
912                }));
913            }
914        }
915
916        let removed = guard.remove(key);
917        drop(guard);
918
919        if let Err(error) = self.persist_shared_resources().await {
920            if let Some(record) = removed.clone() {
921                self.shared_resources
922                    .write()
923                    .await
924                    .insert(record.key.clone(), record);
925            }
926            return Err(ResourceStoreError::PersistFailed {
927                message: error.to_string(),
928            });
929        }
930
931        Ok(removed)
932    }
933
934    pub async fn load_routines(&self) -> anyhow::Result<()> {
935        if !self.routines_path.exists() {
936            return Ok(());
937        }
938        let raw = fs::read_to_string(&self.routines_path).await?;
939        let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
940            .unwrap_or_default();
941        let mut guard = self.routines.write().await;
942        *guard = parsed;
943        Ok(())
944    }
945
946    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
947        if !self.routine_history_path.exists() {
948            return Ok(());
949        }
950        let raw = fs::read_to_string(&self.routine_history_path).await?;
951        let parsed = serde_json::from_str::<
952            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
953        >(&raw)
954        .unwrap_or_default();
955        let mut guard = self.routine_history.write().await;
956        *guard = parsed;
957        Ok(())
958    }
959
960    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
961        if !self.routine_runs_path.exists() {
962            return Ok(());
963        }
964        let raw = fs::read_to_string(&self.routine_runs_path).await?;
965        let parsed =
966            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
967                .unwrap_or_default();
968        let mut guard = self.routine_runs.write().await;
969        *guard = parsed;
970        Ok(())
971    }
972
973    pub async fn persist_routines(&self) -> anyhow::Result<()> {
974        if let Some(parent) = self.routines_path.parent() {
975            fs::create_dir_all(parent).await?;
976        }
977        let payload = {
978            let guard = self.routines.read().await;
979            serde_json::to_string_pretty(&*guard)?
980        };
981        fs::write(&self.routines_path, payload).await?;
982        Ok(())
983    }
984
985    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
986        if let Some(parent) = self.routine_history_path.parent() {
987            fs::create_dir_all(parent).await?;
988        }
989        let payload = {
990            let guard = self.routine_history.read().await;
991            serde_json::to_string_pretty(&*guard)?
992        };
993        fs::write(&self.routine_history_path, payload).await?;
994        Ok(())
995    }
996
997    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
998        if let Some(parent) = self.routine_runs_path.parent() {
999            fs::create_dir_all(parent).await?;
1000        }
1001        let payload = {
1002            let guard = self.routine_runs.read().await;
1003            serde_json::to_string_pretty(&*guard)?
1004        };
1005        fs::write(&self.routine_runs_path, payload).await?;
1006        Ok(())
1007    }
1008
1009    pub async fn put_routine(
1010        &self,
1011        mut routine: RoutineSpec,
1012    ) -> Result<RoutineSpec, RoutineStoreError> {
1013        if routine.routine_id.trim().is_empty() {
1014            return Err(RoutineStoreError::InvalidRoutineId {
1015                routine_id: routine.routine_id,
1016            });
1017        }
1018
1019        routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1020        routine.output_targets = normalize_non_empty_list(routine.output_targets);
1021
1022        let interval = match routine.schedule {
1023            RoutineSchedule::IntervalSeconds { seconds } => {
1024                if seconds == 0 {
1025                    return Err(RoutineStoreError::InvalidSchedule {
1026                        detail: "interval_seconds must be > 0".to_string(),
1027                    });
1028                }
1029                Some(seconds)
1030            }
1031            RoutineSchedule::Cron { .. } => None,
1032        };
1033        if routine.next_fire_at_ms.is_none() {
1034            routine.next_fire_at_ms = Some(now_ms().saturating_add(interval.unwrap_or(60) * 1000));
1035        }
1036
1037        let mut guard = self.routines.write().await;
1038        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1039        drop(guard);
1040
1041        if let Err(error) = self.persist_routines().await {
1042            let mut rollback = self.routines.write().await;
1043            if let Some(previous) = previous {
1044                rollback.insert(previous.routine_id.clone(), previous);
1045            } else {
1046                rollback.remove(&routine.routine_id);
1047            }
1048            return Err(RoutineStoreError::PersistFailed {
1049                message: error.to_string(),
1050            });
1051        }
1052
1053        Ok(routine)
1054    }
1055
1056    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1057        let mut rows = self
1058            .routines
1059            .read()
1060            .await
1061            .values()
1062            .cloned()
1063            .collect::<Vec<_>>();
1064        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1065        rows
1066    }
1067
1068    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1069        self.routines.read().await.get(routine_id).cloned()
1070    }
1071
1072    pub async fn delete_routine(
1073        &self,
1074        routine_id: &str,
1075    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1076        let mut guard = self.routines.write().await;
1077        let removed = guard.remove(routine_id);
1078        drop(guard);
1079
1080        if let Err(error) = self.persist_routines().await {
1081            if let Some(removed) = removed.clone() {
1082                self.routines
1083                    .write()
1084                    .await
1085                    .insert(removed.routine_id.clone(), removed);
1086            }
1087            return Err(RoutineStoreError::PersistFailed {
1088                message: error.to_string(),
1089            });
1090        }
1091        Ok(removed)
1092    }
1093
1094    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1095        let mut plans = Vec::new();
1096        let mut guard = self.routines.write().await;
1097        for routine in guard.values_mut() {
1098            if routine.status != RoutineStatus::Active {
1099                continue;
1100            }
1101            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1102                continue;
1103            };
1104            let Some(interval_ms) = routine_interval_ms(&routine.schedule) else {
1105                continue;
1106            };
1107            if now_ms < next_fire_at_ms {
1108                continue;
1109            }
1110            let (run_count, next_fire_at_ms) = compute_misfire_plan(
1111                now_ms,
1112                next_fire_at_ms,
1113                interval_ms,
1114                &routine.misfire_policy,
1115            );
1116            routine.next_fire_at_ms = Some(next_fire_at_ms);
1117            if run_count == 0 {
1118                continue;
1119            }
1120            plans.push(RoutineTriggerPlan {
1121                routine_id: routine.routine_id.clone(),
1122                run_count,
1123                scheduled_at_ms: now_ms,
1124                next_fire_at_ms,
1125            });
1126        }
1127        drop(guard);
1128        let _ = self.persist_routines().await;
1129        plans
1130    }
1131
1132    pub async fn mark_routine_fired(
1133        &self,
1134        routine_id: &str,
1135        fired_at_ms: u64,
1136    ) -> Option<RoutineSpec> {
1137        let mut guard = self.routines.write().await;
1138        let routine = guard.get_mut(routine_id)?;
1139        routine.last_fired_at_ms = Some(fired_at_ms);
1140        let updated = routine.clone();
1141        drop(guard);
1142        let _ = self.persist_routines().await;
1143        Some(updated)
1144    }
1145
1146    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1147        let mut history = self.routine_history.write().await;
1148        history
1149            .entry(event.routine_id.clone())
1150            .or_default()
1151            .push(event);
1152        drop(history);
1153        let _ = self.persist_routine_history().await;
1154    }
1155
1156    pub async fn list_routine_history(
1157        &self,
1158        routine_id: &str,
1159        limit: usize,
1160    ) -> Vec<RoutineHistoryEvent> {
1161        let limit = limit.clamp(1, 500);
1162        let mut rows = self
1163            .routine_history
1164            .read()
1165            .await
1166            .get(routine_id)
1167            .cloned()
1168            .unwrap_or_default();
1169        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1170        rows.truncate(limit);
1171        rows
1172    }
1173
1174    pub async fn create_routine_run(
1175        &self,
1176        routine: &RoutineSpec,
1177        trigger_type: &str,
1178        run_count: u32,
1179        status: RoutineRunStatus,
1180        detail: Option<String>,
1181    ) -> RoutineRunRecord {
1182        let now = now_ms();
1183        let record = RoutineRunRecord {
1184            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1185            routine_id: routine.routine_id.clone(),
1186            trigger_type: trigger_type.to_string(),
1187            run_count,
1188            status,
1189            created_at_ms: now,
1190            updated_at_ms: now,
1191            fired_at_ms: Some(now),
1192            started_at_ms: None,
1193            finished_at_ms: None,
1194            requires_approval: routine.requires_approval,
1195            approval_reason: None,
1196            denial_reason: None,
1197            paused_reason: None,
1198            detail,
1199            entrypoint: routine.entrypoint.clone(),
1200            args: routine.args.clone(),
1201            allowed_tools: routine.allowed_tools.clone(),
1202            output_targets: routine.output_targets.clone(),
1203            artifacts: Vec::new(),
1204        };
1205        self.routine_runs
1206            .write()
1207            .await
1208            .insert(record.run_id.clone(), record.clone());
1209        let _ = self.persist_routine_runs().await;
1210        record
1211    }
1212
1213    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1214        self.routine_runs.read().await.get(run_id).cloned()
1215    }
1216
1217    pub async fn list_routine_runs(
1218        &self,
1219        routine_id: Option<&str>,
1220        limit: usize,
1221    ) -> Vec<RoutineRunRecord> {
1222        let mut rows = self
1223            .routine_runs
1224            .read()
1225            .await
1226            .values()
1227            .filter(|row| {
1228                if let Some(id) = routine_id {
1229                    row.routine_id == id
1230                } else {
1231                    true
1232                }
1233            })
1234            .cloned()
1235            .collect::<Vec<_>>();
1236        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1237        rows.truncate(limit.clamp(1, 500));
1238        rows
1239    }
1240
1241    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1242        let mut guard = self.routine_runs.write().await;
1243        let next_run_id = guard
1244            .values()
1245            .filter(|row| row.status == RoutineRunStatus::Queued)
1246            .min_by(|a, b| {
1247                a.created_at_ms
1248                    .cmp(&b.created_at_ms)
1249                    .then_with(|| a.run_id.cmp(&b.run_id))
1250            })
1251            .map(|row| row.run_id.clone())?;
1252        let now = now_ms();
1253        let row = guard.get_mut(&next_run_id)?;
1254        row.status = RoutineRunStatus::Running;
1255        row.updated_at_ms = now;
1256        row.started_at_ms = Some(now);
1257        let claimed = row.clone();
1258        drop(guard);
1259        let _ = self.persist_routine_runs().await;
1260        Some(claimed)
1261    }
1262
1263    pub async fn set_routine_session_policy(
1264        &self,
1265        session_id: String,
1266        run_id: String,
1267        routine_id: String,
1268        allowed_tools: Vec<String>,
1269    ) {
1270        let policy = RoutineSessionPolicy {
1271            session_id: session_id.clone(),
1272            run_id,
1273            routine_id,
1274            allowed_tools: normalize_allowed_tools(allowed_tools),
1275        };
1276        self.routine_session_policies
1277            .write()
1278            .await
1279            .insert(session_id, policy);
1280    }
1281
1282    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1283        self.routine_session_policies
1284            .read()
1285            .await
1286            .get(session_id)
1287            .cloned()
1288    }
1289
1290    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1291        self.routine_session_policies
1292            .write()
1293            .await
1294            .remove(session_id);
1295    }
1296
1297    pub async fn update_routine_run_status(
1298        &self,
1299        run_id: &str,
1300        status: RoutineRunStatus,
1301        reason: Option<String>,
1302    ) -> Option<RoutineRunRecord> {
1303        let mut guard = self.routine_runs.write().await;
1304        let row = guard.get_mut(run_id)?;
1305        row.status = status.clone();
1306        row.updated_at_ms = now_ms();
1307        match status {
1308            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1309            RoutineRunStatus::Running => {
1310                row.started_at_ms.get_or_insert_with(now_ms);
1311                if let Some(detail) = reason {
1312                    row.detail = Some(detail);
1313                }
1314            }
1315            RoutineRunStatus::Denied => row.denial_reason = reason,
1316            RoutineRunStatus::Paused => row.paused_reason = reason,
1317            RoutineRunStatus::Completed
1318            | RoutineRunStatus::Failed
1319            | RoutineRunStatus::Cancelled => {
1320                row.finished_at_ms = Some(now_ms());
1321                if let Some(detail) = reason {
1322                    row.detail = Some(detail);
1323                }
1324            }
1325            _ => {
1326                if let Some(detail) = reason {
1327                    row.detail = Some(detail);
1328                }
1329            }
1330        }
1331        let updated = row.clone();
1332        drop(guard);
1333        let _ = self.persist_routine_runs().await;
1334        Some(updated)
1335    }
1336
1337    pub async fn append_routine_run_artifact(
1338        &self,
1339        run_id: &str,
1340        artifact: RoutineRunArtifact,
1341    ) -> Option<RoutineRunRecord> {
1342        let mut guard = self.routine_runs.write().await;
1343        let row = guard.get_mut(run_id)?;
1344        row.updated_at_ms = now_ms();
1345        row.artifacts.push(artifact);
1346        let updated = row.clone();
1347        drop(guard);
1348        let _ = self.persist_routine_runs().await;
1349        Some(updated)
1350    }
1351}
1352
1353async fn build_channels_config(
1354    state: &AppState,
1355    channels: &ChannelsConfigFile,
1356) -> Option<ChannelsConfig> {
1357    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1358        return None;
1359    }
1360    Some(ChannelsConfig {
1361        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1362            bot_token: cfg.bot_token,
1363            allowed_users: cfg.allowed_users,
1364            mention_only: cfg.mention_only,
1365        }),
1366        discord: channels.discord.clone().map(|cfg| DiscordConfig {
1367            bot_token: cfg.bot_token,
1368            guild_id: cfg.guild_id,
1369            allowed_users: cfg.allowed_users,
1370            mention_only: cfg.mention_only,
1371        }),
1372        slack: channels.slack.clone().map(|cfg| SlackConfig {
1373            bot_token: cfg.bot_token,
1374            channel_id: cfg.channel_id,
1375            allowed_users: cfg.allowed_users,
1376        }),
1377        server_base_url: state.server_base_url(),
1378        api_token: state.api_token().await.unwrap_or_default(),
1379        tool_policy: channels.tool_policy.clone(),
1380    })
1381}
1382
/// Normalizes a web UI path prefix to the form `/segment[/segment...]`.
///
/// Surrounding whitespace and trailing slashes are removed and a leading
/// slash is added when missing. Input that is empty, or consists only of
/// whitespace and/or slashes (for example `"/"` or `"//"`), falls back to
/// `"/admin"` so the result is never an empty string.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let stripped = prefix.trim().trim_end_matches('/');
    if stripped.is_empty() {
        return "/admin".to_string();
    }
    if stripped.starts_with('/') {
        stripped.to_string()
    } else {
        format!("/{stripped}")
    }
}
1395
/// Serde default for `WebUiConfig::path_prefix`: `"/admin"`.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
1399
/// Returns a list containing only the `"*"` entry.
fn default_allow_all() -> Vec<String> {
    vec![String::from("*")]
}
1403
/// Returns the default for Discord's mention-only setting: `true`.
fn default_discord_mention_only() -> bool {
    true
}
1407
1408fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
1409    normalize_non_empty_list(raw)
1410}
1411
/// Trims every entry, drops entries that are empty after trimming, and
/// removes duplicates while keeping the first occurrence's position.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for values that were already present.
        .filter(|entry| already_seen.insert(entry.clone()))
        .collect()
}
1426
/// Reads the `TANDEM_RUN_STALE_MS` environment variable.
///
/// An unset or unparsable value falls back to 120 000 ms; the result is then
/// clamped to the range 30 000..=600 000 ms.
fn resolve_run_stale_ms() -> u64 {
    const DEFAULT_MS: u64 = 120_000;
    const MIN_MS: u64 = 30_000;
    const MAX_MS: u64 = 600_000;

    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_MS),
        Err(_) => DEFAULT_MS,
    };
    configured.clamp(MIN_MS, MAX_MS)
}
1434
/// Returns the path of `shared_resources.json`.
///
/// The file lives under the directory named by `TANDEM_STATE_DIR` (trimmed)
/// when that variable is set and non-blank, otherwise under `.tandem`.
fn resolve_shared_resources_path() -> PathBuf {
    let base_dir = std::env::var("TANDEM_STATE_DIR")
        .ok()
        .map(|dir| dir.trim().to_string())
        .filter(|dir| !dir.is_empty())
        .map_or_else(|| PathBuf::from(".tandem"), PathBuf::from);
    base_dir.join("shared_resources.json")
}
1444
/// Returns the path of `routines.json`.
///
/// The file lives under the directory named by `TANDEM_STATE_DIR` (trimmed)
/// when that variable is set and non-blank, otherwise under `.tandem`.
fn resolve_routines_path() -> PathBuf {
    let base_dir = std::env::var("TANDEM_STATE_DIR")
        .ok()
        .map(|dir| dir.trim().to_string())
        .filter(|dir| !dir.is_empty())
        .map_or_else(|| PathBuf::from(".tandem"), PathBuf::from);
    base_dir.join("routines.json")
}
1454
/// Returns the path of `routine_history.json`.
///
/// The file lives under the directory named by `TANDEM_STORAGE_DIR` (trimmed)
/// when that variable is set and non-blank, otherwise under `.tandem`.
// NOTE(review): the sibling path resolvers read `TANDEM_STATE_DIR`; confirm
// that `TANDEM_STORAGE_DIR` is intentional here and not a typo.
fn resolve_routine_history_path() -> PathBuf {
    let base_dir = std::env::var("TANDEM_STORAGE_DIR")
        .ok()
        .map(|root| root.trim().to_string())
        .filter(|root| !root.is_empty())
        .map_or_else(|| PathBuf::from(".tandem"), PathBuf::from);
    base_dir.join("routine_history.json")
}
1464
/// Returns the path of `routine_runs.json`.
///
/// The file lives under the directory named by `TANDEM_STATE_DIR` (trimmed)
/// when that variable is set and non-blank, otherwise under `.tandem`.
fn resolve_routine_runs_path() -> PathBuf {
    let base_dir = std::env::var("TANDEM_STATE_DIR")
        .ok()
        .map(|root| root.trim().to_string())
        .filter(|root| !root.is_empty())
        .map_or_else(|| PathBuf::from(".tandem"), PathBuf::from);
    base_dir.join("routine_runs.json")
}
1474
/// Returns the path of the agent-team audit log, `agent-team/audit.log.jsonl`.
///
/// The path is rooted at the directory named by `TANDEM_STATE_DIR` (trimmed)
/// when that variable is set and non-blank, otherwise at `.tandem`.
fn resolve_agent_team_audit_path() -> PathBuf {
    let base_dir = std::env::var("TANDEM_STATE_DIR")
        .ok()
        .map(|base| base.trim().to_string())
        .filter(|base| !base.is_empty())
        .map_or_else(|| PathBuf::from(".tandem"), PathBuf::from);
    base_dir.join("agent-team").join("audit.log.jsonl")
}
1488
1489fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
1490    match schedule {
1491        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
1492        RoutineSchedule::Cron { .. } => None,
1493    }
1494}
1495
1496fn compute_misfire_plan(
1497    now_ms: u64,
1498    next_fire_at_ms: u64,
1499    interval_ms: u64,
1500    policy: &RoutineMisfirePolicy,
1501) -> (u32, u64) {
1502    if now_ms < next_fire_at_ms || interval_ms == 0 {
1503        return (0, next_fire_at_ms);
1504    }
1505    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
1506    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
1507    match policy {
1508        RoutineMisfirePolicy::Skip => (0, aligned_next),
1509        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
1510        RoutineMisfirePolicy::CatchUp { max_runs } => {
1511            let count = missed.min(u64::from(*max_runs)) as u32;
1512            (count, aligned_next)
1513        }
1514    }
1515}
1516
/// Outcome of `evaluate_routine_execution_policy` for one routine trigger.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine must be approved before it runs; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
1523
1524pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
1525    let entrypoint = routine.entrypoint.to_ascii_lowercase();
1526    if entrypoint.starts_with("connector.")
1527        || entrypoint.starts_with("integration.")
1528        || entrypoint.contains("external")
1529    {
1530        return true;
1531    }
1532    routine
1533        .args
1534        .get("uses_external_integrations")
1535        .and_then(|v| v.as_bool())
1536        .unwrap_or(false)
1537        || routine
1538            .args
1539            .get("connector_id")
1540            .and_then(|v| v.as_str())
1541            .is_some()
1542}
1543
1544pub fn evaluate_routine_execution_policy(
1545    routine: &RoutineSpec,
1546    trigger_type: &str,
1547) -> RoutineExecutionDecision {
1548    if !routine_uses_external_integrations(routine) {
1549        return RoutineExecutionDecision::Allowed;
1550    }
1551    if !routine.external_integrations_allowed {
1552        return RoutineExecutionDecision::Blocked {
1553            reason: "external integrations are disabled by policy".to_string(),
1554        };
1555    }
1556    if routine.requires_approval {
1557        return RoutineExecutionDecision::RequiresApproval {
1558            reason: format!(
1559                "manual approval required before external side effects ({})",
1560                trigger_type
1561            ),
1562        };
1563    }
1564    RoutineExecutionDecision::Allowed
1565}
1566
/// Checks whether `key` is an acceptable shared-resource key.
///
/// After trimming surrounding whitespace the key must be non-empty, start
/// with one of `run/`, `mission/`, `project/` or `team/`, and contain no
/// empty path segment (`//`).
// NOTE(review): validation looks at the trimmed key, but
// `delete_shared_resource` uses the key as passed in; confirm that keys with
// surrounding whitespace are meant to be accepted.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    let candidate = key.trim();
    !candidate.is_empty()
        && ALLOWED_PREFIXES
            .iter()
            .any(|prefix| candidate.starts_with(*prefix))
        && !candidate.contains("//")
}
1581
1582impl Deref for AppState {
1583    type Target = RuntimeState;
1584
1585    fn deref(&self) -> &Self::Target {
1586        self.runtime
1587            .get()
1588            .expect("runtime accessed before startup completion")
1589    }
1590}
1591
1592fn extract_event_session_id(properties: &Value) -> Option<String> {
1593    properties
1594        .get("sessionID")
1595        .or_else(|| properties.get("sessionId"))
1596        .or_else(|| properties.get("id"))
1597        .and_then(|v| v.as_str())
1598        .map(|s| s.to_string())
1599}
1600
1601fn extract_event_run_id(properties: &Value) -> Option<String> {
1602    properties
1603        .get("runID")
1604        .or_else(|| properties.get("run_id"))
1605        .and_then(|v| v.as_str())
1606        .map(|s| s.to_string())
1607}
1608
1609fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1610    let session_id = extract_event_session_id(&event.properties)?;
1611    let run_id = extract_event_run_id(&event.properties);
1612    let key = format!("run/{session_id}/status");
1613
1614    let mut base = serde_json::Map::new();
1615    base.insert("sessionID".to_string(), Value::String(session_id));
1616    if let Some(run_id) = run_id {
1617        base.insert("runID".to_string(), Value::String(run_id));
1618    }
1619
1620    match event.event_type.as_str() {
1621        "session.run.started" => {
1622            base.insert("state".to_string(), Value::String("running".to_string()));
1623            base.insert("phase".to_string(), Value::String("run".to_string()));
1624            base.insert(
1625                "eventType".to_string(),
1626                Value::String("session.run.started".to_string()),
1627            );
1628            Some(StatusIndexUpdate {
1629                key,
1630                value: Value::Object(base),
1631            })
1632        }
1633        "session.run.finished" => {
1634            base.insert("state".to_string(), Value::String("finished".to_string()));
1635            base.insert("phase".to_string(), Value::String("run".to_string()));
1636            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1637                base.insert("result".to_string(), Value::String(status.to_string()));
1638            }
1639            base.insert(
1640                "eventType".to_string(),
1641                Value::String("session.run.finished".to_string()),
1642            );
1643            Some(StatusIndexUpdate {
1644                key,
1645                value: Value::Object(base),
1646            })
1647        }
1648        "message.part.updated" => {
1649            let part_type = event
1650                .properties
1651                .get("part")
1652                .and_then(|v| v.get("type"))
1653                .and_then(|v| v.as_str())?;
1654            let (phase, tool_active) = match part_type {
1655                "tool-invocation" => ("tool", true),
1656                "tool-result" => ("run", false),
1657                _ => return None,
1658            };
1659            base.insert("state".to_string(), Value::String("running".to_string()));
1660            base.insert("phase".to_string(), Value::String(phase.to_string()));
1661            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1662            if let Some(tool_name) = event
1663                .properties
1664                .get("part")
1665                .and_then(|v| v.get("tool"))
1666                .and_then(|v| v.as_str())
1667            {
1668                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1669            }
1670            base.insert(
1671                "eventType".to_string(),
1672                Value::String("message.part.updated".to_string()),
1673            );
1674            Some(StatusIndexUpdate {
1675                key,
1676                value: Value::Object(base),
1677            })
1678        }
1679        _ => None,
1680    }
1681}
1682
1683pub async fn run_status_indexer(state: AppState) {
1684    let mut rx = state.event_bus.subscribe();
1685    loop {
1686        match rx.recv().await {
1687            Ok(event) => {
1688                if let Some(update) = derive_status_index_update(&event) {
1689                    if let Err(error) = state
1690                        .put_shared_resource(
1691                            update.key,
1692                            update.value,
1693                            None,
1694                            "system.status_indexer".to_string(),
1695                            None,
1696                        )
1697                        .await
1698                    {
1699                        tracing::warn!("status indexer failed to persist update: {error:?}");
1700                    }
1701                }
1702            }
1703            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1704            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1705        }
1706    }
1707}
1708
1709pub async fn run_agent_team_supervisor(state: AppState) {
1710    let mut rx = state.event_bus.subscribe();
1711    loop {
1712        match rx.recv().await {
1713            Ok(event) => {
1714                state.agent_teams.handle_engine_event(&state, &event).await;
1715            }
1716            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1717            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1718        }
1719    }
1720}
1721
1722pub async fn run_routine_scheduler(state: AppState) {
1723    loop {
1724        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1725        let now = now_ms();
1726        let plans = state.evaluate_routine_misfires(now).await;
1727        for plan in plans {
1728            let Some(routine) = state.get_routine(&plan.routine_id).await else {
1729                continue;
1730            };
1731            match evaluate_routine_execution_policy(&routine, "scheduled") {
1732                RoutineExecutionDecision::Allowed => {
1733                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
1734                    let run = state
1735                        .create_routine_run(
1736                            &routine,
1737                            "scheduled",
1738                            plan.run_count,
1739                            RoutineRunStatus::Queued,
1740                            None,
1741                        )
1742                        .await;
1743                    state
1744                        .append_routine_history(RoutineHistoryEvent {
1745                            routine_id: plan.routine_id.clone(),
1746                            trigger_type: "scheduled".to_string(),
1747                            run_count: plan.run_count,
1748                            fired_at_ms: now,
1749                            status: "queued".to_string(),
1750                            detail: None,
1751                        })
1752                        .await;
1753                    state.event_bus.publish(EngineEvent::new(
1754                        "routine.fired",
1755                        serde_json::json!({
1756                            "routineID": plan.routine_id,
1757                            "runID": run.run_id,
1758                            "runCount": plan.run_count,
1759                            "scheduledAtMs": plan.scheduled_at_ms,
1760                            "nextFireAtMs": plan.next_fire_at_ms,
1761                        }),
1762                    ));
1763                    state.event_bus.publish(EngineEvent::new(
1764                        "routine.run.created",
1765                        serde_json::json!({
1766                            "run": run,
1767                        }),
1768                    ));
1769                }
1770                RoutineExecutionDecision::RequiresApproval { reason } => {
1771                    let run = state
1772                        .create_routine_run(
1773                            &routine,
1774                            "scheduled",
1775                            plan.run_count,
1776                            RoutineRunStatus::PendingApproval,
1777                            Some(reason.clone()),
1778                        )
1779                        .await;
1780                    state
1781                        .append_routine_history(RoutineHistoryEvent {
1782                            routine_id: plan.routine_id.clone(),
1783                            trigger_type: "scheduled".to_string(),
1784                            run_count: plan.run_count,
1785                            fired_at_ms: now,
1786                            status: "pending_approval".to_string(),
1787                            detail: Some(reason.clone()),
1788                        })
1789                        .await;
1790                    state.event_bus.publish(EngineEvent::new(
1791                        "routine.approval_required",
1792                        serde_json::json!({
1793                            "routineID": plan.routine_id,
1794                            "runID": run.run_id,
1795                            "runCount": plan.run_count,
1796                            "triggerType": "scheduled",
1797                            "reason": reason,
1798                        }),
1799                    ));
1800                    state.event_bus.publish(EngineEvent::new(
1801                        "routine.run.created",
1802                        serde_json::json!({
1803                            "run": run,
1804                        }),
1805                    ));
1806                }
1807                RoutineExecutionDecision::Blocked { reason } => {
1808                    let run = state
1809                        .create_routine_run(
1810                            &routine,
1811                            "scheduled",
1812                            plan.run_count,
1813                            RoutineRunStatus::BlockedPolicy,
1814                            Some(reason.clone()),
1815                        )
1816                        .await;
1817                    state
1818                        .append_routine_history(RoutineHistoryEvent {
1819                            routine_id: plan.routine_id.clone(),
1820                            trigger_type: "scheduled".to_string(),
1821                            run_count: plan.run_count,
1822                            fired_at_ms: now,
1823                            status: "blocked_policy".to_string(),
1824                            detail: Some(reason.clone()),
1825                        })
1826                        .await;
1827                    state.event_bus.publish(EngineEvent::new(
1828                        "routine.blocked",
1829                        serde_json::json!({
1830                            "routineID": plan.routine_id,
1831                            "runID": run.run_id,
1832                            "runCount": plan.run_count,
1833                            "triggerType": "scheduled",
1834                            "reason": reason,
1835                        }),
1836                    ));
1837                    state.event_bus.publish(EngineEvent::new(
1838                        "routine.run.created",
1839                        serde_json::json!({
1840                            "run": run,
1841                        }),
1842                    ));
1843                }
1844            }
1845        }
1846    }
1847}
1848
/// Background worker loop that executes queued routine runs one at a time.
///
/// Each iteration sleeps for one second and then tries to claim the next
/// queued run. For a claimed run the loop:
///
/// 1. publishes `routine.run.started`,
/// 2. creates and saves a fresh session rooted at the workspace snapshot root,
/// 3. installs the run's allowed-tool list on both the app state and the
///    engine loop for that session,
/// 4. resolves a model and builds the prompt,
/// 5. runs the prompt through the engine loop,
/// 6. clears the per-session tool restrictions again, and
/// 7. records `Completed` or `Failed` and publishes the matching
///    `routine.run.completed` / `routine.run.failed` event.
///
/// The `loop` has no exit condition, so the returned future does not complete
/// on its own.
pub async fn run_routine_executor(state: AppState) {
    loop {
        // Poll once per second; if nothing is queued, go straight back to sleep.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let Some(run) = state.claim_next_queued_routine_run().await else {
            continue;
        };

        state.event_bus.publish(EngineEvent::new(
            "routine.run.started",
            serde_json::json!({
                "runID": run.run_id,
                "routineID": run.routine_id,
                "triggerType": run.trigger_type,
                "startedAtMs": now_ms(),
            }),
        ));

        // Every run gets its own session, anchored to the current workspace root.
        let workspace_root = state.workspace_index.snapshot().await.root;
        let mut session = Session::new(
            Some(format!("Routine {}", run.routine_id)),
            Some(workspace_root.clone()),
        );
        // Capture the id before `session` is moved into storage below.
        let session_id = session.id.clone();
        session.workspace_root = Some(workspace_root);

        // Without a persisted session the run cannot proceed: mark it failed,
        // announce the failure, and move on to the next queued run.
        if let Err(error) = state.storage.save_session(session).await {
            let detail = format!("failed to create routine session: {error}");
            // Best effort: the result of the status update is deliberately ignored.
            let _ = state
                .update_routine_run_status(
                    &run.run_id,
                    RoutineRunStatus::Failed,
                    Some(detail.clone()),
                )
                .await;
            state.event_bus.publish(EngineEvent::new(
                "routine.run.failed",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "reason": detail,
                }),
            ));
            continue;
        }

        // Install the run's tool allow-list for this session in both places
        // before the prompt is executed; both are cleared again afterwards.
        state
            .set_routine_session_policy(
                session_id.clone(),
                run.run_id.clone(),
                run.routine_id.clone(),
                run.allowed_tools.clone(),
            )
            .await;
        state
            .engine_loop
            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
            .await;

        // The model-selection event is only published when a model was resolved.
        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
        if let Some(spec) = selected_model.as_ref() {
            state.event_bus.publish(EngineEvent::new(
                "routine.run.model_selected",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "providerID": spec.provider_id,
                    "modelID": spec.model_id,
                    "source": model_source,
                }),
            ));
        }

        let request = SendMessageRequest {
            parts: vec![MessagePartInput::Text {
                text: build_routine_prompt(&state, &run).await,
            }],
            model: selected_model,
            agent: None,
        };

        // The third argument tags the engine call with the routine run id.
        let run_result = state
            .engine_loop
            .run_prompt_async_with_context(
                session_id.clone(),
                request,
                Some(format!("routine:{}", run.run_id)),
            )
            .await;

        // Remove the per-session restrictions regardless of how the run ended.
        state.clear_routine_session_policy(&session_id).await;
        state
            .engine_loop
            .clear_session_allowed_tools(&session_id)
            .await;

        match run_result {
            Ok(()) => {
                // Configured output targets are recorded as artifacts only on success.
                append_configured_output_artifacts(&state, &run).await;
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Completed,
                        Some("routine run completed".to_string()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.completed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
            Err(error) => {
                // Cap the stored/published error text at 500 bytes plus a marker.
                let detail = truncate_text(&error.to_string(), 500);
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Failed,
                        Some(detail.clone()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.failed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "reason": detail,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
        }
    }
}
1987
1988async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1989    let normalized_entrypoint = run.entrypoint.trim();
1990    let known_tool = state
1991        .tools
1992        .list()
1993        .await
1994        .into_iter()
1995        .any(|schema| schema.name == normalized_entrypoint);
1996    if known_tool {
1997        let args = if run.args.is_object() {
1998            run.args.clone()
1999        } else {
2000            serde_json::json!({})
2001        };
2002        return format!("/tool {} {}", normalized_entrypoint, args);
2003    }
2004
2005    if let Some(objective) = routine_objective_from_args(run) {
2006        return build_routine_mission_prompt(run, &objective);
2007    }
2008
2009    format!(
2010        "Execute routine '{}' using entrypoint '{}' with args: {}",
2011        run.routine_id, run.entrypoint, run.args
2012    )
2013}
2014
2015fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
2016    run.args
2017        .get("prompt")
2018        .and_then(|v| v.as_str())
2019        .map(str::trim)
2020        .filter(|v| !v.is_empty())
2021        .map(ToString::to_string)
2022}
2023
2024fn routine_mode_from_args(args: &Value) -> &str {
2025    args.get("mode")
2026        .and_then(|v| v.as_str())
2027        .map(str::trim)
2028        .filter(|v| !v.is_empty())
2029        .unwrap_or("standalone")
2030}
2031
2032fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
2033    args.get("success_criteria")
2034        .and_then(|v| v.as_array())
2035        .map(|rows| {
2036            rows.iter()
2037                .filter_map(|row| row.as_str())
2038                .map(str::trim)
2039                .filter(|row| !row.is_empty())
2040                .map(ToString::to_string)
2041                .collect::<Vec<_>>()
2042        })
2043        .unwrap_or_default()
2044}
2045
2046fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
2047    let mode = routine_mode_from_args(&run.args);
2048    let success_criteria = routine_success_criteria_from_args(&run.args);
2049    let orchestrator_only_tool_calls = run
2050        .args
2051        .get("orchestrator_only_tool_calls")
2052        .and_then(|v| v.as_bool())
2053        .unwrap_or(false);
2054
2055    let mut lines = vec![
2056        format!("Automation ID: {}", run.routine_id),
2057        format!("Run ID: {}", run.run_id),
2058        format!("Mode: {}", mode),
2059        format!("Mission Objective: {}", objective),
2060    ];
2061
2062    if !success_criteria.is_empty() {
2063        lines.push("Success Criteria:".to_string());
2064        for criterion in success_criteria {
2065            lines.push(format!("- {}", criterion));
2066        }
2067    }
2068
2069    if run.allowed_tools.is_empty() {
2070        lines.push("Allowed Tools: all available by current policy".to_string());
2071    } else {
2072        lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
2073    }
2074
2075    if run.output_targets.is_empty() {
2076        lines.push("Output Targets: none configured".to_string());
2077    } else {
2078        lines.push("Output Targets:".to_string());
2079        for target in &run.output_targets {
2080            lines.push(format!("- {}", target));
2081        }
2082    }
2083
2084    if mode.eq_ignore_ascii_case("orchestrated") {
2085        lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
2086        lines
2087            .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
2088        if orchestrator_only_tool_calls {
2089            lines.push(
2090                "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
2091                    .to_string(),
2092            );
2093        }
2094    } else {
2095        lines.push("Execution Pattern: Standalone mission run".to_string());
2096    }
2097
2098    lines.push(
2099        "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
2100            .to_string(),
2101    );
2102
2103    lines.join("\n")
2104}
2105
/// Returns `input` unchanged when it is at most `max_len` bytes long;
/// otherwise returns a prefix of at most `max_len` bytes followed by the
/// marker `...<truncated>`.
///
/// `max_len` is a byte budget. Slicing a `str` at an index that falls inside
/// a multi-byte UTF-8 character panics, so the cut point is moved back to the
/// nearest character boundary at or below `max_len`. The kept prefix may
/// therefore be up to three bytes shorter than `max_len`.
fn truncate_text(input: &str, max_len: usize) -> String {
    const TRUNCATION_SUFFIX: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // Here `max_len < input.len()`. Index 0 is always a char boundary, so the
    // loop terminates without underflowing.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }

    let mut out = String::with_capacity(cut + TRUNCATION_SUFFIX.len());
    out.push_str(&input[..cut]);
    out.push_str(TRUNCATION_SUFFIX);
    out
}
2114
2115async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
2116    if run.output_targets.is_empty() {
2117        return;
2118    }
2119    for target in &run.output_targets {
2120        let artifact = RoutineRunArtifact {
2121            artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
2122            uri: target.clone(),
2123            kind: "output_target".to_string(),
2124            label: Some("configured output target".to_string()),
2125            created_at_ms: now_ms(),
2126            metadata: Some(serde_json::json!({
2127                "source": "routine.output_targets",
2128                "runID": run.run_id,
2129                "routineID": run.routine_id,
2130            })),
2131        };
2132        let _ = state
2133            .append_routine_run_artifact(&run.run_id, artifact.clone())
2134            .await;
2135        state.event_bus.publish(EngineEvent::new(
2136            "routine.run.artifact_added",
2137            serde_json::json!({
2138                "runID": run.run_id,
2139                "routineID": run.routine_id,
2140                "artifact": artifact,
2141            }),
2142        ));
2143    }
2144}
2145
2146fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
2147    let obj = value.as_object()?;
2148    let provider_id = obj.get("provider_id")?.as_str()?.trim();
2149    let model_id = obj.get("model_id")?.as_str()?.trim();
2150    if provider_id.is_empty() || model_id.is_empty() {
2151        return None;
2152    }
2153    Some(ModelSpec {
2154        provider_id: provider_id.to_string(),
2155        model_id: model_id.to_string(),
2156    })
2157}
2158
2159fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
2160    args.get("model_policy")
2161        .and_then(|v| v.get("role_models"))
2162        .and_then(|v| v.get(role))
2163        .and_then(parse_model_spec)
2164}
2165
2166fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
2167    args.get("model_policy")
2168        .and_then(|v| v.get("default_model"))
2169        .and_then(parse_model_spec)
2170}
2171
2172fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
2173    providers.iter().any(|provider| {
2174        provider.id == spec.provider_id
2175            && provider
2176                .models
2177                .iter()
2178                .any(|model| model.id == spec.model_id)
2179    })
2180}
2181
2182async fn resolve_routine_model_spec_for_run(
2183    state: &AppState,
2184    run: &RoutineRunRecord,
2185) -> (Option<ModelSpec>, String) {
2186    let providers = state.providers.list().await;
2187    let mode = routine_mode_from_args(&run.args);
2188    let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
2189
2190    if mode.eq_ignore_ascii_case("orchestrated") {
2191        if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
2192            requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
2193        }
2194    }
2195    if let Some(default_model) = default_model_spec_from_args(&run.args) {
2196        requested.push((default_model, "args.model_policy.default_model"));
2197    }
2198
2199    for (candidate, source) in requested {
2200        if provider_catalog_has_model(&providers, &candidate) {
2201            return (Some(candidate), source.to_string());
2202        }
2203    }
2204
2205    let fallback = providers
2206        .into_iter()
2207        .find(|provider| !provider.models.is_empty())
2208        .and_then(|provider| {
2209            let model = provider.models.first()?;
2210            Some(ModelSpec {
2211                provider_id: provider.id,
2212                model_id: model.id.clone(),
2213            })
2214        });
2215
2216    (fallback, "provider_catalog_fallback".to_string())
2217}
2218
2219#[cfg(test)]
2220mod tests {
2221    use super::*;
2222
2223    fn test_state_with_path(path: PathBuf) -> AppState {
2224        let mut state = AppState::new_starting("test-attempt".to_string(), true);
2225        state.shared_resources_path = path;
2226        state.routines_path = tmp_routines_file("shared-state");
2227        state.routine_history_path = tmp_routines_file("routine-history");
2228        state.routine_runs_path = tmp_routines_file("routine-runs");
2229        state
2230    }
2231
2232    fn tmp_resource_file(name: &str) -> PathBuf {
2233        std::env::temp_dir().join(format!(
2234            "tandem-server-{name}-{}.json",
2235            uuid::Uuid::new_v4()
2236        ))
2237    }
2238
2239    fn tmp_routines_file(name: &str) -> PathBuf {
2240        std::env::temp_dir().join(format!(
2241            "tandem-server-routines-{name}-{}.json",
2242            uuid::Uuid::new_v4()
2243        ))
2244    }
2245
2246    #[tokio::test]
2247    async fn shared_resource_put_increments_revision() {
2248        let path = tmp_resource_file("shared-resource-put");
2249        let state = test_state_with_path(path.clone());
2250
2251        let first = state
2252            .put_shared_resource(
2253                "project/demo/board".to_string(),
2254                serde_json::json!({"status":"todo"}),
2255                None,
2256                "agent-1".to_string(),
2257                None,
2258            )
2259            .await
2260            .expect("first put");
2261        assert_eq!(first.rev, 1);
2262
2263        let second = state
2264            .put_shared_resource(
2265                "project/demo/board".to_string(),
2266                serde_json::json!({"status":"doing"}),
2267                Some(1),
2268                "agent-2".to_string(),
2269                Some(60_000),
2270            )
2271            .await
2272            .expect("second put");
2273        assert_eq!(second.rev, 2);
2274        assert_eq!(second.updated_by, "agent-2");
2275        assert_eq!(second.ttl_ms, Some(60_000));
2276
2277        let raw = tokio::fs::read_to_string(path.clone())
2278            .await
2279            .expect("persisted");
2280        assert!(raw.contains("\"rev\": 2"));
2281        let _ = tokio::fs::remove_file(path).await;
2282    }
2283
2284    #[tokio::test]
2285    async fn shared_resource_put_detects_revision_conflict() {
2286        let path = tmp_resource_file("shared-resource-conflict");
2287        let state = test_state_with_path(path.clone());
2288
2289        let _ = state
2290            .put_shared_resource(
2291                "mission/demo/card-1".to_string(),
2292                serde_json::json!({"title":"Card 1"}),
2293                None,
2294                "agent-1".to_string(),
2295                None,
2296            )
2297            .await
2298            .expect("seed put");
2299
2300        let conflict = state
2301            .put_shared_resource(
2302                "mission/demo/card-1".to_string(),
2303                serde_json::json!({"title":"Card 1 edited"}),
2304                Some(99),
2305                "agent-2".to_string(),
2306                None,
2307            )
2308            .await
2309            .expect_err("expected conflict");
2310
2311        match conflict {
2312            ResourceStoreError::RevisionConflict(conflict) => {
2313                assert_eq!(conflict.expected_rev, Some(99));
2314                assert_eq!(conflict.current_rev, Some(1));
2315            }
2316            other => panic!("unexpected error: {other:?}"),
2317        }
2318
2319        let _ = tokio::fs::remove_file(path).await;
2320    }
2321
2322    #[tokio::test]
2323    async fn shared_resource_rejects_invalid_namespace_key() {
2324        let path = tmp_resource_file("shared-resource-invalid-key");
2325        let state = test_state_with_path(path.clone());
2326
2327        let error = state
2328            .put_shared_resource(
2329                "global/demo/key".to_string(),
2330                serde_json::json!({"x":1}),
2331                None,
2332                "agent-1".to_string(),
2333                None,
2334            )
2335            .await
2336            .expect_err("invalid key should fail");
2337
2338        match error {
2339            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
2340            other => panic!("unexpected error: {other:?}"),
2341        }
2342
2343        assert!(!path.exists());
2344    }
2345
2346    #[test]
2347    fn derive_status_index_update_for_run_started() {
2348        let event = EngineEvent::new(
2349            "session.run.started",
2350            serde_json::json!({
2351                "sessionID": "s-1",
2352                "runID": "r-1"
2353            }),
2354        );
2355        let update = derive_status_index_update(&event).expect("update");
2356        assert_eq!(update.key, "run/s-1/status");
2357        assert_eq!(
2358            update.value.get("state").and_then(|v| v.as_str()),
2359            Some("running")
2360        );
2361        assert_eq!(
2362            update.value.get("phase").and_then(|v| v.as_str()),
2363            Some("run")
2364        );
2365    }
2366
2367    #[test]
2368    fn derive_status_index_update_for_tool_invocation() {
2369        let event = EngineEvent::new(
2370            "message.part.updated",
2371            serde_json::json!({
2372                "sessionID": "s-2",
2373                "runID": "r-2",
2374                "part": { "type": "tool-invocation", "tool": "todo_write" }
2375            }),
2376        );
2377        let update = derive_status_index_update(&event).expect("update");
2378        assert_eq!(update.key, "run/s-2/status");
2379        assert_eq!(
2380            update.value.get("phase").and_then(|v| v.as_str()),
2381            Some("tool")
2382        );
2383        assert_eq!(
2384            update.value.get("toolActive").and_then(|v| v.as_bool()),
2385            Some(true)
2386        );
2387        assert_eq!(
2388            update.value.get("tool").and_then(|v| v.as_str()),
2389            Some("todo_write")
2390        );
2391    }
2392
2393    #[test]
2394    fn misfire_skip_drops_runs_and_advances_next_fire() {
2395        let (count, next_fire) =
2396            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
2397        assert_eq!(count, 0);
2398        assert_eq!(next_fire, 11_000);
2399    }
2400
2401    #[test]
2402    fn misfire_run_once_emits_single_trigger() {
2403        let (count, next_fire) =
2404            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
2405        assert_eq!(count, 1);
2406        assert_eq!(next_fire, 11_000);
2407    }
2408
2409    #[test]
2410    fn misfire_catch_up_caps_trigger_count() {
2411        let (count, next_fire) = compute_misfire_plan(
2412            25_000,
2413            5_000,
2414            1_000,
2415            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
2416        );
2417        assert_eq!(count, 3);
2418        assert_eq!(next_fire, 26_000);
2419    }
2420
2421    #[tokio::test]
2422    async fn routine_put_persists_and_loads() {
2423        let routines_path = tmp_routines_file("persist-load");
2424        let mut state = AppState::new_starting("routines-put".to_string(), true);
2425        state.routines_path = routines_path.clone();
2426
2427        let routine = RoutineSpec {
2428            routine_id: "routine-1".to_string(),
2429            name: "Digest".to_string(),
2430            status: RoutineStatus::Active,
2431            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2432            timezone: "UTC".to_string(),
2433            misfire_policy: RoutineMisfirePolicy::RunOnce,
2434            entrypoint: "mission.default".to_string(),
2435            args: serde_json::json!({"topic":"status"}),
2436            allowed_tools: vec![],
2437            output_targets: vec![],
2438            creator_type: "user".to_string(),
2439            creator_id: "user-1".to_string(),
2440            requires_approval: true,
2441            external_integrations_allowed: false,
2442            next_fire_at_ms: Some(5_000),
2443            last_fired_at_ms: None,
2444        };
2445
2446        state.put_routine(routine).await.expect("store routine");
2447
2448        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
2449        reloaded.routines_path = routines_path.clone();
2450        reloaded.load_routines().await.expect("load routines");
2451        let list = reloaded.list_routines().await;
2452        assert_eq!(list.len(), 1);
2453        assert_eq!(list[0].routine_id, "routine-1");
2454
2455        let _ = tokio::fs::remove_file(routines_path).await;
2456    }
2457
2458    #[tokio::test]
2459    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
2460        let routines_path = tmp_routines_file("misfire-eval");
2461        let mut state = AppState::new_starting("routines-eval".to_string(), true);
2462        state.routines_path = routines_path.clone();
2463
2464        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
2465            routine_id: id.to_string(),
2466            name: id.to_string(),
2467            status: RoutineStatus::Active,
2468            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
2469            timezone: "UTC".to_string(),
2470            misfire_policy: policy,
2471            entrypoint: "mission.default".to_string(),
2472            args: serde_json::json!({}),
2473            allowed_tools: vec![],
2474            output_targets: vec![],
2475            creator_type: "user".to_string(),
2476            creator_id: "u-1".to_string(),
2477            requires_approval: false,
2478            external_integrations_allowed: false,
2479            next_fire_at_ms: Some(5_000),
2480            last_fired_at_ms: None,
2481        };
2482
2483        state
2484            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
2485            .await
2486            .expect("put skip");
2487        state
2488            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
2489            .await
2490            .expect("put once");
2491        state
2492            .put_routine(base(
2493                "routine-catch",
2494                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
2495            ))
2496            .await
2497            .expect("put catch");
2498
2499        let plans = state.evaluate_routine_misfires(10_500).await;
2500        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
2501        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
2502        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
2503
2504        assert!(plan_skip.is_none());
2505        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
2506        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
2507
2508        let stored = state.list_routines().await;
2509        let skip_next = stored
2510            .iter()
2511            .find(|r| r.routine_id == "routine-skip")
2512            .and_then(|r| r.next_fire_at_ms)
2513            .expect("skip next");
2514        assert!(skip_next > 10_500);
2515
2516        let _ = tokio::fs::remove_file(routines_path).await;
2517    }
2518
2519    #[test]
2520    fn routine_policy_blocks_external_side_effects_by_default() {
2521        let routine = RoutineSpec {
2522            routine_id: "routine-policy-1".to_string(),
2523            name: "Connector routine".to_string(),
2524            status: RoutineStatus::Active,
2525            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2526            timezone: "UTC".to_string(),
2527            misfire_policy: RoutineMisfirePolicy::RunOnce,
2528            entrypoint: "connector.email.reply".to_string(),
2529            args: serde_json::json!({}),
2530            allowed_tools: vec![],
2531            output_targets: vec![],
2532            creator_type: "user".to_string(),
2533            creator_id: "u-1".to_string(),
2534            requires_approval: true,
2535            external_integrations_allowed: false,
2536            next_fire_at_ms: None,
2537            last_fired_at_ms: None,
2538        };
2539
2540        let decision = evaluate_routine_execution_policy(&routine, "manual");
2541        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
2542    }
2543
2544    #[test]
2545    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
2546        let routine = RoutineSpec {
2547            routine_id: "routine-policy-2".to_string(),
2548            name: "Connector routine".to_string(),
2549            status: RoutineStatus::Active,
2550            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2551            timezone: "UTC".to_string(),
2552            misfire_policy: RoutineMisfirePolicy::RunOnce,
2553            entrypoint: "connector.email.reply".to_string(),
2554            args: serde_json::json!({}),
2555            allowed_tools: vec![],
2556            output_targets: vec![],
2557            creator_type: "user".to_string(),
2558            creator_id: "u-1".to_string(),
2559            requires_approval: true,
2560            external_integrations_allowed: true,
2561            next_fire_at_ms: None,
2562            last_fired_at_ms: None,
2563        };
2564
2565        let decision = evaluate_routine_execution_policy(&routine, "manual");
2566        assert!(matches!(
2567            decision,
2568            RoutineExecutionDecision::RequiresApproval { .. }
2569        ));
2570    }
2571
2572    #[test]
2573    fn routine_policy_allows_non_external_entrypoints() {
2574        let routine = RoutineSpec {
2575            routine_id: "routine-policy-3".to_string(),
2576            name: "Internal mission routine".to_string(),
2577            status: RoutineStatus::Active,
2578            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2579            timezone: "UTC".to_string(),
2580            misfire_policy: RoutineMisfirePolicy::RunOnce,
2581            entrypoint: "mission.default".to_string(),
2582            args: serde_json::json!({}),
2583            allowed_tools: vec![],
2584            output_targets: vec![],
2585            creator_type: "user".to_string(),
2586            creator_id: "u-1".to_string(),
2587            requires_approval: true,
2588            external_integrations_allowed: false,
2589            next_fire_at_ms: None,
2590            last_fired_at_ms: None,
2591        };
2592
2593        let decision = evaluate_routine_execution_policy(&routine, "manual");
2594        assert_eq!(decision, RoutineExecutionDecision::Allowed);
2595    }
2596
2597    #[tokio::test]
2598    async fn claim_next_queued_routine_run_marks_oldest_running() {
2599        let mut state = AppState::new_starting("routine-claim".to_string(), true);
2600        state.routine_runs_path = tmp_routines_file("routine-claim-runs");
2601
2602        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
2603            run_id: run_id.to_string(),
2604            routine_id: "routine-claim".to_string(),
2605            trigger_type: "manual".to_string(),
2606            run_count: 1,
2607            status: RoutineRunStatus::Queued,
2608            created_at_ms,
2609            updated_at_ms: created_at_ms,
2610            fired_at_ms: Some(created_at_ms),
2611            started_at_ms: None,
2612            finished_at_ms: None,
2613            requires_approval: false,
2614            approval_reason: None,
2615            denial_reason: None,
2616            paused_reason: None,
2617            detail: None,
2618            entrypoint: "mission.default".to_string(),
2619            args: serde_json::json!({}),
2620            allowed_tools: vec![],
2621            output_targets: vec![],
2622            artifacts: vec![],
2623        };
2624
2625        {
2626            let mut guard = state.routine_runs.write().await;
2627            guard.insert("run-late".to_string(), mk("run-late", 2_000));
2628            guard.insert("run-early".to_string(), mk("run-early", 1_000));
2629        }
2630        state.persist_routine_runs().await.expect("persist");
2631
2632        let claimed = state
2633            .claim_next_queued_routine_run()
2634            .await
2635            .expect("claimed run");
2636        assert_eq!(claimed.run_id, "run-early");
2637        assert_eq!(claimed.status, RoutineRunStatus::Running);
2638        assert!(claimed.started_at_ms.is_some());
2639    }
2640
2641    #[tokio::test]
2642    async fn routine_session_policy_roundtrip_normalizes_tools() {
2643        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
2644        state
2645            .set_routine_session_policy(
2646                "session-routine-1".to_string(),
2647                "run-1".to_string(),
2648                "routine-1".to_string(),
2649                vec![
2650                    "read".to_string(),
2651                    " mcp.arcade.search ".to_string(),
2652                    "read".to_string(),
2653                    "".to_string(),
2654                ],
2655            )
2656            .await;
2657
2658        let policy = state
2659            .routine_session_policy("session-routine-1")
2660            .await
2661            .expect("policy");
2662        assert_eq!(
2663            policy.allowed_tools,
2664            vec!["read".to_string(), "mcp.arcade.search".to_string()]
2665        );
2666    }
2667
2668    #[test]
2669    fn routine_mission_prompt_includes_orchestrated_contract() {
2670        let run = RoutineRunRecord {
2671            run_id: "run-orchestrated-1".to_string(),
2672            routine_id: "automation-orchestrated".to_string(),
2673            trigger_type: "manual".to_string(),
2674            run_count: 1,
2675            status: RoutineRunStatus::Queued,
2676            created_at_ms: 1_000,
2677            updated_at_ms: 1_000,
2678            fired_at_ms: Some(1_000),
2679            started_at_ms: None,
2680            finished_at_ms: None,
2681            requires_approval: true,
2682            approval_reason: None,
2683            denial_reason: None,
2684            paused_reason: None,
2685            detail: None,
2686            entrypoint: "mission.default".to_string(),
2687            args: serde_json::json!({
2688                "prompt": "Coordinate a multi-step release readiness check.",
2689                "mode": "orchestrated",
2690                "success_criteria": ["All blockers listed", "Output artifact written"],
2691                "orchestrator_only_tool_calls": true
2692            }),
2693            allowed_tools: vec!["read".to_string(), "webfetch_document".to_string()],
2694            output_targets: vec!["file://reports/release-readiness.md".to_string()],
2695            artifacts: vec![],
2696        };
2697
2698        let objective = routine_objective_from_args(&run).expect("objective");
2699        let prompt = build_routine_mission_prompt(&run, &objective);
2700
2701        assert!(prompt.contains("Mode: orchestrated"));
2702        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
2703        assert!(prompt.contains("only the orchestrator may execute tools"));
2704        assert!(prompt.contains("Allowed Tools: read, webfetch_document"));
2705        assert!(prompt.contains("file://reports/release-readiness.md"));
2706    }
2707
2708    #[test]
2709    fn routine_mission_prompt_includes_standalone_defaults() {
2710        let run = RoutineRunRecord {
2711            run_id: "run-standalone-1".to_string(),
2712            routine_id: "automation-standalone".to_string(),
2713            trigger_type: "manual".to_string(),
2714            run_count: 1,
2715            status: RoutineRunStatus::Queued,
2716            created_at_ms: 2_000,
2717            updated_at_ms: 2_000,
2718            fired_at_ms: Some(2_000),
2719            started_at_ms: None,
2720            finished_at_ms: None,
2721            requires_approval: false,
2722            approval_reason: None,
2723            denial_reason: None,
2724            paused_reason: None,
2725            detail: None,
2726            entrypoint: "mission.default".to_string(),
2727            args: serde_json::json!({
2728                "prompt": "Summarize top engineering updates.",
2729                "success_criteria": ["Three bullet summary"]
2730            }),
2731            allowed_tools: vec![],
2732            output_targets: vec![],
2733            artifacts: vec![],
2734        };
2735
2736        let objective = routine_objective_from_args(&run).expect("objective");
2737        let prompt = build_routine_mission_prompt(&run, &objective);
2738
2739        assert!(prompt.contains("Mode: standalone"));
2740        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
2741        assert!(prompt.contains("Allowed Tools: all available by current policy"));
2742        assert!(prompt.contains("Output Targets: none configured"));
2743    }
2744}