Skip to main content

tandem_server/
lib.rs

1#![recursion_limit = "256"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
12use tandem_orchestrator::MissionState;
13use tandem_types::EngineEvent;
14use tokio::fs;
15use tokio::sync::RwLock;
16
17use tandem_core::{
18    AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus, PermissionManager,
19    PluginRegistry, Storage,
20};
21use tandem_providers::ProviderRegistry;
22use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
23use tandem_tools::ToolRegistry;
24
25mod agent_teams;
26mod http;
27
28pub use agent_teams::AgentTeamRuntime;
29pub use http::serve;
30
/// A client's time-limited claim on the engine.
///
/// The lease stays live until strictly more than `ttl_ms` milliseconds have
/// passed since `last_renewed_at_ms` (see [`EngineLease::is_expired`]).
#[derive(Debug, Clone)]
pub struct EngineLease {
    /// Identifier of this lease.
    pub lease_id: String,
    /// Identifier of the client that holds the lease.
    pub client_id: String,
    /// Free-form label describing the kind of client.
    pub client_type: String,
    /// Millisecond timestamp at which the lease was first acquired.
    pub acquired_at_ms: u64,
    /// Millisecond timestamp of the most recent renewal.
    pub last_renewed_at_ms: u64,
    /// Maximum allowed gap between renewals, in milliseconds.
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Reports whether the lease has lapsed as of `now_ms`.
    ///
    /// The comparison is strict: a lease whose age equals `ttl_ms` exactly is
    /// still valid. A `now_ms` earlier than the last renewal is treated as
    /// zero elapsed time rather than underflowing.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let since_renewal = now_ms.saturating_sub(self.last_renewed_at_ms);
        since_renewal > self.ttl_ms
    }
}
46
/// Bookkeeping entry for a run that is currently registered in
/// [`RunRegistry`].
///
/// Serialized with explicit key names (`runID`, `startedAtMs`, ...); the
/// optional identifiers are left out of the output when they are `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    /// Identifier of the run.
    #[serde(rename = "runID")]
    pub run_id: String,
    /// When the run was registered, in milliseconds since the Unix epoch.
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// Last time the run was touched, in milliseconds since the Unix epoch.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    /// Client that started the run, when known.
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    /// Agent associated with the run, when known.
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    /// Agent profile associated with the run, when known.
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
62
63#[derive(Clone, Default)]
64pub struct RunRegistry {
65    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
66}
67
68impl RunRegistry {
69    pub fn new() -> Self {
70        Self::default()
71    }
72
73    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
74        self.active.read().await.get(session_id).cloned()
75    }
76
77    pub async fn acquire(
78        &self,
79        session_id: &str,
80        run_id: String,
81        client_id: Option<String>,
82        agent_id: Option<String>,
83        agent_profile: Option<String>,
84    ) -> std::result::Result<ActiveRun, ActiveRun> {
85        let mut guard = self.active.write().await;
86        if let Some(existing) = guard.get(session_id).cloned() {
87            return Err(existing);
88        }
89        let now = now_ms();
90        let run = ActiveRun {
91            run_id,
92            started_at_ms: now,
93            last_activity_at_ms: now,
94            client_id,
95            agent_id,
96            agent_profile,
97        };
98        guard.insert(session_id.to_string(), run.clone());
99        Ok(run)
100    }
101
102    pub async fn touch(&self, session_id: &str, run_id: &str) {
103        let mut guard = self.active.write().await;
104        if let Some(run) = guard.get_mut(session_id) {
105            if run.run_id == run_id {
106                run.last_activity_at_ms = now_ms();
107            }
108        }
109    }
110
111    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
112        let mut guard = self.active.write().await;
113        if let Some(run) = guard.get(session_id) {
114            if run.run_id == run_id {
115                return guard.remove(session_id);
116            }
117        }
118        None
119    }
120
121    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
122        self.active.write().await.remove(session_id)
123    }
124
125    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
126        let now = now_ms();
127        let mut guard = self.active.write().await;
128        let stale_ids = guard
129            .iter()
130            .filter_map(|(session_id, run)| {
131                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
132                    Some(session_id.clone())
133                } else {
134                    None
135                }
136            })
137            .collect::<Vec<_>>();
138        let mut out = Vec::with_capacity(stale_ids.len());
139        for session_id in stale_ids {
140            if let Some(run) = guard.remove(&session_id) {
141                out.push((session_id, run));
142            }
143        }
144        out
145    }
146}
147
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
154
155pub fn build_id() -> String {
156    if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
157        let trimmed = explicit.trim();
158        if !trimmed.is_empty() {
159            return trimmed.to_string();
160        }
161    }
162    if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
163        let trimmed = git_sha.trim();
164        if !trimmed.is_empty() {
165            return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
166        }
167    }
168    env!("CARGO_PKG_VERSION").to_string()
169}
170
/// Returns the path of the running executable for health reporting.
///
/// The path is only exposed in builds with debug assertions enabled; other
/// builds always return `None`. A failure to determine the executable path
/// also yields `None`.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        let exe = std::env::current_exe().ok()?;
        Some(exe.to_string_lossy().into_owned())
    } else {
        None
    }
}
183
/// Bundle of engine services that becomes available once startup completes.
///
/// An instance is stored in `AppState::runtime` by `AppState::mark_ready`, and
/// `AppState` dereferences to it, so these fields are reachable directly from
/// an `AppState` after startup.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    /// String-to-string map behind a shared lock.
    // NOTE(review): presumably credentials keyed by provider id; confirm against callers.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// Shared, growable list of JSON log entries.
    pub logs: Arc<RwLock<Vec<Value>>>,
    /// Workspace index; `mark_ready` reads the workspace root from its snapshot.
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    /// Engine loop; `mark_ready` installs the agent-team hooks on it.
    pub engine_loop: EngineLoop,
}
203
/// A memory entry held in `AppState::memory_records`.
///
/// The partition, kind and classification types come from `tandem_memory`;
/// their semantics are defined there.
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    /// Identifier of this record.
    pub id: String,
    /// Identifier of the run this record is associated with.
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    /// Textual payload of the record.
    pub content: String,
    /// References to artifacts, stored as plain strings.
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    /// Optional free-form JSON metadata.
    pub metadata: Option<Value>,
    /// Identifier of another memory record this one refers to as its source, if any.
    pub source_memory_id: Option<String>,
    /// Creation time as a millisecond timestamp.
    pub created_at_ms: u64,
}
217
/// One entry of the memory audit log kept in `AppState::memory_audit_log`.
///
/// `detail` is omitted from the serialized form when it is `None`.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    /// Identifier of this audit entry.
    pub audit_id: String,
    /// Name of the audited action, as a free-form string.
    pub action: String,
    /// Identifier of the run the action belongs to.
    pub run_id: String,
    /// Memory record the action applies to, if any.
    pub memory_id: Option<String>,
    /// Source memory record involved in the action, if any.
    pub source_memory_id: Option<String>,
    /// Target tier of the action, if any.
    pub to_tier: Option<GovernedMemoryTier>,
    /// Partition the action applies to, in string form.
    pub partition_key: String,
    /// Who performed the action.
    pub actor: String,
    /// Outcome of the action, as a free-form string.
    pub status: String,
    /// Optional human-readable explanation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Creation time as a millisecond timestamp.
    pub created_at_ms: u64,
}
233
/// A versioned key/value entry in the shared resource store.
///
/// This is also the on-disk shape: the store is persisted as a JSON map of
/// key to `SharedResourceRecord`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    /// Resource key; writes only accept keys that pass `is_valid_resource_key`.
    pub key: String,
    /// Arbitrary JSON payload.
    pub value: Value,
    /// Revision counter: `1` on first write, incremented on every overwrite.
    pub rev: u64,
    /// Time of the last write, in milliseconds since the Unix epoch.
    pub updated_at_ms: u64,
    /// Caller-supplied identity of the last writer.
    pub updated_by: String,
    /// Optional time-to-live in milliseconds; omitted from JSON when `None`.
    // NOTE(review): nothing in this part of the file enforces the TTL; confirm where it is applied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
244
/// When a routine should fire.
///
/// Variant names are serialized in `snake_case` (`interval_seconds`, `cron`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    /// Fire every `seconds` seconds; `put_routine` rejects `0`.
    IntervalSeconds { seconds: u64 },
    /// Cron expression. Misfire evaluation in this file skips cron routines
    /// because no interval can be derived from them.
    Cron { expression: String },
}
251
/// What to do when one or more scheduled firings of a routine were missed.
///
/// Serialized with a `type` tag holding the `snake_case` variant name.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    /// Run nothing; only advance the next fire time.
    Skip,
    /// Run exactly once, no matter how many firings were missed.
    RunOnce,
    /// Run once per missed firing, capped at `max_runs`.
    CatchUp { max_runs: u32 },
}
259
/// Whether a routine takes part in scheduling.
///
/// Serialized as `"active"` / `"paused"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    /// The routine is considered during misfire evaluation.
    Active,
    /// The routine is skipped during misfire evaluation.
    Paused,
}
266
/// Persisted definition of a scheduled routine.
///
/// The routine store is written to disk as a JSON map of routine id to
/// `RoutineSpec`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    /// Unique identifier; must not be blank (enforced by `put_routine`).
    pub routine_id: String,
    /// Display name.
    pub name: String,
    /// Only `Active` routines are evaluated for firing.
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    /// Timezone label.
    // NOTE(review): not interpreted anywhere in this part of the file; confirm expected format.
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    /// What to run. Entrypoints starting with `connector.` / `integration.` or
    /// containing `external` are treated as using external integrations.
    pub entrypoint: String,
    /// Free-form JSON arguments; defaults to JSON `null` when absent on input.
    #[serde(default)]
    pub args: Value,
    /// Kind of principal that created the routine.
    pub creator_type: String,
    /// Identifier of the principal that created the routine.
    pub creator_id: String,
    /// When set, runs that use external integrations need manual approval.
    pub requires_approval: bool,
    /// When unset, runs that use external integrations are blocked.
    pub external_integrations_allowed: bool,
    /// Next scheduled firing in ms since the Unix epoch; filled in by
    /// `put_routine` when missing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    /// Time of the last recorded firing, set by `mark_routine_fired`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
287
/// One entry in a routine's in-memory firing history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    /// Routine the event belongs to; used as the history map key.
    pub routine_id: String,
    /// What caused the firing, as a free-form string.
    pub trigger_type: String,
    /// Number of runs covered by this event.
    pub run_count: u32,
    /// Firing time in milliseconds; history listings sort on it, newest first.
    pub fired_at_ms: u64,
    /// Outcome of the firing, as a free-form string.
    pub status: String,
    /// Optional explanation; omitted from JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
298
/// Result of misfire evaluation for one routine that is due to run.
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    /// Routine that should be triggered.
    pub routine_id: String,
    /// How many runs to perform; always greater than zero in emitted plans.
    pub run_count: u32,
    /// Evaluation time that produced this plan, in milliseconds.
    pub scheduled_at_ms: u64,
    /// The routine's next fire time after this plan, in milliseconds.
    pub next_fire_at_ms: u64,
}
306
/// Details of a failed optimistic-concurrency check on a shared resource.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    /// Key of the resource that was being written or deleted.
    pub key: String,
    /// Revision the caller required.
    pub expected_rev: Option<u64>,
    /// Revision actually stored, or `None` when the key does not exist.
    pub current_rev: Option<u64>,
}
313
/// Failure modes of the shared resource store.
///
/// Serialized with a `type` tag holding the `snake_case` variant name.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    /// The key failed `is_valid_resource_key`.
    InvalidKey { key: String },
    /// The caller's `if_match_rev` did not equal the stored revision.
    RevisionConflict(ResourceConflict),
    /// Writing the store to disk failed; the in-memory change was rolled back.
    PersistFailed { message: String },
}
321
/// Failure modes of the routine store.
///
/// Serialized with a `type` tag holding the `snake_case` variant name.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    /// The routine id was empty or whitespace-only.
    InvalidRoutineId { routine_id: String },
    /// The schedule was rejected (for example a zero-second interval).
    InvalidSchedule { detail: String },
    /// Writing the store to disk failed; the in-memory change was rolled back.
    PersistFailed { message: String },
}
329
/// Coarse state of the server startup sequence.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    /// Initial state set by `AppState::new_starting`.
    Starting,
    /// Set by `AppState::mark_ready` once the runtime is installed.
    Ready,
    /// Set by `AppState::mark_failed`.
    Failed,
}
336
/// Mutable record of startup progress, shared behind a lock in `AppState`.
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Free-form name of the current phase (`"boot"` initially, `"ready"` on success).
    pub phase: String,
    /// When this startup attempt began, in milliseconds since the Unix epoch.
    pub started_at_ms: u64,
    /// Caller-supplied identifier of this startup attempt.
    pub attempt_id: String,
    /// Message of the last failure; cleared when startup succeeds.
    pub last_error: Option<String>,
}
345
/// Point-in-time copy of [`StartupState`] plus the time elapsed so far.
///
/// Produced by `AppState::startup_snapshot`.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    /// Milliseconds between `started_at_ms` and the moment the snapshot was taken.
    pub elapsed_ms: u64,
}
355
/// Shared server state.
///
/// Every mutable part sits behind an `Arc`, so clones observe the same data.
/// Dereferences to [`RuntimeState`], which panics if used before
/// `mark_ready` has installed the runtime.
#[derive(Clone)]
pub struct AppState {
    /// Engine services; empty until `mark_ready` succeeds.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Progress of the startup sequence.
    pub startup: Arc<RwLock<StartupState>>,
    /// `true` selects the `"in-process"` mode label, `false` selects `"sidecar"`.
    pub in_process_mode: Arc<AtomicBool>,
    /// Optional API token; `None` until one is set.
    pub api_token: Arc<RwLock<Option<String>>>,
    /// Engine leases held in memory.
    // NOTE(review): the map key is presumably the lease id; confirm against callers.
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    /// Active runs, one per session.
    pub run_registry: RunRegistry,
    /// Idle threshold in milliseconds, resolved from `TANDEM_RUN_STALE_MS`.
    pub run_stale_ms: u64,
    /// Governed memory records held in memory.
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    /// Append-only list of memory audit events.
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    /// Mission states held in memory.
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// Shared resource store, keyed by resource key.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    /// JSON file backing `shared_resources`.
    pub shared_resources_path: PathBuf,
    /// Routine definitions, keyed by routine id.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    /// Per-routine firing history; kept in memory only in this file.
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    /// JSON file backing `routines`.
    pub routines_path: PathBuf,
    /// Agent-team runtime, constructed with the audit log path.
    pub agent_teams: AgentTeamRuntime,
}
375
/// A key/value pair derived from an engine event.
// NOTE(review): `derive_status_index_update` builds keys of the form
// `run/{session_id}/status`; the consumer of this type is outside the visible source.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    /// Key the value should be stored under.
    key: String,
    /// JSON payload describing the status.
    value: Value,
}
381
382impl AppState {
383    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
384        Self {
385            runtime: Arc::new(OnceLock::new()),
386            startup: Arc::new(RwLock::new(StartupState {
387                status: StartupStatus::Starting,
388                phase: "boot".to_string(),
389                started_at_ms: now_ms(),
390                attempt_id,
391                last_error: None,
392            })),
393            in_process_mode: Arc::new(AtomicBool::new(in_process)),
394            api_token: Arc::new(RwLock::new(None)),
395            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
396            run_registry: RunRegistry::new(),
397            run_stale_ms: resolve_run_stale_ms(),
398            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
399            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
400            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
401            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
402            shared_resources_path: resolve_shared_resources_path(),
403            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
404            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
405            routines_path: resolve_routines_path(),
406            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
407        }
408    }
409
410    pub fn is_ready(&self) -> bool {
411        self.runtime.get().is_some()
412    }
413
414    pub fn mode_label(&self) -> &'static str {
415        if self.in_process_mode.load(Ordering::Relaxed) {
416            "in-process"
417        } else {
418            "sidecar"
419        }
420    }
421
422    pub async fn api_token(&self) -> Option<String> {
423        self.api_token.read().await.clone()
424    }
425
426    pub async fn set_api_token(&self, token: Option<String>) {
427        *self.api_token.write().await = token;
428    }
429
430    pub async fn startup_snapshot(&self) -> StartupSnapshot {
431        let state = self.startup.read().await.clone();
432        StartupSnapshot {
433            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
434            status: state.status,
435            phase: state.phase,
436            started_at_ms: state.started_at_ms,
437            attempt_id: state.attempt_id,
438            last_error: state.last_error,
439        }
440    }
441
442    pub async fn set_phase(&self, phase: impl Into<String>) {
443        let mut startup = self.startup.write().await;
444        startup.phase = phase.into();
445    }
446
    /// Installs the fully built runtime and flips startup to `Ready`.
    ///
    /// Steps, in order: store `runtime` in the `OnceLock`; register the
    /// agent-team spawn and tool-policy hooks on the engine loop; load the
    /// persisted shared resources and routines; ask the agent-team runtime to
    /// `ensure_loaded_for_workspace` for the workspace root; finally mark the
    /// startup state as ready and clear any previous error.
    ///
    /// # Errors
    /// Fails only when a runtime was already installed. Results of the three
    /// load steps are discarded, so startup still reports `Ready` when
    /// persisted state cannot be read.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        // Must come first: `self.engine_loop` and `self.workspace_index` below
        // go through `Deref`, which panics while the slot is still empty.
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        // Best effort: a missing or unreadable store leaves the maps empty.
        let _ = self.load_shared_resources().await;
        let _ = self.load_routines().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
474
475    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
476        let mut startup = self.startup.write().await;
477        startup.status = StartupStatus::Failed;
478        startup.phase = phase.into();
479        startup.last_error = Some(error.into());
480    }
481
482    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
483        if !self.shared_resources_path.exists() {
484            return Ok(());
485        }
486        let raw = fs::read_to_string(&self.shared_resources_path).await?;
487        let parsed =
488            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
489                .unwrap_or_default();
490        let mut guard = self.shared_resources.write().await;
491        *guard = parsed;
492        Ok(())
493    }
494
495    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
496        if let Some(parent) = self.shared_resources_path.parent() {
497            fs::create_dir_all(parent).await?;
498        }
499        let payload = {
500            let guard = self.shared_resources.read().await;
501            serde_json::to_string_pretty(&*guard)?
502        };
503        fs::write(&self.shared_resources_path, payload).await?;
504        Ok(())
505    }
506
507    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
508        self.shared_resources.read().await.get(key).cloned()
509    }
510
511    pub async fn list_shared_resources(
512        &self,
513        prefix: Option<&str>,
514        limit: usize,
515    ) -> Vec<SharedResourceRecord> {
516        let limit = limit.clamp(1, 500);
517        let mut rows = self
518            .shared_resources
519            .read()
520            .await
521            .values()
522            .filter(|record| {
523                if let Some(prefix) = prefix {
524                    record.key.starts_with(prefix)
525                } else {
526                    true
527                }
528            })
529            .cloned()
530            .collect::<Vec<_>>();
531        rows.sort_by(|a, b| a.key.cmp(&b.key));
532        rows.truncate(limit);
533        rows
534    }
535
536    pub async fn put_shared_resource(
537        &self,
538        key: String,
539        value: Value,
540        if_match_rev: Option<u64>,
541        updated_by: String,
542        ttl_ms: Option<u64>,
543    ) -> Result<SharedResourceRecord, ResourceStoreError> {
544        if !is_valid_resource_key(&key) {
545            return Err(ResourceStoreError::InvalidKey { key });
546        }
547
548        let now = now_ms();
549        let mut guard = self.shared_resources.write().await;
550        let existing = guard.get(&key).cloned();
551
552        if let Some(expected) = if_match_rev {
553            let current = existing.as_ref().map(|row| row.rev);
554            if current != Some(expected) {
555                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
556                    key,
557                    expected_rev: Some(expected),
558                    current_rev: current,
559                }));
560            }
561        }
562
563        let next_rev = existing
564            .as_ref()
565            .map(|row| row.rev.saturating_add(1))
566            .unwrap_or(1);
567
568        let record = SharedResourceRecord {
569            key: key.clone(),
570            value,
571            rev: next_rev,
572            updated_at_ms: now,
573            updated_by,
574            ttl_ms,
575        };
576
577        let previous = guard.insert(key.clone(), record.clone());
578        drop(guard);
579
580        if let Err(error) = self.persist_shared_resources().await {
581            let mut rollback = self.shared_resources.write().await;
582            if let Some(previous) = previous {
583                rollback.insert(key, previous);
584            } else {
585                rollback.remove(&key);
586            }
587            return Err(ResourceStoreError::PersistFailed {
588                message: error.to_string(),
589            });
590        }
591
592        Ok(record)
593    }
594
595    pub async fn delete_shared_resource(
596        &self,
597        key: &str,
598        if_match_rev: Option<u64>,
599    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
600        if !is_valid_resource_key(key) {
601            return Err(ResourceStoreError::InvalidKey {
602                key: key.to_string(),
603            });
604        }
605
606        let mut guard = self.shared_resources.write().await;
607        let current = guard.get(key).cloned();
608        if let Some(expected) = if_match_rev {
609            let current_rev = current.as_ref().map(|row| row.rev);
610            if current_rev != Some(expected) {
611                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
612                    key: key.to_string(),
613                    expected_rev: Some(expected),
614                    current_rev,
615                }));
616            }
617        }
618
619        let removed = guard.remove(key);
620        drop(guard);
621
622        if let Err(error) = self.persist_shared_resources().await {
623            if let Some(record) = removed.clone() {
624                self.shared_resources
625                    .write()
626                    .await
627                    .insert(record.key.clone(), record);
628            }
629            return Err(ResourceStoreError::PersistFailed {
630                message: error.to_string(),
631            });
632        }
633
634        Ok(removed)
635    }
636
637    pub async fn load_routines(&self) -> anyhow::Result<()> {
638        if !self.routines_path.exists() {
639            return Ok(());
640        }
641        let raw = fs::read_to_string(&self.routines_path).await?;
642        let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
643            .unwrap_or_default();
644        let mut guard = self.routines.write().await;
645        *guard = parsed;
646        Ok(())
647    }
648
649    pub async fn persist_routines(&self) -> anyhow::Result<()> {
650        if let Some(parent) = self.routines_path.parent() {
651            fs::create_dir_all(parent).await?;
652        }
653        let payload = {
654            let guard = self.routines.read().await;
655            serde_json::to_string_pretty(&*guard)?
656        };
657        fs::write(&self.routines_path, payload).await?;
658        Ok(())
659    }
660
661    pub async fn put_routine(
662        &self,
663        mut routine: RoutineSpec,
664    ) -> Result<RoutineSpec, RoutineStoreError> {
665        if routine.routine_id.trim().is_empty() {
666            return Err(RoutineStoreError::InvalidRoutineId {
667                routine_id: routine.routine_id,
668            });
669        }
670
671        let interval = match routine.schedule {
672            RoutineSchedule::IntervalSeconds { seconds } => {
673                if seconds == 0 {
674                    return Err(RoutineStoreError::InvalidSchedule {
675                        detail: "interval_seconds must be > 0".to_string(),
676                    });
677                }
678                Some(seconds)
679            }
680            RoutineSchedule::Cron { .. } => None,
681        };
682        if routine.next_fire_at_ms.is_none() {
683            routine.next_fire_at_ms = Some(now_ms().saturating_add(interval.unwrap_or(60) * 1000));
684        }
685
686        let mut guard = self.routines.write().await;
687        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
688        drop(guard);
689
690        if let Err(error) = self.persist_routines().await {
691            let mut rollback = self.routines.write().await;
692            if let Some(previous) = previous {
693                rollback.insert(previous.routine_id.clone(), previous);
694            } else {
695                rollback.remove(&routine.routine_id);
696            }
697            return Err(RoutineStoreError::PersistFailed {
698                message: error.to_string(),
699            });
700        }
701
702        Ok(routine)
703    }
704
705    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
706        let mut rows = self
707            .routines
708            .read()
709            .await
710            .values()
711            .cloned()
712            .collect::<Vec<_>>();
713        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
714        rows
715    }
716
717    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
718        self.routines.read().await.get(routine_id).cloned()
719    }
720
721    pub async fn delete_routine(
722        &self,
723        routine_id: &str,
724    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
725        let mut guard = self.routines.write().await;
726        let removed = guard.remove(routine_id);
727        drop(guard);
728
729        if let Err(error) = self.persist_routines().await {
730            if let Some(removed) = removed.clone() {
731                self.routines
732                    .write()
733                    .await
734                    .insert(removed.routine_id.clone(), removed);
735            }
736            return Err(RoutineStoreError::PersistFailed {
737                message: error.to_string(),
738            });
739        }
740        Ok(removed)
741    }
742
743    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
744        let mut plans = Vec::new();
745        let mut guard = self.routines.write().await;
746        for routine in guard.values_mut() {
747            if routine.status != RoutineStatus::Active {
748                continue;
749            }
750            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
751                continue;
752            };
753            let Some(interval_ms) = routine_interval_ms(&routine.schedule) else {
754                continue;
755            };
756            if now_ms < next_fire_at_ms {
757                continue;
758            }
759            let (run_count, next_fire_at_ms) = compute_misfire_plan(
760                now_ms,
761                next_fire_at_ms,
762                interval_ms,
763                &routine.misfire_policy,
764            );
765            routine.next_fire_at_ms = Some(next_fire_at_ms);
766            if run_count == 0 {
767                continue;
768            }
769            plans.push(RoutineTriggerPlan {
770                routine_id: routine.routine_id.clone(),
771                run_count,
772                scheduled_at_ms: now_ms,
773                next_fire_at_ms,
774            });
775        }
776        drop(guard);
777        let _ = self.persist_routines().await;
778        plans
779    }
780
781    pub async fn mark_routine_fired(
782        &self,
783        routine_id: &str,
784        fired_at_ms: u64,
785    ) -> Option<RoutineSpec> {
786        let mut guard = self.routines.write().await;
787        let routine = guard.get_mut(routine_id)?;
788        routine.last_fired_at_ms = Some(fired_at_ms);
789        let updated = routine.clone();
790        drop(guard);
791        let _ = self.persist_routines().await;
792        Some(updated)
793    }
794
795    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
796        let mut history = self.routine_history.write().await;
797        history
798            .entry(event.routine_id.clone())
799            .or_default()
800            .push(event);
801    }
802
803    pub async fn list_routine_history(
804        &self,
805        routine_id: &str,
806        limit: usize,
807    ) -> Vec<RoutineHistoryEvent> {
808        let limit = limit.clamp(1, 500);
809        let mut rows = self
810            .routine_history
811            .read()
812            .await
813            .get(routine_id)
814            .cloned()
815            .unwrap_or_default();
816        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
817        rows.truncate(limit);
818        rows
819    }
820}
821
/// Resolves the idle threshold, in milliseconds, after which a run is
/// considered stale.
///
/// Reads `TANDEM_RUN_STALE_MS`; an unset or unparsable value falls back to
/// `120_000`. The result is clamped to `30_000..=600_000`.
fn resolve_run_stale_ms() -> u64 {
    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().ok(),
        Err(_) => None,
    };
    configured.unwrap_or(120_000).clamp(30_000, 600_000)
}
829
/// Resolves the file that backs the shared resource store.
///
/// The file is `shared_resources.json` inside `TANDEM_STATE_DIR` (trimmed), or
/// inside `.tandem` when that variable is unset or blank.
fn resolve_shared_resources_path() -> PathBuf {
    let state_dir = std::env::var("TANDEM_STATE_DIR").unwrap_or_default();
    let base = match state_dir.trim() {
        "" => PathBuf::from(".tandem"),
        dir => PathBuf::from(dir),
    };
    base.join("shared_resources.json")
}
839
/// Resolves the file that backs the routine store.
///
/// The file is `routines.json` inside `TANDEM_STATE_DIR` (trimmed), or inside
/// `.tandem` when that variable is unset or blank.
fn resolve_routines_path() -> PathBuf {
    let state_dir = std::env::var("TANDEM_STATE_DIR").unwrap_or_default();
    let base = match state_dir.trim() {
        "" => PathBuf::from(".tandem"),
        dir => PathBuf::from(dir),
    };
    base.join("routines.json")
}
849
/// Resolves the agent-team audit log file.
///
/// The file is `agent-team/audit.log.jsonl` inside `TANDEM_STATE_DIR`
/// (trimmed), or inside `.tandem` when that variable is unset or blank.
fn resolve_agent_team_audit_path() -> PathBuf {
    let state_dir = std::env::var("TANDEM_STATE_DIR").unwrap_or_default();
    let mut path = match state_dir.trim() {
        "" => PathBuf::from(".tandem"),
        dir => PathBuf::from(dir),
    };
    path.push("agent-team");
    path.push("audit.log.jsonl");
    path
}
863
864fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
865    match schedule {
866        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
867        RoutineSchedule::Cron { .. } => None,
868    }
869}
870
871fn compute_misfire_plan(
872    now_ms: u64,
873    next_fire_at_ms: u64,
874    interval_ms: u64,
875    policy: &RoutineMisfirePolicy,
876) -> (u32, u64) {
877    if now_ms < next_fire_at_ms || interval_ms == 0 {
878        return (0, next_fire_at_ms);
879    }
880    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
881    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
882    match policy {
883        RoutineMisfirePolicy::Skip => (0, aligned_next),
884        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
885        RoutineMisfirePolicy::CatchUp { max_runs } => {
886            let count = missed.min(u64::from(*max_runs)) as u32;
887            (count, aligned_next)
888        }
889    }
890}
891
/// Outcome of the execution policy check for a routine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may only run after manual approval.
    RequiresApproval { reason: String },
    /// The routine must not run.
    Blocked { reason: String },
}
898
899pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
900    let entrypoint = routine.entrypoint.to_ascii_lowercase();
901    if entrypoint.starts_with("connector.")
902        || entrypoint.starts_with("integration.")
903        || entrypoint.contains("external")
904    {
905        return true;
906    }
907    routine
908        .args
909        .get("uses_external_integrations")
910        .and_then(|v| v.as_bool())
911        .unwrap_or(false)
912        || routine
913            .args
914            .get("connector_id")
915            .and_then(|v| v.as_str())
916            .is_some()
917}
918
919pub fn evaluate_routine_execution_policy(
920    routine: &RoutineSpec,
921    trigger_type: &str,
922) -> RoutineExecutionDecision {
923    if !routine_uses_external_integrations(routine) {
924        return RoutineExecutionDecision::Allowed;
925    }
926    if !routine.external_integrations_allowed {
927        return RoutineExecutionDecision::Blocked {
928            reason: "external integrations are disabled by policy".to_string(),
929        };
930    }
931    if routine.requires_approval {
932        return RoutineExecutionDecision::RequiresApproval {
933            reason: format!(
934                "manual approval required before external side effects ({})",
935                trigger_type
936            ),
937        };
938    }
939    RoutineExecutionDecision::Allowed
940}
941
/// Checks that a shared-resource key lives in a known namespace.
///
/// Surrounding whitespace is ignored for the check. The trimmed key must be
/// non-empty, start with one of `run/`, `mission/`, `project/` or `team/`,
/// and must not contain an empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    const NAMESPACES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    let candidate = key.trim();
    let in_known_namespace = NAMESPACES
        .iter()
        .any(|namespace| candidate.starts_with(*namespace));

    !candidate.is_empty() && in_known_namespace && !candidate.contains("//")
}
956
957impl Deref for AppState {
958    type Target = RuntimeState;
959
960    fn deref(&self) -> &Self::Target {
961        self.runtime
962            .get()
963            .expect("runtime accessed before startup completion")
964    }
965}
966
967fn extract_event_session_id(properties: &Value) -> Option<String> {
968    properties
969        .get("sessionID")
970        .or_else(|| properties.get("sessionId"))
971        .or_else(|| properties.get("id"))
972        .and_then(|v| v.as_str())
973        .map(|s| s.to_string())
974}
975
976fn extract_event_run_id(properties: &Value) -> Option<String> {
977    properties
978        .get("runID")
979        .or_else(|| properties.get("run_id"))
980        .and_then(|v| v.as_str())
981        .map(|s| s.to_string())
982}
983
984fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
985    let session_id = extract_event_session_id(&event.properties)?;
986    let run_id = extract_event_run_id(&event.properties);
987    let key = format!("run/{session_id}/status");
988
989    let mut base = serde_json::Map::new();
990    base.insert("sessionID".to_string(), Value::String(session_id));
991    if let Some(run_id) = run_id {
992        base.insert("runID".to_string(), Value::String(run_id));
993    }
994
995    match event.event_type.as_str() {
996        "session.run.started" => {
997            base.insert("state".to_string(), Value::String("running".to_string()));
998            base.insert("phase".to_string(), Value::String("run".to_string()));
999            base.insert(
1000                "eventType".to_string(),
1001                Value::String("session.run.started".to_string()),
1002            );
1003            Some(StatusIndexUpdate {
1004                key,
1005                value: Value::Object(base),
1006            })
1007        }
1008        "session.run.finished" => {
1009            base.insert("state".to_string(), Value::String("finished".to_string()));
1010            base.insert("phase".to_string(), Value::String("run".to_string()));
1011            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1012                base.insert("result".to_string(), Value::String(status.to_string()));
1013            }
1014            base.insert(
1015                "eventType".to_string(),
1016                Value::String("session.run.finished".to_string()),
1017            );
1018            Some(StatusIndexUpdate {
1019                key,
1020                value: Value::Object(base),
1021            })
1022        }
1023        "message.part.updated" => {
1024            let part_type = event
1025                .properties
1026                .get("part")
1027                .and_then(|v| v.get("type"))
1028                .and_then(|v| v.as_str())?;
1029            let (phase, tool_active) = match part_type {
1030                "tool-invocation" => ("tool", true),
1031                "tool-result" => ("run", false),
1032                _ => return None,
1033            };
1034            base.insert("state".to_string(), Value::String("running".to_string()));
1035            base.insert("phase".to_string(), Value::String(phase.to_string()));
1036            base.insert("toolActive".to_string(), Value::Bool(tool_active));
1037            if let Some(tool_name) = event
1038                .properties
1039                .get("part")
1040                .and_then(|v| v.get("tool"))
1041                .and_then(|v| v.as_str())
1042            {
1043                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1044            }
1045            base.insert(
1046                "eventType".to_string(),
1047                Value::String("message.part.updated".to_string()),
1048            );
1049            Some(StatusIndexUpdate {
1050                key,
1051                value: Value::Object(base),
1052            })
1053        }
1054        _ => None,
1055    }
1056}
1057
1058pub async fn run_status_indexer(state: AppState) {
1059    let mut rx = state.event_bus.subscribe();
1060    loop {
1061        match rx.recv().await {
1062            Ok(event) => {
1063                if let Some(update) = derive_status_index_update(&event) {
1064                    if let Err(error) = state
1065                        .put_shared_resource(
1066                            update.key,
1067                            update.value,
1068                            None,
1069                            "system.status_indexer".to_string(),
1070                            None,
1071                        )
1072                        .await
1073                    {
1074                        tracing::warn!("status indexer failed to persist update: {error:?}");
1075                    }
1076                }
1077            }
1078            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1079            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1080        }
1081    }
1082}
1083
1084pub async fn run_agent_team_supervisor(state: AppState) {
1085    let mut rx = state.event_bus.subscribe();
1086    loop {
1087        match rx.recv().await {
1088            Ok(event) => {
1089                state.agent_teams.handle_engine_event(&state, &event).await;
1090            }
1091            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1092            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1093        }
1094    }
1095}
1096
1097pub async fn run_routine_scheduler(state: AppState) {
1098    loop {
1099        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1100        let now = now_ms();
1101        let plans = state.evaluate_routine_misfires(now).await;
1102        for plan in plans {
1103            let Some(routine) = state.get_routine(&plan.routine_id).await else {
1104                continue;
1105            };
1106            match evaluate_routine_execution_policy(&routine, "scheduled") {
1107                RoutineExecutionDecision::Allowed => {
1108                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
1109                    state
1110                        .append_routine_history(RoutineHistoryEvent {
1111                            routine_id: plan.routine_id.clone(),
1112                            trigger_type: "scheduled".to_string(),
1113                            run_count: plan.run_count,
1114                            fired_at_ms: now,
1115                            status: "queued".to_string(),
1116                            detail: None,
1117                        })
1118                        .await;
1119                    state.event_bus.publish(EngineEvent::new(
1120                        "routine.fired",
1121                        serde_json::json!({
1122                            "routineID": plan.routine_id,
1123                            "runCount": plan.run_count,
1124                            "scheduledAtMs": plan.scheduled_at_ms,
1125                            "nextFireAtMs": plan.next_fire_at_ms,
1126                        }),
1127                    ));
1128                }
1129                RoutineExecutionDecision::RequiresApproval { reason } => {
1130                    state
1131                        .append_routine_history(RoutineHistoryEvent {
1132                            routine_id: plan.routine_id.clone(),
1133                            trigger_type: "scheduled".to_string(),
1134                            run_count: plan.run_count,
1135                            fired_at_ms: now,
1136                            status: "pending_approval".to_string(),
1137                            detail: Some(reason.clone()),
1138                        })
1139                        .await;
1140                    state.event_bus.publish(EngineEvent::new(
1141                        "routine.approval_required",
1142                        serde_json::json!({
1143                            "routineID": plan.routine_id,
1144                            "runCount": plan.run_count,
1145                            "triggerType": "scheduled",
1146                            "reason": reason,
1147                        }),
1148                    ));
1149                }
1150                RoutineExecutionDecision::Blocked { reason } => {
1151                    state
1152                        .append_routine_history(RoutineHistoryEvent {
1153                            routine_id: plan.routine_id.clone(),
1154                            trigger_type: "scheduled".to_string(),
1155                            run_count: plan.run_count,
1156                            fired_at_ms: now,
1157                            status: "blocked_policy".to_string(),
1158                            detail: Some(reason.clone()),
1159                        })
1160                        .await;
1161                    state.event_bus.publish(EngineEvent::new(
1162                        "routine.blocked",
1163                        serde_json::json!({
1164                            "routineID": plan.routine_id,
1165                            "runCount": plan.run_count,
1166                            "triggerType": "scheduled",
1167                            "reason": reason,
1168                        }),
1169                    ));
1170                }
1171            }
1172        }
1173    }
1174}
1175
#[cfg(test)]
mod tests {
    use super::*;

    /// Unique temp-dir path for a shared-resource store file.
    fn tmp_resource_file(name: &str) -> PathBuf {
        let unique = uuid::Uuid::new_v4();
        std::env::temp_dir().join(format!("tandem-server-{name}-{unique}.json"))
    }

    /// Unique temp-dir path for a routines store file.
    fn tmp_routines_file(name: &str) -> PathBuf {
        let unique = uuid::Uuid::new_v4();
        std::env::temp_dir().join(format!("tandem-server-routines-{name}-{unique}.json"))
    }

    /// Starting-phase state whose shared resources persist to `path`.
    fn test_state_with_path(path: PathBuf) -> AppState {
        let mut state = AppState::new_starting("test-attempt".to_string(), true);
        state.routines_path = tmp_routines_file("shared-state");
        state.shared_resources_path = path;
        state
    }

    /// Baseline routine used by the routine tests: active, 60-second interval,
    /// run-once misfire policy, empty args, approval required, external
    /// integrations disallowed and no fire times. Tests override individual
    /// fields with struct-update syntax.
    fn routine_fixture(id: &str, name: &str, entrypoint: &str) -> RoutineSpec {
        RoutineSpec {
            routine_id: id.to_string(),
            name: name.to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: entrypoint.to_string(),
            args: serde_json::json!({}),
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        }
    }

    /// Reads a string field from a status-index update value.
    fn str_field<'a>(value: &'a Value, field: &str) -> Option<&'a str> {
        value.get(field).and_then(|v| v.as_str())
    }

    #[tokio::test]
    async fn shared_resource_put_increments_revision() {
        let store_path = tmp_resource_file("shared-resource-put");
        let state = test_state_with_path(store_path.clone());
        let key = "project/demo/board";

        let created = state
            .put_shared_resource(
                key.to_string(),
                serde_json::json!({"status":"todo"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("first put");
        assert_eq!(created.rev, 1);

        let updated = state
            .put_shared_resource(
                key.to_string(),
                serde_json::json!({"status":"doing"}),
                Some(1),
                "agent-2".to_string(),
                Some(60_000),
            )
            .await
            .expect("second put");
        assert_eq!(updated.rev, 2);
        assert_eq!(updated.updated_by, "agent-2");
        assert_eq!(updated.ttl_ms, Some(60_000));

        // The second revision must have reached the on-disk store.
        let raw = tokio::fs::read_to_string(&store_path)
            .await
            .expect("persisted");
        assert!(raw.contains("\"rev\": 2"));
        let _ = tokio::fs::remove_file(store_path).await;
    }

    #[tokio::test]
    async fn shared_resource_put_detects_revision_conflict() {
        let store_path = tmp_resource_file("shared-resource-conflict");
        let state = test_state_with_path(store_path.clone());
        let key = "mission/demo/card-1";

        let _ = state
            .put_shared_resource(
                key.to_string(),
                serde_json::json!({"title":"Card 1"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("seed put");

        // Writing with a stale expected revision must be rejected.
        let rejection = state
            .put_shared_resource(
                key.to_string(),
                serde_json::json!({"title":"Card 1 edited"}),
                Some(99),
                "agent-2".to_string(),
                None,
            )
            .await
            .expect_err("expected conflict");

        match rejection {
            ResourceStoreError::RevisionConflict(details) => {
                assert_eq!(details.expected_rev, Some(99));
                assert_eq!(details.current_rev, Some(1));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        let _ = tokio::fs::remove_file(store_path).await;
    }

    #[tokio::test]
    async fn shared_resource_rejects_invalid_namespace_key() {
        let store_path = tmp_resource_file("shared-resource-invalid-key");
        let state = test_state_with_path(store_path.clone());

        let rejection = state
            .put_shared_resource(
                "global/demo/key".to_string(),
                serde_json::json!({"x":1}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect_err("invalid key should fail");

        match rejection {
            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
            other => panic!("unexpected error: {other:?}"),
        }

        // A rejected put must not create the store file.
        assert!(!store_path.exists());
    }

    #[test]
    fn derive_status_index_update_for_run_started() {
        let started = EngineEvent::new(
            "session.run.started",
            serde_json::json!({
                "sessionID": "s-1",
                "runID": "r-1"
            }),
        );

        let update = derive_status_index_update(&started).expect("update");

        assert_eq!(update.key, "run/s-1/status");
        assert_eq!(str_field(&update.value, "state"), Some("running"));
        assert_eq!(str_field(&update.value, "phase"), Some("run"));
    }

    #[test]
    fn derive_status_index_update_for_tool_invocation() {
        let tool_call = EngineEvent::new(
            "message.part.updated",
            serde_json::json!({
                "sessionID": "s-2",
                "runID": "r-2",
                "part": { "type": "tool-invocation", "tool": "todo_write" }
            }),
        );

        let update = derive_status_index_update(&tool_call).expect("update");

        assert_eq!(update.key, "run/s-2/status");
        assert_eq!(str_field(&update.value, "phase"), Some("tool"));
        assert_eq!(
            update.value.get("toolActive").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(str_field(&update.value, "tool"), Some("todo_write"));
    }

    #[test]
    fn misfire_skip_drops_runs_and_advances_next_fire() {
        let policy = RoutineMisfirePolicy::Skip;
        let (count, next_fire) = compute_misfire_plan(10_500, 5_000, 1_000, &policy);
        assert_eq!(count, 0);
        assert_eq!(next_fire, 11_000);
    }

    #[test]
    fn misfire_run_once_emits_single_trigger() {
        let policy = RoutineMisfirePolicy::RunOnce;
        let (count, next_fire) = compute_misfire_plan(10_500, 5_000, 1_000, &policy);
        assert_eq!(count, 1);
        assert_eq!(next_fire, 11_000);
    }

    #[test]
    fn misfire_catch_up_caps_trigger_count() {
        let policy = RoutineMisfirePolicy::CatchUp { max_runs: 3 };
        let (count, next_fire) = compute_misfire_plan(25_000, 5_000, 1_000, &policy);
        assert_eq!(count, 3);
        assert_eq!(next_fire, 26_000);
    }

    #[tokio::test]
    async fn routine_put_persists_and_loads() {
        let routines_path = tmp_routines_file("persist-load");
        let mut state = AppState::new_starting("routines-put".to_string(), true);
        state.routines_path = routines_path.clone();

        let routine = RoutineSpec {
            args: serde_json::json!({"topic":"status"}),
            creator_id: "user-1".to_string(),
            next_fire_at_ms: Some(5_000),
            ..routine_fixture("routine-1", "Digest", "mission.default")
        };
        state.put_routine(routine).await.expect("store routine");

        // A fresh state pointed at the same file must see the stored routine.
        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
        reloaded.routines_path = routines_path.clone();
        reloaded.load_routines().await.expect("load routines");

        let list = reloaded.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
    }

    #[tokio::test]
    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
        let routines_path = tmp_routines_file("misfire-eval");
        let mut state = AppState::new_starting("routines-eval".to_string(), true);
        state.routines_path = routines_path.clone();

        // One-second routines that were due at t=5_000 and are evaluated late.
        let overdue = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
            misfire_policy: policy,
            requires_approval: false,
            next_fire_at_ms: Some(5_000),
            ..routine_fixture(id, id, "mission.default")
        };

        let seeded = vec![
            ("routine-skip", RoutineMisfirePolicy::Skip, "put skip"),
            ("routine-once", RoutineMisfirePolicy::RunOnce, "put once"),
            (
                "routine-catch",
                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
                "put catch",
            ),
        ];
        for (id, policy, context) in seeded {
            state.put_routine(overdue(id, policy)).await.expect(context);
        }

        let plans = state.evaluate_routine_misfires(10_500).await;
        let planned_runs = |id: &str| {
            plans
                .iter()
                .find(|plan| plan.routine_id == id)
                .map(|plan| plan.run_count)
        };

        assert!(planned_runs("routine-skip").is_none());
        assert_eq!(planned_runs("routine-once"), Some(1));
        assert_eq!(planned_runs("routine-catch"), Some(3));

        // Even a skipped routine must have its next fire time moved forward.
        let stored = state.list_routines().await;
        let skip_next = stored
            .iter()
            .find(|routine| routine.routine_id == "routine-skip")
            .and_then(|routine| routine.next_fire_at_ms)
            .expect("skip next");
        assert!(skip_next > 10_500);

        let _ = tokio::fs::remove_file(routines_path).await;
    }

    #[test]
    fn routine_policy_blocks_external_side_effects_by_default() {
        let routine = routine_fixture(
            "routine-policy-1",
            "Connector routine",
            "connector.email.reply",
        );

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
    }

    #[test]
    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
        let routine = RoutineSpec {
            external_integrations_allowed: true,
            ..routine_fixture(
                "routine-policy-2",
                "Connector routine",
                "connector.email.reply",
            )
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(
            decision,
            RoutineExecutionDecision::RequiresApproval { .. }
        ));
    }

    #[test]
    fn routine_policy_allows_non_external_entrypoints() {
        let routine = routine_fixture(
            "routine-policy-3",
            "Internal mission routine",
            "mission.default",
        );

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert_eq!(decision, RoutineExecutionDecision::Allowed);
    }
}