1#![recursion_limit = "256"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
12use tandem_orchestrator::MissionState;
13use tandem_types::EngineEvent;
14use tokio::fs;
15use tokio::sync::RwLock;
16
17use tandem_core::{
18 AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus, PermissionManager,
19 PluginRegistry, Storage,
20};
21use tandem_providers::ProviderRegistry;
22use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
23use tandem_tools::ToolRegistry;
24
25mod agent_teams;
26mod http;
27
28pub use agent_teams::AgentTeamRuntime;
29pub use http::serve;
30
/// A time-limited claim on the engine held by one client.
///
/// The `_ms` fields are millisecond values; `is_expired` compares them against a
/// caller-supplied "now" on the same clock.
#[derive(Clone, Debug)]
pub struct EngineLease {
    /// Identifier of this lease.
    pub lease_id: String,
    /// Identifier of the client holding the lease.
    pub client_id: String,
    /// Free-form client type label.
    pub client_type: String,
    /// When the lease was first acquired.
    pub acquired_at_ms: u64,
    /// When the lease was last renewed; expiry is measured from this instant.
    pub last_renewed_at_ms: u64,
    /// How long the lease stays valid after its last renewal.
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Returns `true` once strictly more than `ttl_ms` has elapsed since the last renewal.
    ///
    /// A `now_ms` that lies before the last renewal is treated as zero elapsed time,
    /// so such a lease is never reported as expired.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let elapsed_ms = now_ms.saturating_sub(self.last_renewed_at_ms);
        elapsed_ms > self.ttl_ms
    }
}
46
47#[derive(Debug, Clone, Serialize)]
48pub struct ActiveRun {
49 #[serde(rename = "runID")]
50 pub run_id: String,
51 #[serde(rename = "startedAtMs")]
52 pub started_at_ms: u64,
53 #[serde(rename = "lastActivityAtMs")]
54 pub last_activity_at_ms: u64,
55 #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
56 pub client_id: Option<String>,
57 #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
58 pub agent_id: Option<String>,
59 #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
60 pub agent_profile: Option<String>,
61}
62
63#[derive(Clone, Default)]
64pub struct RunRegistry {
65 active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
66}
67
68impl RunRegistry {
69 pub fn new() -> Self {
70 Self::default()
71 }
72
73 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
74 self.active.read().await.get(session_id).cloned()
75 }
76
77 pub async fn acquire(
78 &self,
79 session_id: &str,
80 run_id: String,
81 client_id: Option<String>,
82 agent_id: Option<String>,
83 agent_profile: Option<String>,
84 ) -> std::result::Result<ActiveRun, ActiveRun> {
85 let mut guard = self.active.write().await;
86 if let Some(existing) = guard.get(session_id).cloned() {
87 return Err(existing);
88 }
89 let now = now_ms();
90 let run = ActiveRun {
91 run_id,
92 started_at_ms: now,
93 last_activity_at_ms: now,
94 client_id,
95 agent_id,
96 agent_profile,
97 };
98 guard.insert(session_id.to_string(), run.clone());
99 Ok(run)
100 }
101
102 pub async fn touch(&self, session_id: &str, run_id: &str) {
103 let mut guard = self.active.write().await;
104 if let Some(run) = guard.get_mut(session_id) {
105 if run.run_id == run_id {
106 run.last_activity_at_ms = now_ms();
107 }
108 }
109 }
110
111 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
112 let mut guard = self.active.write().await;
113 if let Some(run) = guard.get(session_id) {
114 if run.run_id == run_id {
115 return guard.remove(session_id);
116 }
117 }
118 None
119 }
120
121 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
122 self.active.write().await.remove(session_id)
123 }
124
125 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
126 let now = now_ms();
127 let mut guard = self.active.write().await;
128 let stale_ids = guard
129 .iter()
130 .filter_map(|(session_id, run)| {
131 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
132 Some(session_id.clone())
133 } else {
134 None
135 }
136 })
137 .collect::<Vec<_>>();
138 let mut out = Vec::with_capacity(stale_ids.len());
139 for session_id in stale_ids {
140 if let Some(run) = guard.remove(&session_id) {
141 out.push((session_id, run));
142 }
143 }
144 out
145 }
146}
147
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
154
155pub fn build_id() -> String {
156 if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
157 let trimmed = explicit.trim();
158 if !trimmed.is_empty() {
159 return trimmed.to_string();
160 }
161 }
162 if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
163 let trimmed = git_sha.trim();
164 if !trimmed.is_empty() {
165 return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
166 }
167 }
168 env!("CARGO_PKG_VERSION").to_string()
169}
170
/// Path of the running executable, reported only by builds with debug assertions enabled.
///
/// Builds without debug assertions always return `None`; builds with them return `None`
/// only when the executable path cannot be determined.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        std::env::current_exe()
            .ok()
            .map(|exe| exe.to_string_lossy().into_owned())
    } else {
        None
    }
}
183
184#[derive(Clone)]
185pub struct RuntimeState {
186 pub storage: Arc<Storage>,
187 pub config: ConfigStore,
188 pub event_bus: EventBus,
189 pub providers: ProviderRegistry,
190 pub plugins: PluginRegistry,
191 pub agents: AgentRegistry,
192 pub tools: ToolRegistry,
193 pub permissions: PermissionManager,
194 pub mcp: McpRegistry,
195 pub pty: PtyManager,
196 pub lsp: LspManager,
197 pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
198 pub logs: Arc<RwLock<Vec<Value>>>,
199 pub workspace_index: WorkspaceIndex,
200 pub cancellations: CancellationRegistry,
201 pub engine_loop: EngineLoop,
202}
203
204#[derive(Debug, Clone)]
205pub struct GovernedMemoryRecord {
206 pub id: String,
207 pub run_id: String,
208 pub partition: MemoryPartition,
209 pub kind: MemoryContentKind,
210 pub content: String,
211 pub artifact_refs: Vec<String>,
212 pub classification: MemoryClassification,
213 pub metadata: Option<Value>,
214 pub source_memory_id: Option<String>,
215 pub created_at_ms: u64,
216}
217
218#[derive(Debug, Clone, Serialize)]
219pub struct MemoryAuditEvent {
220 pub audit_id: String,
221 pub action: String,
222 pub run_id: String,
223 pub memory_id: Option<String>,
224 pub source_memory_id: Option<String>,
225 pub to_tier: Option<GovernedMemoryTier>,
226 pub partition_key: String,
227 pub actor: String,
228 pub status: String,
229 #[serde(skip_serializing_if = "Option::is_none")]
230 pub detail: Option<String>,
231 pub created_at_ms: u64,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
235pub struct SharedResourceRecord {
236 pub key: String,
237 pub value: Value,
238 pub rev: u64,
239 pub updated_at_ms: u64,
240 pub updated_by: String,
241 #[serde(skip_serializing_if = "Option::is_none")]
242 pub ttl_ms: Option<u64>,
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
246#[serde(rename_all = "snake_case")]
247pub enum RoutineSchedule {
248 IntervalSeconds { seconds: u64 },
249 Cron { expression: String },
250}
251
252#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
253#[serde(rename_all = "snake_case", tag = "type")]
254pub enum RoutineMisfirePolicy {
255 Skip,
256 RunOnce,
257 CatchUp { max_runs: u32 },
258}
259
260#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
261#[serde(rename_all = "snake_case")]
262pub enum RoutineStatus {
263 Active,
264 Paused,
265}
266
267#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct RoutineSpec {
269 pub routine_id: String,
270 pub name: String,
271 pub status: RoutineStatus,
272 pub schedule: RoutineSchedule,
273 pub timezone: String,
274 pub misfire_policy: RoutineMisfirePolicy,
275 pub entrypoint: String,
276 #[serde(default)]
277 pub args: Value,
278 pub creator_type: String,
279 pub creator_id: String,
280 pub requires_approval: bool,
281 pub external_integrations_allowed: bool,
282 #[serde(default, skip_serializing_if = "Option::is_none")]
283 pub next_fire_at_ms: Option<u64>,
284 #[serde(default, skip_serializing_if = "Option::is_none")]
285 pub last_fired_at_ms: Option<u64>,
286}
287
288#[derive(Debug, Clone, Serialize, Deserialize)]
289pub struct RoutineHistoryEvent {
290 pub routine_id: String,
291 pub trigger_type: String,
292 pub run_count: u32,
293 pub fired_at_ms: u64,
294 pub status: String,
295 #[serde(default, skip_serializing_if = "Option::is_none")]
296 pub detail: Option<String>,
297}
298
299#[derive(Debug, Clone, Serialize)]
300pub struct RoutineTriggerPlan {
301 pub routine_id: String,
302 pub run_count: u32,
303 pub scheduled_at_ms: u64,
304 pub next_fire_at_ms: u64,
305}
306
307#[derive(Debug, Clone, Serialize)]
308pub struct ResourceConflict {
309 pub key: String,
310 pub expected_rev: Option<u64>,
311 pub current_rev: Option<u64>,
312}
313
314#[derive(Debug, Clone, Serialize)]
315#[serde(tag = "type", rename_all = "snake_case")]
316pub enum ResourceStoreError {
317 InvalidKey { key: String },
318 RevisionConflict(ResourceConflict),
319 PersistFailed { message: String },
320}
321
322#[derive(Debug, Clone, Serialize)]
323#[serde(tag = "type", rename_all = "snake_case")]
324pub enum RoutineStoreError {
325 InvalidRoutineId { routine_id: String },
326 InvalidSchedule { detail: String },
327 PersistFailed { message: String },
328}
329
/// Coarse state of the startup sequence.
#[derive(Clone, Debug)]
pub enum StartupStatus {
    /// Initial state set by `AppState::new_starting`.
    Starting,
    /// Set by `AppState::mark_ready`.
    Ready,
    /// Set by `AppState::mark_failed`.
    Failed,
}
336
337#[derive(Debug, Clone)]
338pub struct StartupState {
339 pub status: StartupStatus,
340 pub phase: String,
341 pub started_at_ms: u64,
342 pub attempt_id: String,
343 pub last_error: Option<String>,
344}
345
346#[derive(Debug, Clone)]
347pub struct StartupSnapshot {
348 pub status: StartupStatus,
349 pub phase: String,
350 pub started_at_ms: u64,
351 pub attempt_id: String,
352 pub last_error: Option<String>,
353 pub elapsed_ms: u64,
354}
355
356#[derive(Clone)]
357pub struct AppState {
358 pub runtime: Arc<OnceLock<RuntimeState>>,
359 pub startup: Arc<RwLock<StartupState>>,
360 pub in_process_mode: Arc<AtomicBool>,
361 pub api_token: Arc<RwLock<Option<String>>>,
362 pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
363 pub run_registry: RunRegistry,
364 pub run_stale_ms: u64,
365 pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
366 pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
367 pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
368 pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
369 pub shared_resources_path: PathBuf,
370 pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
371 pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
372 pub routines_path: PathBuf,
373 pub agent_teams: AgentTeamRuntime,
374}
375
376#[derive(Debug, Clone)]
377struct StatusIndexUpdate {
378 key: String,
379 value: Value,
380}
381
382impl AppState {
383 pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
384 Self {
385 runtime: Arc::new(OnceLock::new()),
386 startup: Arc::new(RwLock::new(StartupState {
387 status: StartupStatus::Starting,
388 phase: "boot".to_string(),
389 started_at_ms: now_ms(),
390 attempt_id,
391 last_error: None,
392 })),
393 in_process_mode: Arc::new(AtomicBool::new(in_process)),
394 api_token: Arc::new(RwLock::new(None)),
395 engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
396 run_registry: RunRegistry::new(),
397 run_stale_ms: resolve_run_stale_ms(),
398 memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
399 memory_audit_log: Arc::new(RwLock::new(Vec::new())),
400 missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
401 shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
402 shared_resources_path: resolve_shared_resources_path(),
403 routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
404 routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
405 routines_path: resolve_routines_path(),
406 agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
407 }
408 }
409
410 pub fn is_ready(&self) -> bool {
411 self.runtime.get().is_some()
412 }
413
414 pub fn mode_label(&self) -> &'static str {
415 if self.in_process_mode.load(Ordering::Relaxed) {
416 "in-process"
417 } else {
418 "sidecar"
419 }
420 }
421
422 pub async fn api_token(&self) -> Option<String> {
423 self.api_token.read().await.clone()
424 }
425
426 pub async fn set_api_token(&self, token: Option<String>) {
427 *self.api_token.write().await = token;
428 }
429
430 pub async fn startup_snapshot(&self) -> StartupSnapshot {
431 let state = self.startup.read().await.clone();
432 StartupSnapshot {
433 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
434 status: state.status,
435 phase: state.phase,
436 started_at_ms: state.started_at_ms,
437 attempt_id: state.attempt_id,
438 last_error: state.last_error,
439 }
440 }
441
442 pub async fn set_phase(&self, phase: impl Into<String>) {
443 let mut startup = self.startup.write().await;
444 startup.phase = phase.into();
445 }
446
447 pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
448 self.runtime
449 .set(runtime)
450 .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
451 self.engine_loop
452 .set_spawn_agent_hook(std::sync::Arc::new(
453 crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
454 ))
455 .await;
456 self.engine_loop
457 .set_tool_policy_hook(std::sync::Arc::new(
458 crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
459 ))
460 .await;
461 let _ = self.load_shared_resources().await;
462 let _ = self.load_routines().await;
463 let workspace_root = self.workspace_index.snapshot().await.root;
464 let _ = self
465 .agent_teams
466 .ensure_loaded_for_workspace(&workspace_root)
467 .await;
468 let mut startup = self.startup.write().await;
469 startup.status = StartupStatus::Ready;
470 startup.phase = "ready".to_string();
471 startup.last_error = None;
472 Ok(())
473 }
474
475 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
476 let mut startup = self.startup.write().await;
477 startup.status = StartupStatus::Failed;
478 startup.phase = phase.into();
479 startup.last_error = Some(error.into());
480 }
481
482 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
483 if !self.shared_resources_path.exists() {
484 return Ok(());
485 }
486 let raw = fs::read_to_string(&self.shared_resources_path).await?;
487 let parsed =
488 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
489 .unwrap_or_default();
490 let mut guard = self.shared_resources.write().await;
491 *guard = parsed;
492 Ok(())
493 }
494
495 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
496 if let Some(parent) = self.shared_resources_path.parent() {
497 fs::create_dir_all(parent).await?;
498 }
499 let payload = {
500 let guard = self.shared_resources.read().await;
501 serde_json::to_string_pretty(&*guard)?
502 };
503 fs::write(&self.shared_resources_path, payload).await?;
504 Ok(())
505 }
506
507 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
508 self.shared_resources.read().await.get(key).cloned()
509 }
510
511 pub async fn list_shared_resources(
512 &self,
513 prefix: Option<&str>,
514 limit: usize,
515 ) -> Vec<SharedResourceRecord> {
516 let limit = limit.clamp(1, 500);
517 let mut rows = self
518 .shared_resources
519 .read()
520 .await
521 .values()
522 .filter(|record| {
523 if let Some(prefix) = prefix {
524 record.key.starts_with(prefix)
525 } else {
526 true
527 }
528 })
529 .cloned()
530 .collect::<Vec<_>>();
531 rows.sort_by(|a, b| a.key.cmp(&b.key));
532 rows.truncate(limit);
533 rows
534 }
535
536 pub async fn put_shared_resource(
537 &self,
538 key: String,
539 value: Value,
540 if_match_rev: Option<u64>,
541 updated_by: String,
542 ttl_ms: Option<u64>,
543 ) -> Result<SharedResourceRecord, ResourceStoreError> {
544 if !is_valid_resource_key(&key) {
545 return Err(ResourceStoreError::InvalidKey { key });
546 }
547
548 let now = now_ms();
549 let mut guard = self.shared_resources.write().await;
550 let existing = guard.get(&key).cloned();
551
552 if let Some(expected) = if_match_rev {
553 let current = existing.as_ref().map(|row| row.rev);
554 if current != Some(expected) {
555 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
556 key,
557 expected_rev: Some(expected),
558 current_rev: current,
559 }));
560 }
561 }
562
563 let next_rev = existing
564 .as_ref()
565 .map(|row| row.rev.saturating_add(1))
566 .unwrap_or(1);
567
568 let record = SharedResourceRecord {
569 key: key.clone(),
570 value,
571 rev: next_rev,
572 updated_at_ms: now,
573 updated_by,
574 ttl_ms,
575 };
576
577 let previous = guard.insert(key.clone(), record.clone());
578 drop(guard);
579
580 if let Err(error) = self.persist_shared_resources().await {
581 let mut rollback = self.shared_resources.write().await;
582 if let Some(previous) = previous {
583 rollback.insert(key, previous);
584 } else {
585 rollback.remove(&key);
586 }
587 return Err(ResourceStoreError::PersistFailed {
588 message: error.to_string(),
589 });
590 }
591
592 Ok(record)
593 }
594
595 pub async fn delete_shared_resource(
596 &self,
597 key: &str,
598 if_match_rev: Option<u64>,
599 ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
600 if !is_valid_resource_key(key) {
601 return Err(ResourceStoreError::InvalidKey {
602 key: key.to_string(),
603 });
604 }
605
606 let mut guard = self.shared_resources.write().await;
607 let current = guard.get(key).cloned();
608 if let Some(expected) = if_match_rev {
609 let current_rev = current.as_ref().map(|row| row.rev);
610 if current_rev != Some(expected) {
611 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
612 key: key.to_string(),
613 expected_rev: Some(expected),
614 current_rev,
615 }));
616 }
617 }
618
619 let removed = guard.remove(key);
620 drop(guard);
621
622 if let Err(error) = self.persist_shared_resources().await {
623 if let Some(record) = removed.clone() {
624 self.shared_resources
625 .write()
626 .await
627 .insert(record.key.clone(), record);
628 }
629 return Err(ResourceStoreError::PersistFailed {
630 message: error.to_string(),
631 });
632 }
633
634 Ok(removed)
635 }
636
637 pub async fn load_routines(&self) -> anyhow::Result<()> {
638 if !self.routines_path.exists() {
639 return Ok(());
640 }
641 let raw = fs::read_to_string(&self.routines_path).await?;
642 let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
643 .unwrap_or_default();
644 let mut guard = self.routines.write().await;
645 *guard = parsed;
646 Ok(())
647 }
648
649 pub async fn persist_routines(&self) -> anyhow::Result<()> {
650 if let Some(parent) = self.routines_path.parent() {
651 fs::create_dir_all(parent).await?;
652 }
653 let payload = {
654 let guard = self.routines.read().await;
655 serde_json::to_string_pretty(&*guard)?
656 };
657 fs::write(&self.routines_path, payload).await?;
658 Ok(())
659 }
660
661 pub async fn put_routine(
662 &self,
663 mut routine: RoutineSpec,
664 ) -> Result<RoutineSpec, RoutineStoreError> {
665 if routine.routine_id.trim().is_empty() {
666 return Err(RoutineStoreError::InvalidRoutineId {
667 routine_id: routine.routine_id,
668 });
669 }
670
671 let interval = match routine.schedule {
672 RoutineSchedule::IntervalSeconds { seconds } => {
673 if seconds == 0 {
674 return Err(RoutineStoreError::InvalidSchedule {
675 detail: "interval_seconds must be > 0".to_string(),
676 });
677 }
678 Some(seconds)
679 }
680 RoutineSchedule::Cron { .. } => None,
681 };
682 if routine.next_fire_at_ms.is_none() {
683 routine.next_fire_at_ms = Some(now_ms().saturating_add(interval.unwrap_or(60) * 1000));
684 }
685
686 let mut guard = self.routines.write().await;
687 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
688 drop(guard);
689
690 if let Err(error) = self.persist_routines().await {
691 let mut rollback = self.routines.write().await;
692 if let Some(previous) = previous {
693 rollback.insert(previous.routine_id.clone(), previous);
694 } else {
695 rollback.remove(&routine.routine_id);
696 }
697 return Err(RoutineStoreError::PersistFailed {
698 message: error.to_string(),
699 });
700 }
701
702 Ok(routine)
703 }
704
705 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
706 let mut rows = self
707 .routines
708 .read()
709 .await
710 .values()
711 .cloned()
712 .collect::<Vec<_>>();
713 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
714 rows
715 }
716
717 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
718 self.routines.read().await.get(routine_id).cloned()
719 }
720
721 pub async fn delete_routine(
722 &self,
723 routine_id: &str,
724 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
725 let mut guard = self.routines.write().await;
726 let removed = guard.remove(routine_id);
727 drop(guard);
728
729 if let Err(error) = self.persist_routines().await {
730 if let Some(removed) = removed.clone() {
731 self.routines
732 .write()
733 .await
734 .insert(removed.routine_id.clone(), removed);
735 }
736 return Err(RoutineStoreError::PersistFailed {
737 message: error.to_string(),
738 });
739 }
740 Ok(removed)
741 }
742
743 pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
744 let mut plans = Vec::new();
745 let mut guard = self.routines.write().await;
746 for routine in guard.values_mut() {
747 if routine.status != RoutineStatus::Active {
748 continue;
749 }
750 let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
751 continue;
752 };
753 let Some(interval_ms) = routine_interval_ms(&routine.schedule) else {
754 continue;
755 };
756 if now_ms < next_fire_at_ms {
757 continue;
758 }
759 let (run_count, next_fire_at_ms) = compute_misfire_plan(
760 now_ms,
761 next_fire_at_ms,
762 interval_ms,
763 &routine.misfire_policy,
764 );
765 routine.next_fire_at_ms = Some(next_fire_at_ms);
766 if run_count == 0 {
767 continue;
768 }
769 plans.push(RoutineTriggerPlan {
770 routine_id: routine.routine_id.clone(),
771 run_count,
772 scheduled_at_ms: now_ms,
773 next_fire_at_ms,
774 });
775 }
776 drop(guard);
777 let _ = self.persist_routines().await;
778 plans
779 }
780
781 pub async fn mark_routine_fired(
782 &self,
783 routine_id: &str,
784 fired_at_ms: u64,
785 ) -> Option<RoutineSpec> {
786 let mut guard = self.routines.write().await;
787 let routine = guard.get_mut(routine_id)?;
788 routine.last_fired_at_ms = Some(fired_at_ms);
789 let updated = routine.clone();
790 drop(guard);
791 let _ = self.persist_routines().await;
792 Some(updated)
793 }
794
795 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
796 let mut history = self.routine_history.write().await;
797 history
798 .entry(event.routine_id.clone())
799 .or_default()
800 .push(event);
801 }
802
803 pub async fn list_routine_history(
804 &self,
805 routine_id: &str,
806 limit: usize,
807 ) -> Vec<RoutineHistoryEvent> {
808 let limit = limit.clamp(1, 500);
809 let mut rows = self
810 .routine_history
811 .read()
812 .await
813 .get(routine_id)
814 .cloned()
815 .unwrap_or_default();
816 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
817 rows.truncate(limit);
818 rows
819 }
820}
821
/// Reads the run staleness threshold from `TANDEM_RUN_STALE_MS`.
///
/// Falls back to 120 000 ms when the variable is unset or not a valid `u64`; the result
/// is always clamped to `30_000..=600_000`.
fn resolve_run_stale_ms() -> u64 {
    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().ok(),
        Err(_) => None,
    };
    let stale_ms = configured.unwrap_or(120_000);
    stale_ms.clamp(30_000, 600_000)
}
829
/// Location of the shared resources file.
///
/// `<TANDEM_STATE_DIR>/shared_resources.json` when the variable is set to a non-blank
/// value (surrounding whitespace is trimmed), otherwise `.tandem/shared_resources.json`.
fn resolve_shared_resources_path() -> PathBuf {
    let configured = std::env::var("TANDEM_STATE_DIR").ok();
    let base = match configured.as_deref().map(str::trim) {
        Some(dir) if !dir.is_empty() => PathBuf::from(dir),
        _ => PathBuf::from(".tandem"),
    };
    base.join("shared_resources.json")
}
839
/// Location of the routines file.
///
/// `<TANDEM_STATE_DIR>/routines.json` when the variable is set to a non-blank value
/// (surrounding whitespace is trimmed), otherwise `.tandem/routines.json`.
fn resolve_routines_path() -> PathBuf {
    let configured = std::env::var("TANDEM_STATE_DIR").ok();
    let base = match configured.as_deref().map(str::trim) {
        Some(dir) if !dir.is_empty() => PathBuf::from(dir),
        _ => PathBuf::from(".tandem"),
    };
    base.join("routines.json")
}
849
/// Location of the agent-team audit log.
///
/// `<TANDEM_STATE_DIR>/agent-team/audit.log.jsonl` when the variable is set to a
/// non-blank value (surrounding whitespace is trimmed), otherwise the same path below
/// `.tandem`.
fn resolve_agent_team_audit_path() -> PathBuf {
    let configured = std::env::var("TANDEM_STATE_DIR").ok();
    let base = match configured.as_deref().map(str::trim) {
        Some(dir) if !dir.is_empty() => PathBuf::from(dir),
        _ => PathBuf::from(".tandem"),
    };
    base.join("agent-team").join("audit.log.jsonl")
}
863
864fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
865 match schedule {
866 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
867 RoutineSchedule::Cron { .. } => None,
868 }
869}
870
871fn compute_misfire_plan(
872 now_ms: u64,
873 next_fire_at_ms: u64,
874 interval_ms: u64,
875 policy: &RoutineMisfirePolicy,
876) -> (u32, u64) {
877 if now_ms < next_fire_at_ms || interval_ms == 0 {
878 return (0, next_fire_at_ms);
879 }
880 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
881 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
882 match policy {
883 RoutineMisfirePolicy::Skip => (0, aligned_next),
884 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
885 RoutineMisfirePolicy::CatchUp { max_runs } => {
886 let count = missed.min(u64::from(*max_runs)) as u32;
887 (count, aligned_next)
888 }
889 }
890}
891
/// Verdict of `evaluate_routine_execution_policy` for one routine trigger.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run.
    Allowed,
    /// The routine must be approved before it runs; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
898
899pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
900 let entrypoint = routine.entrypoint.to_ascii_lowercase();
901 if entrypoint.starts_with("connector.")
902 || entrypoint.starts_with("integration.")
903 || entrypoint.contains("external")
904 {
905 return true;
906 }
907 routine
908 .args
909 .get("uses_external_integrations")
910 .and_then(|v| v.as_bool())
911 .unwrap_or(false)
912 || routine
913 .args
914 .get("connector_id")
915 .and_then(|v| v.as_str())
916 .is_some()
917}
918
919pub fn evaluate_routine_execution_policy(
920 routine: &RoutineSpec,
921 trigger_type: &str,
922) -> RoutineExecutionDecision {
923 if !routine_uses_external_integrations(routine) {
924 return RoutineExecutionDecision::Allowed;
925 }
926 if !routine.external_integrations_allowed {
927 return RoutineExecutionDecision::Blocked {
928 reason: "external integrations are disabled by policy".to_string(),
929 };
930 }
931 if routine.requires_approval {
932 return RoutineExecutionDecision::RequiresApproval {
933 reason: format!(
934 "manual approval required before external side effects ({})",
935 trigger_type
936 ),
937 };
938 }
939 RoutineExecutionDecision::Allowed
940}
941
/// Checks whether `key` may be used as a shared resource key.
///
/// A valid key is non-empty, has no leading or trailing whitespace, starts with one of
/// `run/`, `mission/`, `project/` or `team/`, and contains no empty path segment (`//`).
///
/// The key is validated exactly as given. Callers store the key unmodified, so validating
/// a trimmed copy would let keys such as `" run/x"` into the store even though they do
/// not start with an allowed prefix and cannot be found by a prefix listing.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    if key.is_empty() || key.trim() != key {
        return false;
    }
    let has_allowed_prefix = ALLOWED_PREFIXES
        .iter()
        .any(|prefix| key.starts_with(prefix));
    has_allowed_prefix && !key.contains("//")
}
956
957impl Deref for AppState {
958 type Target = RuntimeState;
959
960 fn deref(&self) -> &Self::Target {
961 self.runtime
962 .get()
963 .expect("runtime accessed before startup completion")
964 }
965}
966
967fn extract_event_session_id(properties: &Value) -> Option<String> {
968 properties
969 .get("sessionID")
970 .or_else(|| properties.get("sessionId"))
971 .or_else(|| properties.get("id"))
972 .and_then(|v| v.as_str())
973 .map(|s| s.to_string())
974}
975
976fn extract_event_run_id(properties: &Value) -> Option<String> {
977 properties
978 .get("runID")
979 .or_else(|| properties.get("run_id"))
980 .and_then(|v| v.as_str())
981 .map(|s| s.to_string())
982}
983
984fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
985 let session_id = extract_event_session_id(&event.properties)?;
986 let run_id = extract_event_run_id(&event.properties);
987 let key = format!("run/{session_id}/status");
988
989 let mut base = serde_json::Map::new();
990 base.insert("sessionID".to_string(), Value::String(session_id));
991 if let Some(run_id) = run_id {
992 base.insert("runID".to_string(), Value::String(run_id));
993 }
994
995 match event.event_type.as_str() {
996 "session.run.started" => {
997 base.insert("state".to_string(), Value::String("running".to_string()));
998 base.insert("phase".to_string(), Value::String("run".to_string()));
999 base.insert(
1000 "eventType".to_string(),
1001 Value::String("session.run.started".to_string()),
1002 );
1003 Some(StatusIndexUpdate {
1004 key,
1005 value: Value::Object(base),
1006 })
1007 }
1008 "session.run.finished" => {
1009 base.insert("state".to_string(), Value::String("finished".to_string()));
1010 base.insert("phase".to_string(), Value::String("run".to_string()));
1011 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1012 base.insert("result".to_string(), Value::String(status.to_string()));
1013 }
1014 base.insert(
1015 "eventType".to_string(),
1016 Value::String("session.run.finished".to_string()),
1017 );
1018 Some(StatusIndexUpdate {
1019 key,
1020 value: Value::Object(base),
1021 })
1022 }
1023 "message.part.updated" => {
1024 let part_type = event
1025 .properties
1026 .get("part")
1027 .and_then(|v| v.get("type"))
1028 .and_then(|v| v.as_str())?;
1029 let (phase, tool_active) = match part_type {
1030 "tool-invocation" => ("tool", true),
1031 "tool-result" => ("run", false),
1032 _ => return None,
1033 };
1034 base.insert("state".to_string(), Value::String("running".to_string()));
1035 base.insert("phase".to_string(), Value::String(phase.to_string()));
1036 base.insert("toolActive".to_string(), Value::Bool(tool_active));
1037 if let Some(tool_name) = event
1038 .properties
1039 .get("part")
1040 .and_then(|v| v.get("tool"))
1041 .and_then(|v| v.as_str())
1042 {
1043 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1044 }
1045 base.insert(
1046 "eventType".to_string(),
1047 Value::String("message.part.updated".to_string()),
1048 );
1049 Some(StatusIndexUpdate {
1050 key,
1051 value: Value::Object(base),
1052 })
1053 }
1054 _ => None,
1055 }
1056}
1057
1058pub async fn run_status_indexer(state: AppState) {
1059 let mut rx = state.event_bus.subscribe();
1060 loop {
1061 match rx.recv().await {
1062 Ok(event) => {
1063 if let Some(update) = derive_status_index_update(&event) {
1064 if let Err(error) = state
1065 .put_shared_resource(
1066 update.key,
1067 update.value,
1068 None,
1069 "system.status_indexer".to_string(),
1070 None,
1071 )
1072 .await
1073 {
1074 tracing::warn!("status indexer failed to persist update: {error:?}");
1075 }
1076 }
1077 }
1078 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1079 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1080 }
1081 }
1082}
1083
1084pub async fn run_agent_team_supervisor(state: AppState) {
1085 let mut rx = state.event_bus.subscribe();
1086 loop {
1087 match rx.recv().await {
1088 Ok(event) => {
1089 state.agent_teams.handle_engine_event(&state, &event).await;
1090 }
1091 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1092 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1093 }
1094 }
1095}
1096
1097pub async fn run_routine_scheduler(state: AppState) {
1098 loop {
1099 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1100 let now = now_ms();
1101 let plans = state.evaluate_routine_misfires(now).await;
1102 for plan in plans {
1103 let Some(routine) = state.get_routine(&plan.routine_id).await else {
1104 continue;
1105 };
1106 match evaluate_routine_execution_policy(&routine, "scheduled") {
1107 RoutineExecutionDecision::Allowed => {
1108 let _ = state.mark_routine_fired(&plan.routine_id, now).await;
1109 state
1110 .append_routine_history(RoutineHistoryEvent {
1111 routine_id: plan.routine_id.clone(),
1112 trigger_type: "scheduled".to_string(),
1113 run_count: plan.run_count,
1114 fired_at_ms: now,
1115 status: "queued".to_string(),
1116 detail: None,
1117 })
1118 .await;
1119 state.event_bus.publish(EngineEvent::new(
1120 "routine.fired",
1121 serde_json::json!({
1122 "routineID": plan.routine_id,
1123 "runCount": plan.run_count,
1124 "scheduledAtMs": plan.scheduled_at_ms,
1125 "nextFireAtMs": plan.next_fire_at_ms,
1126 }),
1127 ));
1128 }
1129 RoutineExecutionDecision::RequiresApproval { reason } => {
1130 state
1131 .append_routine_history(RoutineHistoryEvent {
1132 routine_id: plan.routine_id.clone(),
1133 trigger_type: "scheduled".to_string(),
1134 run_count: plan.run_count,
1135 fired_at_ms: now,
1136 status: "pending_approval".to_string(),
1137 detail: Some(reason.clone()),
1138 })
1139 .await;
1140 state.event_bus.publish(EngineEvent::new(
1141 "routine.approval_required",
1142 serde_json::json!({
1143 "routineID": plan.routine_id,
1144 "runCount": plan.run_count,
1145 "triggerType": "scheduled",
1146 "reason": reason,
1147 }),
1148 ));
1149 }
1150 RoutineExecutionDecision::Blocked { reason } => {
1151 state
1152 .append_routine_history(RoutineHistoryEvent {
1153 routine_id: plan.routine_id.clone(),
1154 trigger_type: "scheduled".to_string(),
1155 run_count: plan.run_count,
1156 fired_at_ms: now,
1157 status: "blocked_policy".to_string(),
1158 detail: Some(reason.clone()),
1159 })
1160 .await;
1161 state.event_bus.publish(EngineEvent::new(
1162 "routine.blocked",
1163 serde_json::json!({
1164 "routineID": plan.routine_id,
1165 "runCount": plan.run_count,
1166 "triggerType": "scheduled",
1167 "reason": reason,
1168 }),
1169 ));
1170 }
1171 }
1172 }
1173 }
1174}
1175
1176#[cfg(test)]
1177mod tests {
1178 use super::*;
1179
1180 fn test_state_with_path(path: PathBuf) -> AppState {
1181 let mut state = AppState::new_starting("test-attempt".to_string(), true);
1182 state.shared_resources_path = path;
1183 state.routines_path = tmp_routines_file("shared-state");
1184 state
1185 }
1186
1187 fn tmp_resource_file(name: &str) -> PathBuf {
1188 std::env::temp_dir().join(format!(
1189 "tandem-server-{name}-{}.json",
1190 uuid::Uuid::new_v4()
1191 ))
1192 }
1193
1194 fn tmp_routines_file(name: &str) -> PathBuf {
1195 std::env::temp_dir().join(format!(
1196 "tandem-server-routines-{name}-{}.json",
1197 uuid::Uuid::new_v4()
1198 ))
1199 }
1200
1201 #[tokio::test]
1202 async fn shared_resource_put_increments_revision() {
1203 let path = tmp_resource_file("shared-resource-put");
1204 let state = test_state_with_path(path.clone());
1205
1206 let first = state
1207 .put_shared_resource(
1208 "project/demo/board".to_string(),
1209 serde_json::json!({"status":"todo"}),
1210 None,
1211 "agent-1".to_string(),
1212 None,
1213 )
1214 .await
1215 .expect("first put");
1216 assert_eq!(first.rev, 1);
1217
1218 let second = state
1219 .put_shared_resource(
1220 "project/demo/board".to_string(),
1221 serde_json::json!({"status":"doing"}),
1222 Some(1),
1223 "agent-2".to_string(),
1224 Some(60_000),
1225 )
1226 .await
1227 .expect("second put");
1228 assert_eq!(second.rev, 2);
1229 assert_eq!(second.updated_by, "agent-2");
1230 assert_eq!(second.ttl_ms, Some(60_000));
1231
1232 let raw = tokio::fs::read_to_string(path.clone())
1233 .await
1234 .expect("persisted");
1235 assert!(raw.contains("\"rev\": 2"));
1236 let _ = tokio::fs::remove_file(path).await;
1237 }
1238
1239 #[tokio::test]
1240 async fn shared_resource_put_detects_revision_conflict() {
1241 let path = tmp_resource_file("shared-resource-conflict");
1242 let state = test_state_with_path(path.clone());
1243
1244 let _ = state
1245 .put_shared_resource(
1246 "mission/demo/card-1".to_string(),
1247 serde_json::json!({"title":"Card 1"}),
1248 None,
1249 "agent-1".to_string(),
1250 None,
1251 )
1252 .await
1253 .expect("seed put");
1254
1255 let conflict = state
1256 .put_shared_resource(
1257 "mission/demo/card-1".to_string(),
1258 serde_json::json!({"title":"Card 1 edited"}),
1259 Some(99),
1260 "agent-2".to_string(),
1261 None,
1262 )
1263 .await
1264 .expect_err("expected conflict");
1265
1266 match conflict {
1267 ResourceStoreError::RevisionConflict(conflict) => {
1268 assert_eq!(conflict.expected_rev, Some(99));
1269 assert_eq!(conflict.current_rev, Some(1));
1270 }
1271 other => panic!("unexpected error: {other:?}"),
1272 }
1273
1274 let _ = tokio::fs::remove_file(path).await;
1275 }
1276
1277 #[tokio::test]
1278 async fn shared_resource_rejects_invalid_namespace_key() {
1279 let path = tmp_resource_file("shared-resource-invalid-key");
1280 let state = test_state_with_path(path.clone());
1281
1282 let error = state
1283 .put_shared_resource(
1284 "global/demo/key".to_string(),
1285 serde_json::json!({"x":1}),
1286 None,
1287 "agent-1".to_string(),
1288 None,
1289 )
1290 .await
1291 .expect_err("invalid key should fail");
1292
1293 match error {
1294 ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
1295 other => panic!("unexpected error: {other:?}"),
1296 }
1297
1298 assert!(!path.exists());
1299 }
1300
1301 #[test]
1302 fn derive_status_index_update_for_run_started() {
1303 let event = EngineEvent::new(
1304 "session.run.started",
1305 serde_json::json!({
1306 "sessionID": "s-1",
1307 "runID": "r-1"
1308 }),
1309 );
1310 let update = derive_status_index_update(&event).expect("update");
1311 assert_eq!(update.key, "run/s-1/status");
1312 assert_eq!(
1313 update.value.get("state").and_then(|v| v.as_str()),
1314 Some("running")
1315 );
1316 assert_eq!(
1317 update.value.get("phase").and_then(|v| v.as_str()),
1318 Some("run")
1319 );
1320 }
1321
1322 #[test]
1323 fn derive_status_index_update_for_tool_invocation() {
1324 let event = EngineEvent::new(
1325 "message.part.updated",
1326 serde_json::json!({
1327 "sessionID": "s-2",
1328 "runID": "r-2",
1329 "part": { "type": "tool-invocation", "tool": "todo_write" }
1330 }),
1331 );
1332 let update = derive_status_index_update(&event).expect("update");
1333 assert_eq!(update.key, "run/s-2/status");
1334 assert_eq!(
1335 update.value.get("phase").and_then(|v| v.as_str()),
1336 Some("tool")
1337 );
1338 assert_eq!(
1339 update.value.get("toolActive").and_then(|v| v.as_bool()),
1340 Some(true)
1341 );
1342 assert_eq!(
1343 update.value.get("tool").and_then(|v| v.as_str()),
1344 Some("todo_write")
1345 );
1346 }
1347
1348 #[test]
1349 fn misfire_skip_drops_runs_and_advances_next_fire() {
1350 let (count, next_fire) =
1351 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
1352 assert_eq!(count, 0);
1353 assert_eq!(next_fire, 11_000);
1354 }
1355
1356 #[test]
1357 fn misfire_run_once_emits_single_trigger() {
1358 let (count, next_fire) =
1359 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
1360 assert_eq!(count, 1);
1361 assert_eq!(next_fire, 11_000);
1362 }
1363
1364 #[test]
1365 fn misfire_catch_up_caps_trigger_count() {
1366 let (count, next_fire) = compute_misfire_plan(
1367 25_000,
1368 5_000,
1369 1_000,
1370 &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
1371 );
1372 assert_eq!(count, 3);
1373 assert_eq!(next_fire, 26_000);
1374 }
1375
1376 #[tokio::test]
1377 async fn routine_put_persists_and_loads() {
1378 let routines_path = tmp_routines_file("persist-load");
1379 let mut state = AppState::new_starting("routines-put".to_string(), true);
1380 state.routines_path = routines_path.clone();
1381
1382 let routine = RoutineSpec {
1383 routine_id: "routine-1".to_string(),
1384 name: "Digest".to_string(),
1385 status: RoutineStatus::Active,
1386 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
1387 timezone: "UTC".to_string(),
1388 misfire_policy: RoutineMisfirePolicy::RunOnce,
1389 entrypoint: "mission.default".to_string(),
1390 args: serde_json::json!({"topic":"status"}),
1391 creator_type: "user".to_string(),
1392 creator_id: "user-1".to_string(),
1393 requires_approval: true,
1394 external_integrations_allowed: false,
1395 next_fire_at_ms: Some(5_000),
1396 last_fired_at_ms: None,
1397 };
1398
1399 state.put_routine(routine).await.expect("store routine");
1400
1401 let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
1402 reloaded.routines_path = routines_path.clone();
1403 reloaded.load_routines().await.expect("load routines");
1404 let list = reloaded.list_routines().await;
1405 assert_eq!(list.len(), 1);
1406 assert_eq!(list[0].routine_id, "routine-1");
1407
1408 let _ = tokio::fs::remove_file(routines_path).await;
1409 }
1410
1411 #[tokio::test]
1412 async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
1413 let routines_path = tmp_routines_file("misfire-eval");
1414 let mut state = AppState::new_starting("routines-eval".to_string(), true);
1415 state.routines_path = routines_path.clone();
1416
1417 let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
1418 routine_id: id.to_string(),
1419 name: id.to_string(),
1420 status: RoutineStatus::Active,
1421 schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
1422 timezone: "UTC".to_string(),
1423 misfire_policy: policy,
1424 entrypoint: "mission.default".to_string(),
1425 args: serde_json::json!({}),
1426 creator_type: "user".to_string(),
1427 creator_id: "u-1".to_string(),
1428 requires_approval: false,
1429 external_integrations_allowed: false,
1430 next_fire_at_ms: Some(5_000),
1431 last_fired_at_ms: None,
1432 };
1433
1434 state
1435 .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
1436 .await
1437 .expect("put skip");
1438 state
1439 .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
1440 .await
1441 .expect("put once");
1442 state
1443 .put_routine(base(
1444 "routine-catch",
1445 RoutineMisfirePolicy::CatchUp { max_runs: 3 },
1446 ))
1447 .await
1448 .expect("put catch");
1449
1450 let plans = state.evaluate_routine_misfires(10_500).await;
1451 let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
1452 let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
1453 let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
1454
1455 assert!(plan_skip.is_none());
1456 assert_eq!(plan_once.map(|p| p.run_count), Some(1));
1457 assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
1458
1459 let stored = state.list_routines().await;
1460 let skip_next = stored
1461 .iter()
1462 .find(|r| r.routine_id == "routine-skip")
1463 .and_then(|r| r.next_fire_at_ms)
1464 .expect("skip next");
1465 assert!(skip_next > 10_500);
1466
1467 let _ = tokio::fs::remove_file(routines_path).await;
1468 }
1469
1470 #[test]
1471 fn routine_policy_blocks_external_side_effects_by_default() {
1472 let routine = RoutineSpec {
1473 routine_id: "routine-policy-1".to_string(),
1474 name: "Connector routine".to_string(),
1475 status: RoutineStatus::Active,
1476 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
1477 timezone: "UTC".to_string(),
1478 misfire_policy: RoutineMisfirePolicy::RunOnce,
1479 entrypoint: "connector.email.reply".to_string(),
1480 args: serde_json::json!({}),
1481 creator_type: "user".to_string(),
1482 creator_id: "u-1".to_string(),
1483 requires_approval: true,
1484 external_integrations_allowed: false,
1485 next_fire_at_ms: None,
1486 last_fired_at_ms: None,
1487 };
1488
1489 let decision = evaluate_routine_execution_policy(&routine, "manual");
1490 assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
1491 }
1492
1493 #[test]
1494 fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
1495 let routine = RoutineSpec {
1496 routine_id: "routine-policy-2".to_string(),
1497 name: "Connector routine".to_string(),
1498 status: RoutineStatus::Active,
1499 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
1500 timezone: "UTC".to_string(),
1501 misfire_policy: RoutineMisfirePolicy::RunOnce,
1502 entrypoint: "connector.email.reply".to_string(),
1503 args: serde_json::json!({}),
1504 creator_type: "user".to_string(),
1505 creator_id: "u-1".to_string(),
1506 requires_approval: true,
1507 external_integrations_allowed: true,
1508 next_fire_at_ms: None,
1509 last_fired_at_ms: None,
1510 };
1511
1512 let decision = evaluate_routine_execution_policy(&routine, "manual");
1513 assert!(matches!(
1514 decision,
1515 RoutineExecutionDecision::RequiresApproval { .. }
1516 ));
1517 }
1518
1519 #[test]
1520 fn routine_policy_allows_non_external_entrypoints() {
1521 let routine = RoutineSpec {
1522 routine_id: "routine-policy-3".to_string(),
1523 name: "Internal mission routine".to_string(),
1524 status: RoutineStatus::Active,
1525 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
1526 timezone: "UTC".to_string(),
1527 misfire_policy: RoutineMisfirePolicy::RunOnce,
1528 entrypoint: "mission.default".to_string(),
1529 args: serde_json::json!({}),
1530 creator_type: "user".to_string(),
1531 creator_id: "u-1".to_string(),
1532 requires_approval: true,
1533 external_integrations_allowed: false,
1534 next_fire_at_ms: None,
1535 last_fired_at_ms: None,
1536 };
1537
1538 let decision = evaluate_routine_execution_policy(&routine, "manual");
1539 assert_eq!(decision, RoutineExecutionDecision::Allowed);
1540 }
1541}