1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
12use tandem_orchestrator::MissionState;
13use tandem_types::{
14 EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
15 SendMessageRequest, Session, ShellFamily,
16};
17use tokio::fs;
18use tokio::sync::RwLock;
19
20use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
21use tandem_core::{
22 resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
23 PermissionManager, PluginRegistry, Storage,
24};
25use tandem_providers::ProviderRegistry;
26use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
27use tandem_tools::ToolRegistry;
28
29mod agent_teams;
30mod http;
31pub mod webui;
32
33pub use agent_teams::AgentTeamRuntime;
34pub use http::serve;
35
/// Runtime status of one chat channel integration ("telegram", "discord" or
/// "slack"), as stored in `ChannelRuntime::statuses` and published in the
/// `channel.status.changed` event.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// `true` when the effective config contains a section for this channel.
    pub enabled: bool,
    /// Set to `true` for enabled channels once listeners have been started.
    /// NOTE(review): this reflects listener startup, not a confirmed connection.
    pub connected: bool,
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Free-form extra details; initialized to an empty JSON object.
    pub meta: Value,
}
44
45#[derive(Debug, Clone, Serialize, Deserialize, Default)]
46pub struct WebUiConfig {
47 #[serde(default)]
48 pub enabled: bool,
49 #[serde(default = "default_web_ui_prefix")]
50 pub path_prefix: String,
51}
52
/// The `channels` section of the app config as written on disk.
///
/// Each channel is optional; a present section marks that channel as enabled
/// in `restart_channel_listeners`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    pub telegram: Option<TelegramConfigFile>,
    pub discord: Option<DiscordConfigFile>,
    pub slack: Option<SlackConfigFile>,
    /// Tool policy applied to channel-originated sessions; defaults when absent.
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}

/// On-disk Telegram channel settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    pub bot_token: String,
    /// Defaults via `default_allow_all()` (defined elsewhere in this file).
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    #[serde(default)]
    pub mention_only: bool,
}

/// On-disk Discord channel settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    pub bot_token: String,
    #[serde(default)]
    pub guild_id: Option<String>,
    /// Defaults via `default_allow_all()` (defined elsewhere in this file).
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Defaults via `default_discord_mention_only()` (defined elsewhere in this file).
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}

/// On-disk Slack channel settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    pub bot_token: String,
    pub channel_id: String,
    /// Defaults via `default_allow_all()` (defined elsewhere in this file).
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
}

/// The subset of the effective engine config this crate reads. Each section
/// falls back to its default when missing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
99
/// Mutable state of the chat channel listeners, guarded by
/// `AppState::channels_runtime`.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Running listener tasks; `None` when no channel config could be built.
    /// Aborted and replaced by `restart_channel_listeners`.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Last published status per channel name.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
105
/// A time-limited claim on the engine held by a client.
///
/// The lease stays valid as long as it is renewed within `ttl_ms`
/// milliseconds of `last_renewed_at_ms`.
#[derive(Debug, Clone)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Returns `true` once strictly more than `ttl_ms` has passed since the last
    /// renewal. A `now_ms` earlier than the last renewal never counts as expired.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        // Saturating so that a huge TTL means "never expires" instead of wrapping.
        let expires_at_ms = self.last_renewed_at_ms.saturating_add(self.ttl_ms);
        now_ms > expires_at_ms
    }
}
121
/// An in-flight run tracked by [`RunRegistry`], at most one per session.
///
/// Serialized with the JSON keys `runID`, `startedAtMs`, `lastActivityAtMs`,
/// `clientID`, `agentID` and `agentProfile`. Optional fields are omitted when
/// `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    #[serde(rename = "runID")]
    pub run_id: String,
    /// Unix-epoch milliseconds at which the run was acquired.
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// Refreshed by `RunRegistry::touch`; compared against the stale threshold
    /// in `RunRegistry::reap_stale`.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
137
138#[derive(Clone, Default)]
139pub struct RunRegistry {
140 active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
141}
142
143impl RunRegistry {
144 pub fn new() -> Self {
145 Self::default()
146 }
147
148 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
149 self.active.read().await.get(session_id).cloned()
150 }
151
152 pub async fn acquire(
153 &self,
154 session_id: &str,
155 run_id: String,
156 client_id: Option<String>,
157 agent_id: Option<String>,
158 agent_profile: Option<String>,
159 ) -> std::result::Result<ActiveRun, ActiveRun> {
160 let mut guard = self.active.write().await;
161 if let Some(existing) = guard.get(session_id).cloned() {
162 return Err(existing);
163 }
164 let now = now_ms();
165 let run = ActiveRun {
166 run_id,
167 started_at_ms: now,
168 last_activity_at_ms: now,
169 client_id,
170 agent_id,
171 agent_profile,
172 };
173 guard.insert(session_id.to_string(), run.clone());
174 Ok(run)
175 }
176
177 pub async fn touch(&self, session_id: &str, run_id: &str) {
178 let mut guard = self.active.write().await;
179 if let Some(run) = guard.get_mut(session_id) {
180 if run.run_id == run_id {
181 run.last_activity_at_ms = now_ms();
182 }
183 }
184 }
185
186 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
187 let mut guard = self.active.write().await;
188 if let Some(run) = guard.get(session_id) {
189 if run.run_id == run_id {
190 return guard.remove(session_id);
191 }
192 }
193 None
194 }
195
196 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
197 self.active.write().await.remove(session_id)
198 }
199
200 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
201 let now = now_ms();
202 let mut guard = self.active.write().await;
203 let stale_ids = guard
204 .iter()
205 .filter_map(|(session_id, run)| {
206 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
207 Some(session_id.clone())
208 } else {
209 None
210 }
211 })
212 .collect::<Vec<_>>();
213 let mut out = Vec::with_capacity(stale_ids.len());
214 for session_id in stale_ids {
215 if let Some(run) = guard.remove(&session_id) {
216 out.push((session_id, run));
217 }
218 }
219 out
220 }
221}
222
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
229
230pub fn build_id() -> String {
231 if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
232 let trimmed = explicit.trim();
233 if !trimmed.is_empty() {
234 return trimmed.to_string();
235 }
236 }
237 if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
238 let trimmed = git_sha.trim();
239 if !trimmed.is_empty() {
240 return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
241 }
242 }
243 env!("CARGO_PKG_VERSION").to_string()
244}
245
246pub fn detect_host_runtime_context() -> HostRuntimeContext {
247 let os = if cfg!(target_os = "windows") {
248 HostOs::Windows
249 } else if cfg!(target_os = "macos") {
250 HostOs::Macos
251 } else {
252 HostOs::Linux
253 };
254 let (shell_family, path_style) = match os {
255 HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
256 HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
257 };
258 HostRuntimeContext {
259 os,
260 arch: std::env::consts::ARCH.to_string(),
261 shell_family,
262 path_style,
263 }
264}
265
/// Path of the running executable, reported only in builds with debug
/// assertions enabled. Release builds always return `None`.
pub fn binary_path_for_health() -> Option<String> {
    if !cfg!(debug_assertions) {
        return None;
    }
    let exe = std::env::current_exe().ok()?;
    Some(exe.to_string_lossy().into_owned())
}
278
/// Engine services that become available once startup has finished.
///
/// Attached to [`AppState`] exactly once by `AppState::mark_ready`.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    /// String-to-string auth map (presumably provider id -> credential; TODO confirm).
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// In-memory log entries stored as raw JSON values.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    /// Host context; `AppState::host_runtime_context` prefers this value once
    /// the runtime is attached.
    pub host_runtime_context: HostRuntimeContext,
}
299
/// A memory entry held in `AppState::memory_records`.
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    /// Run that produced this record.
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    pub metadata: Option<Value>,
    /// Record this one was derived from, if any (presumably set on tier
    /// promotion; TODO confirm).
    pub source_memory_id: Option<String>,
    pub created_at_ms: u64,
}

/// One entry of `AppState::memory_audit_log`.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    /// Omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub created_at_ms: u64,
}

/// A versioned key/value entry in the shared resource store, persisted to
/// `AppState::shared_resources_path`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    pub key: String,
    pub value: Value,
    /// Starts at 1 and is incremented by each successful `put_shared_resource`.
    /// Callers pass it back as `if_match_rev` for optimistic concurrency.
    pub rev: u64,
    pub updated_at_ms: u64,
    pub updated_by: String,
    /// Optional time-to-live in milliseconds, omitted from JSON when `None`.
    /// NOTE(review): no expiry enforcement is visible in this section.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
340
/// When a routine fires.
///
/// Serialized in serde's default externally tagged form with snake_case
/// variant names, e.g. `{"interval_seconds": {"seconds": 60}}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    /// Fixed interval; `put_routine` rejects `0`.
    IntervalSeconds { seconds: u64 },
    /// Cron expression, stored as-is.
    Cron { expression: String },
}

/// How missed fire times are handled. It is interpreted by
/// `compute_misfire_plan` (defined elsewhere in this file).
///
/// Internally tagged by `type`, e.g. `{"type": "catch_up", "max_runs": 3}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    Skip,
    RunOnce,
    CatchUp { max_runs: u32 },
}

/// Whether a routine is scheduled. Only `Active` routines are evaluated by
/// `evaluate_routine_misfires`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
362
/// A stored routine definition, persisted to `AppState::routines_path` and
/// keyed by `routine_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    /// Normalized by `put_routine` via `normalize_allowed_tools`.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// Normalized by `put_routine` via `normalize_non_empty_list`.
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    /// Initialized by `put_routine` when missing; advanced by
    /// `evaluate_routine_misfires`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    /// Set by `mark_routine_fired`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}

/// One firing recorded in the routine history, persisted to
/// `AppState::routine_history_path`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub fired_at_ms: u64,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
398
/// Lifecycle state of a routine run, serialized as snake_case strings
/// (e.g. `"pending_approval"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}

/// An artifact produced by a routine run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    pub uri: String,
    pub kind: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}

/// A single execution of a routine, persisted to `AppState::routine_runs_path`.
///
/// `entrypoint`, `args`, `allowed_tools` and `output_targets` mirror the
/// corresponding `RoutineSpec` fields (presumably copied at trigger time; TODO
/// confirm). Optional timestamps and reasons are omitted from JSON when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
}
459
/// Tool allow-list bound to a session started for a routine run, held in
/// `AppState::routine_session_policies` (presumably keyed by `session_id`;
/// TODO confirm).
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}

/// A due routine returned by `evaluate_routine_misfires`.
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    /// Number of runs to start; never zero in returned plans.
    pub run_count: u32,
    /// The evaluation time passed to `evaluate_routine_misfires`.
    pub scheduled_at_ms: u64,
    /// The routine's new `next_fire_at_ms` after this evaluation.
    pub next_fire_at_ms: u64,
}
475
/// Details of a failed `if_match_rev` check on a shared resource.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    pub key: String,
    /// The revision the caller expected.
    pub expected_rev: Option<u64>,
    /// The stored revision, or `None` if the key does not exist.
    pub current_rev: Option<u64>,
}

/// Errors from the shared resource store, serialized with a snake_case `type`
/// tag (e.g. `{"type": "invalid_key", "key": "..."}`).
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    InvalidKey { key: String },
    RevisionConflict(ResourceConflict),
    /// Writing to disk failed; the in-memory change was rolled back.
    PersistFailed { message: String },
}

/// Errors from the routine store, serialized with a snake_case `type` tag.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    InvalidRoutineId { routine_id: String },
    InvalidSchedule { detail: String },
    /// Writing to disk failed; the in-memory change was rolled back.
    PersistFailed { message: String },
}
498
/// Coarse server startup state.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    Starting,
    Ready,
    Failed,
}

/// Mutable startup progress held in `AppState::startup`.
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Free-form phase label ("boot" initially, "ready" once ready).
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
}

/// Point-in-time copy of [`StartupState`] plus the elapsed time since startup
/// began.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    pub elapsed_ms: u64,
}
524
/// Shared server state used by HTTP handlers and background tasks.
///
/// Clones share the same stores, because the mutable state is behind `Arc`.
/// The engine [`RuntimeState`] is attached once by `mark_ready`; before that,
/// `is_ready()` returns `false`. Runtime fields such as `config` and
/// `event_bus` are used as `self.<field>` in methods, presumably via a `Deref`
/// impl defined outside this view (TODO confirm).
#[derive(Clone)]
pub struct AppState {
    /// Set exactly once by `mark_ready`.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    /// Selects the `mode_label` ("in-process" vs "sidecar").
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub run_registry: RunRegistry,
    /// Idle threshold for stale runs, from `resolve_run_stale_ms()`.
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// Shared resources keyed by resource key; persisted to `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    /// Routine specs keyed by `routine_id`; persisted to `routines_path`.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    /// Persisted to `routine_history_path`.
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    /// Persisted to `routine_runs_path`.
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    pub web_ui_enabled: Arc<AtomicBool>,
    /// Normalized prefix; uses a std (blocking) lock because accessors are sync.
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Fallback host context used until the runtime is attached.
    pub host_runtime_context: HostRuntimeContext,
}
554
/// A key/value pair for a status index update.
/// NOTE(review): not referenced in this section; presumably used elsewhere in the file.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
560
561impl AppState {
562 pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
563 Self {
564 runtime: Arc::new(OnceLock::new()),
565 startup: Arc::new(RwLock::new(StartupState {
566 status: StartupStatus::Starting,
567 phase: "boot".to_string(),
568 started_at_ms: now_ms(),
569 attempt_id,
570 last_error: None,
571 })),
572 in_process_mode: Arc::new(AtomicBool::new(in_process)),
573 api_token: Arc::new(RwLock::new(None)),
574 engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
575 run_registry: RunRegistry::new(),
576 run_stale_ms: resolve_run_stale_ms(),
577 memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
578 memory_audit_log: Arc::new(RwLock::new(Vec::new())),
579 missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
580 shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
581 shared_resources_path: resolve_shared_resources_path(),
582 routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
583 routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
584 routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
585 routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
586 routines_path: resolve_routines_path(),
587 routine_history_path: resolve_routine_history_path(),
588 routine_runs_path: resolve_routine_runs_path(),
589 agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
590 web_ui_enabled: Arc::new(AtomicBool::new(false)),
591 web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
592 server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
593 channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
594 host_runtime_context: detect_host_runtime_context(),
595 }
596 }
597
598 pub fn is_ready(&self) -> bool {
599 self.runtime.get().is_some()
600 }
601
602 pub fn mode_label(&self) -> &'static str {
603 if self.in_process_mode.load(Ordering::Relaxed) {
604 "in-process"
605 } else {
606 "sidecar"
607 }
608 }
609
610 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
611 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
612 if let Ok(mut guard) = self.web_ui_prefix.write() {
613 *guard = normalize_web_ui_prefix(&prefix);
614 }
615 }
616
617 pub fn web_ui_enabled(&self) -> bool {
618 self.web_ui_enabled.load(Ordering::Relaxed)
619 }
620
621 pub fn web_ui_prefix(&self) -> String {
622 self.web_ui_prefix
623 .read()
624 .map(|v| v.clone())
625 .unwrap_or_else(|_| "/admin".to_string())
626 }
627
628 pub fn set_server_base_url(&self, base_url: String) {
629 if let Ok(mut guard) = self.server_base_url.write() {
630 *guard = base_url;
631 }
632 }
633
634 pub fn server_base_url(&self) -> String {
635 self.server_base_url
636 .read()
637 .map(|v| v.clone())
638 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
639 }
640
641 pub async fn api_token(&self) -> Option<String> {
642 self.api_token.read().await.clone()
643 }
644
645 pub async fn set_api_token(&self, token: Option<String>) {
646 *self.api_token.write().await = token;
647 }
648
649 pub async fn startup_snapshot(&self) -> StartupSnapshot {
650 let state = self.startup.read().await.clone();
651 StartupSnapshot {
652 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
653 status: state.status,
654 phase: state.phase,
655 started_at_ms: state.started_at_ms,
656 attempt_id: state.attempt_id,
657 last_error: state.last_error,
658 }
659 }
660
661 pub fn host_runtime_context(&self) -> HostRuntimeContext {
662 self.runtime
663 .get()
664 .map(|runtime| runtime.host_runtime_context.clone())
665 .unwrap_or_else(|| self.host_runtime_context.clone())
666 }
667
668 pub async fn set_phase(&self, phase: impl Into<String>) {
669 let mut startup = self.startup.write().await;
670 startup.phase = phase.into();
671 }
672
    /// Attaches the fully initialized runtime and moves startup to `Ready`.
    ///
    /// After storing `runtime`, this:
    /// - installs the agent-team spawn and tool-policy hooks on the engine loop;
    /// - loads persisted shared resources, routines, routine history and routine runs;
    /// - loads agent-team state for the current workspace root.
    ///
    /// Loader failures are ignored (best-effort) and do not block the `Ready`
    /// transition.
    ///
    /// # Errors
    /// Returns an error if a runtime was already attached.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        // Order matters: `engine_loop` and `workspace_index` are not `AppState`
        // fields and are presumably reached through a `Deref` to the runtime
        // that was just set (impl outside this view).
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        // Best-effort restores of persisted state; errors are deliberately dropped.
        let _ = self.load_shared_resources().await;
        let _ = self.load_routines().await;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
702
703 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
704 let mut startup = self.startup.write().await;
705 startup.status = StartupStatus::Failed;
706 startup.phase = phase.into();
707 startup.last_error = Some(error.into());
708 }
709
710 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
711 let runtime = self.channels_runtime.lock().await;
712 runtime.statuses.clone()
713 }
714
715 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
716 let effective = self.config.get_effective_value().await;
717 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
718 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
719
720 let mut runtime = self.channels_runtime.lock().await;
721 if let Some(listeners) = runtime.listeners.as_mut() {
722 listeners.abort_all();
723 }
724 runtime.listeners = None;
725 runtime.statuses.clear();
726
727 let mut status_map = std::collections::HashMap::new();
728 status_map.insert(
729 "telegram".to_string(),
730 ChannelStatus {
731 enabled: parsed.channels.telegram.is_some(),
732 connected: false,
733 last_error: None,
734 active_sessions: 0,
735 meta: serde_json::json!({}),
736 },
737 );
738 status_map.insert(
739 "discord".to_string(),
740 ChannelStatus {
741 enabled: parsed.channels.discord.is_some(),
742 connected: false,
743 last_error: None,
744 active_sessions: 0,
745 meta: serde_json::json!({}),
746 },
747 );
748 status_map.insert(
749 "slack".to_string(),
750 ChannelStatus {
751 enabled: parsed.channels.slack.is_some(),
752 connected: false,
753 last_error: None,
754 active_sessions: 0,
755 meta: serde_json::json!({}),
756 },
757 );
758
759 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
760 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
761 runtime.listeners = Some(listeners);
762 for status in status_map.values_mut() {
763 if status.enabled {
764 status.connected = true;
765 }
766 }
767 }
768
769 runtime.statuses = status_map.clone();
770 drop(runtime);
771
772 self.event_bus.publish(EngineEvent::new(
773 "channel.status.changed",
774 serde_json::json!({ "channels": status_map }),
775 ));
776 Ok(())
777 }
778
779 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
780 if !self.shared_resources_path.exists() {
781 return Ok(());
782 }
783 let raw = fs::read_to_string(&self.shared_resources_path).await?;
784 let parsed =
785 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
786 .unwrap_or_default();
787 let mut guard = self.shared_resources.write().await;
788 *guard = parsed;
789 Ok(())
790 }
791
792 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
793 if let Some(parent) = self.shared_resources_path.parent() {
794 fs::create_dir_all(parent).await?;
795 }
796 let payload = {
797 let guard = self.shared_resources.read().await;
798 serde_json::to_string_pretty(&*guard)?
799 };
800 fs::write(&self.shared_resources_path, payload).await?;
801 Ok(())
802 }
803
804 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
805 self.shared_resources.read().await.get(key).cloned()
806 }
807
808 pub async fn list_shared_resources(
809 &self,
810 prefix: Option<&str>,
811 limit: usize,
812 ) -> Vec<SharedResourceRecord> {
813 let limit = limit.clamp(1, 500);
814 let mut rows = self
815 .shared_resources
816 .read()
817 .await
818 .values()
819 .filter(|record| {
820 if let Some(prefix) = prefix {
821 record.key.starts_with(prefix)
822 } else {
823 true
824 }
825 })
826 .cloned()
827 .collect::<Vec<_>>();
828 rows.sort_by(|a, b| a.key.cmp(&b.key));
829 rows.truncate(limit);
830 rows
831 }
832
833 pub async fn put_shared_resource(
834 &self,
835 key: String,
836 value: Value,
837 if_match_rev: Option<u64>,
838 updated_by: String,
839 ttl_ms: Option<u64>,
840 ) -> Result<SharedResourceRecord, ResourceStoreError> {
841 if !is_valid_resource_key(&key) {
842 return Err(ResourceStoreError::InvalidKey { key });
843 }
844
845 let now = now_ms();
846 let mut guard = self.shared_resources.write().await;
847 let existing = guard.get(&key).cloned();
848
849 if let Some(expected) = if_match_rev {
850 let current = existing.as_ref().map(|row| row.rev);
851 if current != Some(expected) {
852 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
853 key,
854 expected_rev: Some(expected),
855 current_rev: current,
856 }));
857 }
858 }
859
860 let next_rev = existing
861 .as_ref()
862 .map(|row| row.rev.saturating_add(1))
863 .unwrap_or(1);
864
865 let record = SharedResourceRecord {
866 key: key.clone(),
867 value,
868 rev: next_rev,
869 updated_at_ms: now,
870 updated_by,
871 ttl_ms,
872 };
873
874 let previous = guard.insert(key.clone(), record.clone());
875 drop(guard);
876
877 if let Err(error) = self.persist_shared_resources().await {
878 let mut rollback = self.shared_resources.write().await;
879 if let Some(previous) = previous {
880 rollback.insert(key, previous);
881 } else {
882 rollback.remove(&key);
883 }
884 return Err(ResourceStoreError::PersistFailed {
885 message: error.to_string(),
886 });
887 }
888
889 Ok(record)
890 }
891
892 pub async fn delete_shared_resource(
893 &self,
894 key: &str,
895 if_match_rev: Option<u64>,
896 ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
897 if !is_valid_resource_key(key) {
898 return Err(ResourceStoreError::InvalidKey {
899 key: key.to_string(),
900 });
901 }
902
903 let mut guard = self.shared_resources.write().await;
904 let current = guard.get(key).cloned();
905 if let Some(expected) = if_match_rev {
906 let current_rev = current.as_ref().map(|row| row.rev);
907 if current_rev != Some(expected) {
908 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
909 key: key.to_string(),
910 expected_rev: Some(expected),
911 current_rev,
912 }));
913 }
914 }
915
916 let removed = guard.remove(key);
917 drop(guard);
918
919 if let Err(error) = self.persist_shared_resources().await {
920 if let Some(record) = removed.clone() {
921 self.shared_resources
922 .write()
923 .await
924 .insert(record.key.clone(), record);
925 }
926 return Err(ResourceStoreError::PersistFailed {
927 message: error.to_string(),
928 });
929 }
930
931 Ok(removed)
932 }
933
934 pub async fn load_routines(&self) -> anyhow::Result<()> {
935 if !self.routines_path.exists() {
936 return Ok(());
937 }
938 let raw = fs::read_to_string(&self.routines_path).await?;
939 let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
940 .unwrap_or_default();
941 let mut guard = self.routines.write().await;
942 *guard = parsed;
943 Ok(())
944 }
945
946 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
947 if !self.routine_history_path.exists() {
948 return Ok(());
949 }
950 let raw = fs::read_to_string(&self.routine_history_path).await?;
951 let parsed = serde_json::from_str::<
952 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
953 >(&raw)
954 .unwrap_or_default();
955 let mut guard = self.routine_history.write().await;
956 *guard = parsed;
957 Ok(())
958 }
959
960 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
961 if !self.routine_runs_path.exists() {
962 return Ok(());
963 }
964 let raw = fs::read_to_string(&self.routine_runs_path).await?;
965 let parsed =
966 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
967 .unwrap_or_default();
968 let mut guard = self.routine_runs.write().await;
969 *guard = parsed;
970 Ok(())
971 }
972
973 pub async fn persist_routines(&self) -> anyhow::Result<()> {
974 if let Some(parent) = self.routines_path.parent() {
975 fs::create_dir_all(parent).await?;
976 }
977 let payload = {
978 let guard = self.routines.read().await;
979 serde_json::to_string_pretty(&*guard)?
980 };
981 fs::write(&self.routines_path, payload).await?;
982 Ok(())
983 }
984
985 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
986 if let Some(parent) = self.routine_history_path.parent() {
987 fs::create_dir_all(parent).await?;
988 }
989 let payload = {
990 let guard = self.routine_history.read().await;
991 serde_json::to_string_pretty(&*guard)?
992 };
993 fs::write(&self.routine_history_path, payload).await?;
994 Ok(())
995 }
996
997 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
998 if let Some(parent) = self.routine_runs_path.parent() {
999 fs::create_dir_all(parent).await?;
1000 }
1001 let payload = {
1002 let guard = self.routine_runs.read().await;
1003 serde_json::to_string_pretty(&*guard)?
1004 };
1005 fs::write(&self.routine_runs_path, payload).await?;
1006 Ok(())
1007 }
1008
1009 pub async fn put_routine(
1010 &self,
1011 mut routine: RoutineSpec,
1012 ) -> Result<RoutineSpec, RoutineStoreError> {
1013 if routine.routine_id.trim().is_empty() {
1014 return Err(RoutineStoreError::InvalidRoutineId {
1015 routine_id: routine.routine_id,
1016 });
1017 }
1018
1019 routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1020 routine.output_targets = normalize_non_empty_list(routine.output_targets);
1021
1022 let interval = match routine.schedule {
1023 RoutineSchedule::IntervalSeconds { seconds } => {
1024 if seconds == 0 {
1025 return Err(RoutineStoreError::InvalidSchedule {
1026 detail: "interval_seconds must be > 0".to_string(),
1027 });
1028 }
1029 Some(seconds)
1030 }
1031 RoutineSchedule::Cron { .. } => None,
1032 };
1033 if routine.next_fire_at_ms.is_none() {
1034 routine.next_fire_at_ms = Some(now_ms().saturating_add(interval.unwrap_or(60) * 1000));
1035 }
1036
1037 let mut guard = self.routines.write().await;
1038 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1039 drop(guard);
1040
1041 if let Err(error) = self.persist_routines().await {
1042 let mut rollback = self.routines.write().await;
1043 if let Some(previous) = previous {
1044 rollback.insert(previous.routine_id.clone(), previous);
1045 } else {
1046 rollback.remove(&routine.routine_id);
1047 }
1048 return Err(RoutineStoreError::PersistFailed {
1049 message: error.to_string(),
1050 });
1051 }
1052
1053 Ok(routine)
1054 }
1055
1056 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1057 let mut rows = self
1058 .routines
1059 .read()
1060 .await
1061 .values()
1062 .cloned()
1063 .collect::<Vec<_>>();
1064 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1065 rows
1066 }
1067
1068 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1069 self.routines.read().await.get(routine_id).cloned()
1070 }
1071
1072 pub async fn delete_routine(
1073 &self,
1074 routine_id: &str,
1075 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1076 let mut guard = self.routines.write().await;
1077 let removed = guard.remove(routine_id);
1078 drop(guard);
1079
1080 if let Err(error) = self.persist_routines().await {
1081 if let Some(removed) = removed.clone() {
1082 self.routines
1083 .write()
1084 .await
1085 .insert(removed.routine_id.clone(), removed);
1086 }
1087 return Err(RoutineStoreError::PersistFailed {
1088 message: error.to_string(),
1089 });
1090 }
1091 Ok(removed)
1092 }
1093
1094 pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1095 let mut plans = Vec::new();
1096 let mut guard = self.routines.write().await;
1097 for routine in guard.values_mut() {
1098 if routine.status != RoutineStatus::Active {
1099 continue;
1100 }
1101 let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1102 continue;
1103 };
1104 let Some(interval_ms) = routine_interval_ms(&routine.schedule) else {
1105 continue;
1106 };
1107 if now_ms < next_fire_at_ms {
1108 continue;
1109 }
1110 let (run_count, next_fire_at_ms) = compute_misfire_plan(
1111 now_ms,
1112 next_fire_at_ms,
1113 interval_ms,
1114 &routine.misfire_policy,
1115 );
1116 routine.next_fire_at_ms = Some(next_fire_at_ms);
1117 if run_count == 0 {
1118 continue;
1119 }
1120 plans.push(RoutineTriggerPlan {
1121 routine_id: routine.routine_id.clone(),
1122 run_count,
1123 scheduled_at_ms: now_ms,
1124 next_fire_at_ms,
1125 });
1126 }
1127 drop(guard);
1128 let _ = self.persist_routines().await;
1129 plans
1130 }
1131
1132 pub async fn mark_routine_fired(
1133 &self,
1134 routine_id: &str,
1135 fired_at_ms: u64,
1136 ) -> Option<RoutineSpec> {
1137 let mut guard = self.routines.write().await;
1138 let routine = guard.get_mut(routine_id)?;
1139 routine.last_fired_at_ms = Some(fired_at_ms);
1140 let updated = routine.clone();
1141 drop(guard);
1142 let _ = self.persist_routines().await;
1143 Some(updated)
1144 }
1145
1146 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1147 let mut history = self.routine_history.write().await;
1148 history
1149 .entry(event.routine_id.clone())
1150 .or_default()
1151 .push(event);
1152 drop(history);
1153 let _ = self.persist_routine_history().await;
1154 }
1155
1156 pub async fn list_routine_history(
1157 &self,
1158 routine_id: &str,
1159 limit: usize,
1160 ) -> Vec<RoutineHistoryEvent> {
1161 let limit = limit.clamp(1, 500);
1162 let mut rows = self
1163 .routine_history
1164 .read()
1165 .await
1166 .get(routine_id)
1167 .cloned()
1168 .unwrap_or_default();
1169 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1170 rows.truncate(limit);
1171 rows
1172 }
1173
1174 pub async fn create_routine_run(
1175 &self,
1176 routine: &RoutineSpec,
1177 trigger_type: &str,
1178 run_count: u32,
1179 status: RoutineRunStatus,
1180 detail: Option<String>,
1181 ) -> RoutineRunRecord {
1182 let now = now_ms();
1183 let record = RoutineRunRecord {
1184 run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1185 routine_id: routine.routine_id.clone(),
1186 trigger_type: trigger_type.to_string(),
1187 run_count,
1188 status,
1189 created_at_ms: now,
1190 updated_at_ms: now,
1191 fired_at_ms: Some(now),
1192 started_at_ms: None,
1193 finished_at_ms: None,
1194 requires_approval: routine.requires_approval,
1195 approval_reason: None,
1196 denial_reason: None,
1197 paused_reason: None,
1198 detail,
1199 entrypoint: routine.entrypoint.clone(),
1200 args: routine.args.clone(),
1201 allowed_tools: routine.allowed_tools.clone(),
1202 output_targets: routine.output_targets.clone(),
1203 artifacts: Vec::new(),
1204 };
1205 self.routine_runs
1206 .write()
1207 .await
1208 .insert(record.run_id.clone(), record.clone());
1209 let _ = self.persist_routine_runs().await;
1210 record
1211 }
1212
1213 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1214 self.routine_runs.read().await.get(run_id).cloned()
1215 }
1216
1217 pub async fn list_routine_runs(
1218 &self,
1219 routine_id: Option<&str>,
1220 limit: usize,
1221 ) -> Vec<RoutineRunRecord> {
1222 let mut rows = self
1223 .routine_runs
1224 .read()
1225 .await
1226 .values()
1227 .filter(|row| {
1228 if let Some(id) = routine_id {
1229 row.routine_id == id
1230 } else {
1231 true
1232 }
1233 })
1234 .cloned()
1235 .collect::<Vec<_>>();
1236 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1237 rows.truncate(limit.clamp(1, 500));
1238 rows
1239 }
1240
1241 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1242 let mut guard = self.routine_runs.write().await;
1243 let next_run_id = guard
1244 .values()
1245 .filter(|row| row.status == RoutineRunStatus::Queued)
1246 .min_by(|a, b| {
1247 a.created_at_ms
1248 .cmp(&b.created_at_ms)
1249 .then_with(|| a.run_id.cmp(&b.run_id))
1250 })
1251 .map(|row| row.run_id.clone())?;
1252 let now = now_ms();
1253 let row = guard.get_mut(&next_run_id)?;
1254 row.status = RoutineRunStatus::Running;
1255 row.updated_at_ms = now;
1256 row.started_at_ms = Some(now);
1257 let claimed = row.clone();
1258 drop(guard);
1259 let _ = self.persist_routine_runs().await;
1260 Some(claimed)
1261 }
1262
1263 pub async fn set_routine_session_policy(
1264 &self,
1265 session_id: String,
1266 run_id: String,
1267 routine_id: String,
1268 allowed_tools: Vec<String>,
1269 ) {
1270 let policy = RoutineSessionPolicy {
1271 session_id: session_id.clone(),
1272 run_id,
1273 routine_id,
1274 allowed_tools: normalize_allowed_tools(allowed_tools),
1275 };
1276 self.routine_session_policies
1277 .write()
1278 .await
1279 .insert(session_id, policy);
1280 }
1281
1282 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1283 self.routine_session_policies
1284 .read()
1285 .await
1286 .get(session_id)
1287 .cloned()
1288 }
1289
1290 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1291 self.routine_session_policies
1292 .write()
1293 .await
1294 .remove(session_id);
1295 }
1296
1297 pub async fn update_routine_run_status(
1298 &self,
1299 run_id: &str,
1300 status: RoutineRunStatus,
1301 reason: Option<String>,
1302 ) -> Option<RoutineRunRecord> {
1303 let mut guard = self.routine_runs.write().await;
1304 let row = guard.get_mut(run_id)?;
1305 row.status = status.clone();
1306 row.updated_at_ms = now_ms();
1307 match status {
1308 RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1309 RoutineRunStatus::Running => {
1310 row.started_at_ms.get_or_insert_with(now_ms);
1311 if let Some(detail) = reason {
1312 row.detail = Some(detail);
1313 }
1314 }
1315 RoutineRunStatus::Denied => row.denial_reason = reason,
1316 RoutineRunStatus::Paused => row.paused_reason = reason,
1317 RoutineRunStatus::Completed
1318 | RoutineRunStatus::Failed
1319 | RoutineRunStatus::Cancelled => {
1320 row.finished_at_ms = Some(now_ms());
1321 if let Some(detail) = reason {
1322 row.detail = Some(detail);
1323 }
1324 }
1325 _ => {
1326 if let Some(detail) = reason {
1327 row.detail = Some(detail);
1328 }
1329 }
1330 }
1331 let updated = row.clone();
1332 drop(guard);
1333 let _ = self.persist_routine_runs().await;
1334 Some(updated)
1335 }
1336
1337 pub async fn append_routine_run_artifact(
1338 &self,
1339 run_id: &str,
1340 artifact: RoutineRunArtifact,
1341 ) -> Option<RoutineRunRecord> {
1342 let mut guard = self.routine_runs.write().await;
1343 let row = guard.get_mut(run_id)?;
1344 row.updated_at_ms = now_ms();
1345 row.artifacts.push(artifact);
1346 let updated = row.clone();
1347 drop(guard);
1348 let _ = self.persist_routine_runs().await;
1349 Some(updated)
1350 }
1351}
1352
1353async fn build_channels_config(
1354 state: &AppState,
1355 channels: &ChannelsConfigFile,
1356) -> Option<ChannelsConfig> {
1357 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1358 return None;
1359 }
1360 Some(ChannelsConfig {
1361 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1362 bot_token: cfg.bot_token,
1363 allowed_users: cfg.allowed_users,
1364 mention_only: cfg.mention_only,
1365 }),
1366 discord: channels.discord.clone().map(|cfg| DiscordConfig {
1367 bot_token: cfg.bot_token,
1368 guild_id: cfg.guild_id,
1369 allowed_users: cfg.allowed_users,
1370 mention_only: cfg.mention_only,
1371 }),
1372 slack: channels.slack.clone().map(|cfg| SlackConfig {
1373 bot_token: cfg.bot_token,
1374 channel_id: cfg.channel_id,
1375 allowed_users: cfg.allowed_users,
1376 }),
1377 server_base_url: state.server_base_url(),
1378 api_token: state.api_token().await.unwrap_or_default(),
1379 tool_policy: channels.tool_policy.clone(),
1380 })
1381}
1382
/// Normalizes a web UI mount prefix to the form `/segment[/...]` with no trailing slash.
///
/// Surrounding whitespace and trailing slashes are removed and a leading `/` is added
/// when missing. Input that is empty or consists only of slashes (e.g. `""`, `"/"`,
/// `"//"`) falls back to `/admin`. Previously, inputs like `"//"` collapsed to an empty
/// prefix.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let trimmed = prefix.trim().trim_end_matches('/');
    if trimmed.is_empty() {
        return "/admin".to_string();
    }
    if trimmed.starts_with('/') {
        trimmed.to_string()
    } else {
        format!("/{trimmed}")
    }
}
1395
/// Serde default for `WebUiConfig::path_prefix`.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
1399
/// Serde default for allow-lists: a single `*` wildcard entry.
fn default_allow_all() -> Vec<String> {
    std::iter::once(String::from("*")).collect()
}
1403
/// Serde default for the Discord `mention_only` setting (enabled).
fn default_discord_mention_only() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
1407
/// Normalizes a tool allow-list; see [`normalize_non_empty_list`] for the rules applied.
fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
    normalize_non_empty_list(raw)
}

/// Trims every entry, drops empty ones, and removes duplicates.
///
/// The first occurrence of each trimmed value wins, so relative order is preserved.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
    raw.iter()
        .map(|entry| entry.trim())
        .filter(|entry| !entry.is_empty() && seen.insert(*entry))
        .map(ToString::to_string)
        .collect()
}
1426
/// Reads the run staleness threshold (ms) from `TANDEM_RUN_STALE_MS`.
///
/// Unset or unparsable values fall back to 120 000 ms. The result is always clamped to
/// the range 30 000..=600 000 ms.
fn resolve_run_stale_ms() -> u64 {
    const DEFAULT_STALE_MS: u64 = 120_000;
    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_STALE_MS),
        Err(_) => DEFAULT_STALE_MS,
    };
    configured.clamp(30_000, 600_000)
}
1434
1435fn resolve_shared_resources_path() -> PathBuf {
1436 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
1437 let trimmed = dir.trim();
1438 if !trimmed.is_empty() {
1439 return PathBuf::from(trimmed).join("shared_resources.json");
1440 }
1441 }
1442 default_state_dir().join("shared_resources.json")
1443}
1444
1445fn resolve_routines_path() -> PathBuf {
1446 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
1447 let trimmed = dir.trim();
1448 if !trimmed.is_empty() {
1449 return PathBuf::from(trimmed).join("routines.json");
1450 }
1451 }
1452 default_state_dir().join("routines.json")
1453}
1454
1455fn resolve_routine_history_path() -> PathBuf {
1456 if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
1457 let trimmed = root.trim();
1458 if !trimmed.is_empty() {
1459 return PathBuf::from(trimmed).join("routine_history.json");
1460 }
1461 }
1462 default_state_dir().join("routine_history.json")
1463}
1464
1465fn resolve_routine_runs_path() -> PathBuf {
1466 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
1467 let trimmed = root.trim();
1468 if !trimmed.is_empty() {
1469 return PathBuf::from(trimmed).join("routine_runs.json");
1470 }
1471 }
1472 default_state_dir().join("routine_runs.json")
1473}
1474
1475fn resolve_agent_team_audit_path() -> PathBuf {
1476 if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
1477 let trimmed = base.trim();
1478 if !trimmed.is_empty() {
1479 return PathBuf::from(trimmed)
1480 .join("agent-team")
1481 .join("audit.log.jsonl");
1482 }
1483 }
1484 default_state_dir()
1485 .join("agent-team")
1486 .join("audit.log.jsonl")
1487}
1488
1489fn default_state_dir() -> PathBuf {
1490 if let Ok(paths) = resolve_shared_paths() {
1491 return paths.engine_state_dir;
1492 }
1493 if let Some(data_dir) = dirs::data_dir() {
1494 return data_dir.join("tandem").join("data");
1495 }
1496 dirs::home_dir()
1497 .map(|home| home.join(".tandem").join("data"))
1498 .unwrap_or_else(|| PathBuf::from(".tandem"))
1499}
1500
1501fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
1502 match schedule {
1503 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
1504 RoutineSchedule::Cron { .. } => None,
1505 }
1506}
1507
1508fn compute_misfire_plan(
1509 now_ms: u64,
1510 next_fire_at_ms: u64,
1511 interval_ms: u64,
1512 policy: &RoutineMisfirePolicy,
1513) -> (u32, u64) {
1514 if now_ms < next_fire_at_ms || interval_ms == 0 {
1515 return (0, next_fire_at_ms);
1516 }
1517 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
1518 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
1519 match policy {
1520 RoutineMisfirePolicy::Skip => (0, aligned_next),
1521 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
1522 RoutineMisfirePolicy::CatchUp { max_runs } => {
1523 let count = missed.min(u64::from(*max_runs)) as u32;
1524 (count, aligned_next)
1525 }
1526 }
1527}
1528
/// Outcome of [`evaluate_routine_execution_policy`] for one routine trigger.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The run may proceed without further checks.
    Allowed,
    /// The run needs manual approval first; `reason` is the human-readable explanation.
    RequiresApproval { reason: String },
    /// The run is rejected by policy; `reason` is the human-readable explanation.
    Blocked { reason: String },
}
1535
1536pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
1537 let entrypoint = routine.entrypoint.to_ascii_lowercase();
1538 if entrypoint.starts_with("connector.")
1539 || entrypoint.starts_with("integration.")
1540 || entrypoint.contains("external")
1541 {
1542 return true;
1543 }
1544 routine
1545 .args
1546 .get("uses_external_integrations")
1547 .and_then(|v| v.as_bool())
1548 .unwrap_or(false)
1549 || routine
1550 .args
1551 .get("connector_id")
1552 .and_then(|v| v.as_str())
1553 .is_some()
1554}
1555
1556pub fn evaluate_routine_execution_policy(
1557 routine: &RoutineSpec,
1558 trigger_type: &str,
1559) -> RoutineExecutionDecision {
1560 if !routine_uses_external_integrations(routine) {
1561 return RoutineExecutionDecision::Allowed;
1562 }
1563 if !routine.external_integrations_allowed {
1564 return RoutineExecutionDecision::Blocked {
1565 reason: "external integrations are disabled by policy".to_string(),
1566 };
1567 }
1568 if routine.requires_approval {
1569 return RoutineExecutionDecision::RequiresApproval {
1570 reason: format!(
1571 "manual approval required before external side effects ({})",
1572 trigger_type
1573 ),
1574 };
1575 }
1576 RoutineExecutionDecision::Allowed
1577}
1578
/// Validates a shared-resource key after trimming whitespace.
///
/// A valid key must satisfy all of the following:
/// - it is non-empty;
/// - it starts with one of `run/`, `mission/`, `project/`, or `team/`;
/// - it contains no empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    let key = key.trim();
    !key.is_empty()
        && ALLOWED_PREFIXES.iter().any(|prefix| key.starts_with(prefix))
        && !key.contains("//")
}
1593
1594impl Deref for AppState {
1595 type Target = RuntimeState;
1596
1597 fn deref(&self) -> &Self::Target {
1598 self.runtime
1599 .get()
1600 .expect("runtime accessed before startup completion")
1601 }
1602}
1603
1604fn extract_event_session_id(properties: &Value) -> Option<String> {
1605 properties
1606 .get("sessionID")
1607 .or_else(|| properties.get("sessionId"))
1608 .or_else(|| properties.get("id"))
1609 .and_then(|v| v.as_str())
1610 .map(|s| s.to_string())
1611}
1612
1613fn extract_event_run_id(properties: &Value) -> Option<String> {
1614 properties
1615 .get("runID")
1616 .or_else(|| properties.get("run_id"))
1617 .and_then(|v| v.as_str())
1618 .map(|s| s.to_string())
1619}
1620
1621fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1622 let session_id = extract_event_session_id(&event.properties)?;
1623 let run_id = extract_event_run_id(&event.properties);
1624 let key = format!("run/{session_id}/status");
1625
1626 let mut base = serde_json::Map::new();
1627 base.insert("sessionID".to_string(), Value::String(session_id));
1628 if let Some(run_id) = run_id {
1629 base.insert("runID".to_string(), Value::String(run_id));
1630 }
1631
1632 match event.event_type.as_str() {
1633 "session.run.started" => {
1634 base.insert("state".to_string(), Value::String("running".to_string()));
1635 base.insert("phase".to_string(), Value::String("run".to_string()));
1636 base.insert(
1637 "eventType".to_string(),
1638 Value::String("session.run.started".to_string()),
1639 );
1640 Some(StatusIndexUpdate {
1641 key,
1642 value: Value::Object(base),
1643 })
1644 }
1645 "session.run.finished" => {
1646 base.insert("state".to_string(), Value::String("finished".to_string()));
1647 base.insert("phase".to_string(), Value::String("run".to_string()));
1648 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1649 base.insert("result".to_string(), Value::String(status.to_string()));
1650 }
1651 base.insert(
1652 "eventType".to_string(),
1653 Value::String("session.run.finished".to_string()),
1654 );
1655 Some(StatusIndexUpdate {
1656 key,
1657 value: Value::Object(base),
1658 })
1659 }
1660 "message.part.updated" => {
1661 let part_type = event
1662 .properties
1663 .get("part")
1664 .and_then(|v| v.get("type"))
1665 .and_then(|v| v.as_str())?;
1666 let (phase, tool_active) = match part_type {
1667 "tool-invocation" => ("tool", true),
1668 "tool-result" => ("run", false),
1669 _ => return None,
1670 };
1671 base.insert("state".to_string(), Value::String("running".to_string()));
1672 base.insert("phase".to_string(), Value::String(phase.to_string()));
1673 base.insert("toolActive".to_string(), Value::Bool(tool_active));
1674 if let Some(tool_name) = event
1675 .properties
1676 .get("part")
1677 .and_then(|v| v.get("tool"))
1678 .and_then(|v| v.as_str())
1679 {
1680 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1681 }
1682 base.insert(
1683 "eventType".to_string(),
1684 Value::String("message.part.updated".to_string()),
1685 );
1686 Some(StatusIndexUpdate {
1687 key,
1688 value: Value::Object(base),
1689 })
1690 }
1691 _ => None,
1692 }
1693}
1694
1695pub async fn run_status_indexer(state: AppState) {
1696 let mut rx = state.event_bus.subscribe();
1697 loop {
1698 match rx.recv().await {
1699 Ok(event) => {
1700 if let Some(update) = derive_status_index_update(&event) {
1701 if let Err(error) = state
1702 .put_shared_resource(
1703 update.key,
1704 update.value,
1705 None,
1706 "system.status_indexer".to_string(),
1707 None,
1708 )
1709 .await
1710 {
1711 tracing::warn!("status indexer failed to persist update: {error:?}");
1712 }
1713 }
1714 }
1715 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1716 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1717 }
1718 }
1719}
1720
1721pub async fn run_agent_team_supervisor(state: AppState) {
1722 let mut rx = state.event_bus.subscribe();
1723 loop {
1724 match rx.recv().await {
1725 Ok(event) => {
1726 state.agent_teams.handle_engine_event(&state, &event).await;
1727 }
1728 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1729 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1730 }
1731 }
1732}
1733
1734pub async fn run_routine_scheduler(state: AppState) {
1735 loop {
1736 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1737 let now = now_ms();
1738 let plans = state.evaluate_routine_misfires(now).await;
1739 for plan in plans {
1740 let Some(routine) = state.get_routine(&plan.routine_id).await else {
1741 continue;
1742 };
1743 match evaluate_routine_execution_policy(&routine, "scheduled") {
1744 RoutineExecutionDecision::Allowed => {
1745 let _ = state.mark_routine_fired(&plan.routine_id, now).await;
1746 let run = state
1747 .create_routine_run(
1748 &routine,
1749 "scheduled",
1750 plan.run_count,
1751 RoutineRunStatus::Queued,
1752 None,
1753 )
1754 .await;
1755 state
1756 .append_routine_history(RoutineHistoryEvent {
1757 routine_id: plan.routine_id.clone(),
1758 trigger_type: "scheduled".to_string(),
1759 run_count: plan.run_count,
1760 fired_at_ms: now,
1761 status: "queued".to_string(),
1762 detail: None,
1763 })
1764 .await;
1765 state.event_bus.publish(EngineEvent::new(
1766 "routine.fired",
1767 serde_json::json!({
1768 "routineID": plan.routine_id,
1769 "runID": run.run_id,
1770 "runCount": plan.run_count,
1771 "scheduledAtMs": plan.scheduled_at_ms,
1772 "nextFireAtMs": plan.next_fire_at_ms,
1773 }),
1774 ));
1775 state.event_bus.publish(EngineEvent::new(
1776 "routine.run.created",
1777 serde_json::json!({
1778 "run": run,
1779 }),
1780 ));
1781 }
1782 RoutineExecutionDecision::RequiresApproval { reason } => {
1783 let run = state
1784 .create_routine_run(
1785 &routine,
1786 "scheduled",
1787 plan.run_count,
1788 RoutineRunStatus::PendingApproval,
1789 Some(reason.clone()),
1790 )
1791 .await;
1792 state
1793 .append_routine_history(RoutineHistoryEvent {
1794 routine_id: plan.routine_id.clone(),
1795 trigger_type: "scheduled".to_string(),
1796 run_count: plan.run_count,
1797 fired_at_ms: now,
1798 status: "pending_approval".to_string(),
1799 detail: Some(reason.clone()),
1800 })
1801 .await;
1802 state.event_bus.publish(EngineEvent::new(
1803 "routine.approval_required",
1804 serde_json::json!({
1805 "routineID": plan.routine_id,
1806 "runID": run.run_id,
1807 "runCount": plan.run_count,
1808 "triggerType": "scheduled",
1809 "reason": reason,
1810 }),
1811 ));
1812 state.event_bus.publish(EngineEvent::new(
1813 "routine.run.created",
1814 serde_json::json!({
1815 "run": run,
1816 }),
1817 ));
1818 }
1819 RoutineExecutionDecision::Blocked { reason } => {
1820 let run = state
1821 .create_routine_run(
1822 &routine,
1823 "scheduled",
1824 plan.run_count,
1825 RoutineRunStatus::BlockedPolicy,
1826 Some(reason.clone()),
1827 )
1828 .await;
1829 state
1830 .append_routine_history(RoutineHistoryEvent {
1831 routine_id: plan.routine_id.clone(),
1832 trigger_type: "scheduled".to_string(),
1833 run_count: plan.run_count,
1834 fired_at_ms: now,
1835 status: "blocked_policy".to_string(),
1836 detail: Some(reason.clone()),
1837 })
1838 .await;
1839 state.event_bus.publish(EngineEvent::new(
1840 "routine.blocked",
1841 serde_json::json!({
1842 "routineID": plan.routine_id,
1843 "runID": run.run_id,
1844 "runCount": plan.run_count,
1845 "triggerType": "scheduled",
1846 "reason": reason,
1847 }),
1848 ));
1849 state.event_bus.publish(EngineEvent::new(
1850 "routine.run.created",
1851 serde_json::json!({
1852 "run": run,
1853 }),
1854 ));
1855 }
1856 }
1857 }
1858 }
1859}
1860
/// Background task that executes queued routine runs.
///
/// Each iteration sleeps one second, then claims at most one queued run (oldest first).
/// It executes the run in a freshly created session:
/// 1. publishes `routine.run.started`;
/// 2. saves a new session rooted at the workspace;
/// 3. installs the run's tool allow-list;
/// 4. resolves a model;
/// 5. sends the routine prompt through the engine loop.
///
/// The run is then marked `Completed` or `Failed` and the matching event is published.
pub async fn run_routine_executor(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        // NOTE(review): at most one run is started per tick, even when more are queued.
        let Some(run) = state.claim_next_queued_routine_run().await else {
            continue;
        };

        state.event_bus.publish(EngineEvent::new(
            "routine.run.started",
            serde_json::json!({
                "runID": run.run_id,
                "routineID": run.routine_id,
                "triggerType": run.trigger_type,
                "startedAtMs": now_ms(),
            }),
        ));

        // Every run gets its own session, titled after the routine, in the current workspace.
        let workspace_root = state.workspace_index.snapshot().await.root;
        let mut session = Session::new(
            Some(format!("Routine {}", run.routine_id)),
            Some(workspace_root.clone()),
        );
        let session_id = session.id.clone();
        session.workspace_root = Some(workspace_root);

        if let Err(error) = state.storage.save_session(session).await {
            // Without a session the run cannot execute; fail it immediately.
            // NOTE(review): this failure event omits `sessionID`/`finishedAtMs`, unlike
            // the prompt-failure event below.
            let detail = format!("failed to create routine session: {error}");
            let _ = state
                .update_routine_run_status(
                    &run.run_id,
                    RoutineRunStatus::Failed,
                    Some(detail.clone()),
                )
                .await;
            state.event_bus.publish(EngineEvent::new(
                "routine.run.failed",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "reason": detail,
                }),
            ));
            continue;
        }

        // Restrict the session to the run's allowed tools, both in the app-level policy
        // map and in the engine loop; both are cleared once the prompt finishes.
        state
            .set_routine_session_policy(
                session_id.clone(),
                run.run_id.clone(),
                run.routine_id.clone(),
                run.allowed_tools.clone(),
            )
            .await;
        state
            .engine_loop
            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
            .await;

        // The model comes from the routine's model policy when its model is in the
        // provider catalog; otherwise the first catalog model is used.
        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
        if let Some(spec) = selected_model.as_ref() {
            state.event_bus.publish(EngineEvent::new(
                "routine.run.model_selected",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "providerID": spec.provider_id,
                    "modelID": spec.model_id,
                    "source": model_source,
                }),
            ));
        }

        let request = SendMessageRequest {
            parts: vec![MessagePartInput::Text {
                text: build_routine_prompt(&state, &run).await,
            }],
            model: selected_model,
            agent: None,
        };

        let run_result = state
            .engine_loop
            .run_prompt_async_with_context(
                session_id.clone(),
                request,
                Some(format!("routine:{}", run.run_id)),
            )
            .await;

        // Tear down the per-session tool restrictions regardless of outcome.
        state.clear_routine_session_policy(&session_id).await;
        state
            .engine_loop
            .clear_session_allowed_tools(&session_id)
            .await;

        match run_result {
            Ok(()) => {
                // Configured output targets are recorded as artifacts before completion.
                append_configured_output_artifacts(&state, &run).await;
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Completed,
                        Some("routine run completed".to_string()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.completed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
            Err(error) => {
                // Error text is capped at 500 bytes before being stored and published.
                let detail = truncate_text(&error.to_string(), 500);
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Failed,
                        Some(detail.clone()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.failed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "reason": detail,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
        }
    }
}
1999
2000async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
2001 let normalized_entrypoint = run.entrypoint.trim();
2002 let known_tool = state
2003 .tools
2004 .list()
2005 .await
2006 .into_iter()
2007 .any(|schema| schema.name == normalized_entrypoint);
2008 if known_tool {
2009 let args = if run.args.is_object() {
2010 run.args.clone()
2011 } else {
2012 serde_json::json!({})
2013 };
2014 return format!("/tool {} {}", normalized_entrypoint, args);
2015 }
2016
2017 if let Some(objective) = routine_objective_from_args(run) {
2018 return build_routine_mission_prompt(run, &objective);
2019 }
2020
2021 format!(
2022 "Execute routine '{}' using entrypoint '{}' with args: {}",
2023 run.routine_id, run.entrypoint, run.args
2024 )
2025}
2026
2027fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
2028 run.args
2029 .get("prompt")
2030 .and_then(|v| v.as_str())
2031 .map(str::trim)
2032 .filter(|v| !v.is_empty())
2033 .map(ToString::to_string)
2034}
2035
2036fn routine_mode_from_args(args: &Value) -> &str {
2037 args.get("mode")
2038 .and_then(|v| v.as_str())
2039 .map(str::trim)
2040 .filter(|v| !v.is_empty())
2041 .unwrap_or("standalone")
2042}
2043
2044fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
2045 args.get("success_criteria")
2046 .and_then(|v| v.as_array())
2047 .map(|rows| {
2048 rows.iter()
2049 .filter_map(|row| row.as_str())
2050 .map(str::trim)
2051 .filter(|row| !row.is_empty())
2052 .map(ToString::to_string)
2053 .collect::<Vec<_>>()
2054 })
2055 .unwrap_or_default()
2056}
2057
2058fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
2059 let mode = routine_mode_from_args(&run.args);
2060 let success_criteria = routine_success_criteria_from_args(&run.args);
2061 let orchestrator_only_tool_calls = run
2062 .args
2063 .get("orchestrator_only_tool_calls")
2064 .and_then(|v| v.as_bool())
2065 .unwrap_or(false);
2066
2067 let mut lines = vec![
2068 format!("Automation ID: {}", run.routine_id),
2069 format!("Run ID: {}", run.run_id),
2070 format!("Mode: {}", mode),
2071 format!("Mission Objective: {}", objective),
2072 ];
2073
2074 if !success_criteria.is_empty() {
2075 lines.push("Success Criteria:".to_string());
2076 for criterion in success_criteria {
2077 lines.push(format!("- {}", criterion));
2078 }
2079 }
2080
2081 if run.allowed_tools.is_empty() {
2082 lines.push("Allowed Tools: all available by current policy".to_string());
2083 } else {
2084 lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
2085 }
2086
2087 if run.output_targets.is_empty() {
2088 lines.push("Output Targets: none configured".to_string());
2089 } else {
2090 lines.push("Output Targets:".to_string());
2091 for target in &run.output_targets {
2092 lines.push(format!("- {}", target));
2093 }
2094 }
2095
2096 if mode.eq_ignore_ascii_case("orchestrated") {
2097 lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
2098 lines
2099 .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
2100 if orchestrator_only_tool_calls {
2101 lines.push(
2102 "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
2103 .to_string(),
2104 );
2105 }
2106 } else {
2107 lines.push("Execution Pattern: Standalone mission run".to_string());
2108 }
2109
2110 lines.push(
2111 "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
2112 .to_string(),
2113 );
2114
2115 lines.join("\n")
2116}
2117
/// Truncates `input` to at most `max_len` bytes and appends `...<truncated>` when cut.
///
/// If `max_len` falls inside a multi-byte UTF-8 character, the cut backs off to the
/// previous character boundary. The previous direct byte slice panicked on non-ASCII
/// text, e.g. error messages containing accented characters.
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";
    if input.len() <= max_len {
        return input.to_string();
    }
    let mut cut = max_len;
    // Index 0 is always a char boundary, so this loop terminates.
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
2126
2127async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
2128 if run.output_targets.is_empty() {
2129 return;
2130 }
2131 for target in &run.output_targets {
2132 let artifact = RoutineRunArtifact {
2133 artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
2134 uri: target.clone(),
2135 kind: "output_target".to_string(),
2136 label: Some("configured output target".to_string()),
2137 created_at_ms: now_ms(),
2138 metadata: Some(serde_json::json!({
2139 "source": "routine.output_targets",
2140 "runID": run.run_id,
2141 "routineID": run.routine_id,
2142 })),
2143 };
2144 let _ = state
2145 .append_routine_run_artifact(&run.run_id, artifact.clone())
2146 .await;
2147 state.event_bus.publish(EngineEvent::new(
2148 "routine.run.artifact_added",
2149 serde_json::json!({
2150 "runID": run.run_id,
2151 "routineID": run.routine_id,
2152 "artifact": artifact,
2153 }),
2154 ));
2155 }
2156}
2157
2158fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
2159 let obj = value.as_object()?;
2160 let provider_id = obj.get("provider_id")?.as_str()?.trim();
2161 let model_id = obj.get("model_id")?.as_str()?.trim();
2162 if provider_id.is_empty() || model_id.is_empty() {
2163 return None;
2164 }
2165 Some(ModelSpec {
2166 provider_id: provider_id.to_string(),
2167 model_id: model_id.to_string(),
2168 })
2169}
2170
2171fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
2172 args.get("model_policy")
2173 .and_then(|v| v.get("role_models"))
2174 .and_then(|v| v.get(role))
2175 .and_then(parse_model_spec)
2176}
2177
2178fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
2179 args.get("model_policy")
2180 .and_then(|v| v.get("default_model"))
2181 .and_then(parse_model_spec)
2182}
2183
2184fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
2185 providers.iter().any(|provider| {
2186 provider.id == spec.provider_id
2187 && provider
2188 .models
2189 .iter()
2190 .any(|model| model.id == spec.model_id)
2191 })
2192}
2193
2194async fn resolve_routine_model_spec_for_run(
2195 state: &AppState,
2196 run: &RoutineRunRecord,
2197) -> (Option<ModelSpec>, String) {
2198 let providers = state.providers.list().await;
2199 let mode = routine_mode_from_args(&run.args);
2200 let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
2201
2202 if mode.eq_ignore_ascii_case("orchestrated") {
2203 if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
2204 requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
2205 }
2206 }
2207 if let Some(default_model) = default_model_spec_from_args(&run.args) {
2208 requested.push((default_model, "args.model_policy.default_model"));
2209 }
2210
2211 for (candidate, source) in requested {
2212 if provider_catalog_has_model(&providers, &candidate) {
2213 return (Some(candidate), source.to_string());
2214 }
2215 }
2216
2217 let fallback = providers
2218 .into_iter()
2219 .find(|provider| !provider.models.is_empty())
2220 .and_then(|provider| {
2221 let model = provider.models.first()?;
2222 Some(ModelSpec {
2223 provider_id: provider.id,
2224 model_id: model.id.clone(),
2225 })
2226 });
2227
2228 (fallback, "provider_catalog_fallback".to_string())
2229}
2230
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh `AppState` whose shared-resource store is at `path`.
    ///
    /// The routine, routine-history and routine-run stores each point at a unique temp file.
    fn test_state_with_path(path: PathBuf) -> AppState {
        let mut state = AppState::new_starting("test-attempt".to_string(), true);
        state.shared_resources_path = path;
        state.routines_path = tmp_routines_file("shared-state");
        state.routine_history_path = tmp_routines_file("routine-history");
        state.routine_runs_path = tmp_routines_file("routine-runs");
        state
    }

    /// Returns a unique (UUID-suffixed) temp-dir path for a shared-resource JSON file.
    fn tmp_resource_file(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "tandem-server-{name}-{}.json",
            uuid::Uuid::new_v4()
        ))
    }

    /// Returns a unique (UUID-suffixed) temp-dir path for a routine-related JSON file.
    fn tmp_routines_file(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "tandem-server-routines-{name}-{}.json",
            uuid::Uuid::new_v4()
        ))
    }

    /// Builds the active, 60-second-interval routine shared by the execution-policy tests.
    ///
    /// Only the fields that the policy tests vary are parameters. Approval is always required.
    fn policy_test_routine(
        routine_id: &str,
        name: &str,
        entrypoint: &str,
        external_integrations_allowed: bool,
    ) -> RoutineSpec {
        RoutineSpec {
            routine_id: routine_id.to_string(),
            name: name.to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: entrypoint.to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        }
    }

    #[tokio::test]
    async fn shared_resource_put_increments_revision() {
        let path = tmp_resource_file("shared-resource-put");
        let state = test_state_with_path(path.clone());

        let first = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"todo"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("first put");
        assert_eq!(first.rev, 1);

        let second = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"doing"}),
                Some(1),
                "agent-2".to_string(),
                Some(60_000),
            )
            .await
            .expect("second put");
        assert_eq!(second.rev, 2);
        assert_eq!(second.updated_by, "agent-2");
        assert_eq!(second.ttl_ms, Some(60_000));

        let raw = tokio::fs::read_to_string(&path).await.expect("persisted");
        assert!(raw.contains("\"rev\": 2"));
        let _ = tokio::fs::remove_file(path).await;
    }

    #[tokio::test]
    async fn shared_resource_put_detects_revision_conflict() {
        let path = tmp_resource_file("shared-resource-conflict");
        let state = test_state_with_path(path.clone());

        let _ = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("seed put");

        let conflict = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1 edited"}),
                Some(99),
                "agent-2".to_string(),
                None,
            )
            .await
            .expect_err("expected conflict");

        match conflict {
            ResourceStoreError::RevisionConflict(conflict) => {
                assert_eq!(conflict.expected_rev, Some(99));
                assert_eq!(conflict.current_rev, Some(1));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        let _ = tokio::fs::remove_file(path).await;
    }

    #[tokio::test]
    async fn shared_resource_rejects_invalid_namespace_key() {
        let path = tmp_resource_file("shared-resource-invalid-key");
        let state = test_state_with_path(path.clone());

        let error = state
            .put_shared_resource(
                "global/demo/key".to_string(),
                serde_json::json!({"x":1}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect_err("invalid key should fail");

        match error {
            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
            other => panic!("unexpected error: {other:?}"),
        }

        // A rejected key must not leave a store file behind.
        assert!(!path.exists());
    }

    #[test]
    fn derive_status_index_update_for_run_started() {
        let event = EngineEvent::new(
            "session.run.started",
            serde_json::json!({
                "sessionID": "s-1",
                "runID": "r-1"
            }),
        );
        let update = derive_status_index_update(&event).expect("update");
        assert_eq!(update.key, "run/s-1/status");
        assert_eq!(
            update.value.get("state").and_then(|v| v.as_str()),
            Some("running")
        );
        assert_eq!(
            update.value.get("phase").and_then(|v| v.as_str()),
            Some("run")
        );
    }

    #[test]
    fn derive_status_index_update_for_tool_invocation() {
        let event = EngineEvent::new(
            "message.part.updated",
            serde_json::json!({
                "sessionID": "s-2",
                "runID": "r-2",
                "part": { "type": "tool-invocation", "tool": "todo_write" }
            }),
        );
        let update = derive_status_index_update(&event).expect("update");
        assert_eq!(update.key, "run/s-2/status");
        assert_eq!(
            update.value.get("phase").and_then(|v| v.as_str()),
            Some("tool")
        );
        assert_eq!(
            update.value.get("toolActive").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            update.value.get("tool").and_then(|v| v.as_str()),
            Some("todo_write")
        );
    }

    #[test]
    fn misfire_skip_drops_runs_and_advances_next_fire() {
        let (count, next_fire) =
            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
        assert_eq!(count, 0);
        assert_eq!(next_fire, 11_000);
    }

    #[test]
    fn misfire_run_once_emits_single_trigger() {
        let (count, next_fire) =
            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
        assert_eq!(count, 1);
        assert_eq!(next_fire, 11_000);
    }

    #[test]
    fn misfire_catch_up_caps_trigger_count() {
        let (count, next_fire) = compute_misfire_plan(
            25_000,
            5_000,
            1_000,
            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
        );
        assert_eq!(count, 3);
        assert_eq!(next_fire, 26_000);
    }

    #[tokio::test]
    async fn routine_put_persists_and_loads() {
        let routines_path = tmp_routines_file("persist-load");
        let mut state = AppState::new_starting("routines-put".to_string(), true);
        state.routines_path = routines_path.clone();

        let routine = RoutineSpec {
            routine_id: "routine-1".to_string(),
            name: "Digest".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({"topic":"status"}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "user-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state.put_routine(routine).await.expect("store routine");

        // A second, independent state reading the same file must see the stored routine.
        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
        reloaded.routines_path = routines_path.clone();
        reloaded.load_routines().await.expect("load routines");
        let list = reloaded.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
    }

    #[tokio::test]
    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
        let routines_path = tmp_routines_file("misfire-eval");
        let mut state = AppState::new_starting("routines-eval".to_string(), true);
        state.routines_path = routines_path.clone();

        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
            routine_id: id.to_string(),
            name: id.to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
            timezone: "UTC".to_string(),
            misfire_policy: policy,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: false,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state
            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
            .await
            .expect("put skip");
        state
            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
            .await
            .expect("put once");
        state
            .put_routine(base(
                "routine-catch",
                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
            ))
            .await
            .expect("put catch");

        let plans = state.evaluate_routine_misfires(10_500).await;
        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");

        assert!(plan_skip.is_none());
        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));

        let stored = state.list_routines().await;
        let skip_next = stored
            .iter()
            .find(|r| r.routine_id == "routine-skip")
            .and_then(|r| r.next_fire_at_ms)
            .expect("skip next");
        assert!(skip_next > 10_500);

        let _ = tokio::fs::remove_file(routines_path).await;
    }

    #[test]
    fn routine_policy_blocks_external_side_effects_by_default() {
        let routine = policy_test_routine(
            "routine-policy-1",
            "Connector routine",
            "connector.email.reply",
            false,
        );

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
    }

    #[test]
    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
        let routine = policy_test_routine(
            "routine-policy-2",
            "Connector routine",
            "connector.email.reply",
            true,
        );

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(
            decision,
            RoutineExecutionDecision::RequiresApproval { .. }
        ));
    }

    #[test]
    fn routine_policy_allows_non_external_entrypoints() {
        let routine = policy_test_routine(
            "routine-policy-3",
            "Internal mission routine",
            "mission.default",
            false,
        );

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert_eq!(decision, RoutineExecutionDecision::Allowed);
    }

    #[tokio::test]
    async fn claim_next_queued_routine_run_marks_oldest_running() {
        let runs_path = tmp_routines_file("routine-claim-runs");
        let mut state = AppState::new_starting("routine-claim".to_string(), true);
        state.routine_runs_path = runs_path.clone();

        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
            run_id: run_id.to_string(),
            routine_id: "routine-claim".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms,
            updated_at_ms: created_at_ms,
            fired_at_ms: Some(created_at_ms),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: false,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            artifacts: vec![],
        };

        // Insert the later run first so the claim cannot rely on insertion order.
        {
            let mut guard = state.routine_runs.write().await;
            guard.insert("run-late".to_string(), mk("run-late", 2_000));
            guard.insert("run-early".to_string(), mk("run-early", 1_000));
        }
        state.persist_routine_runs().await.expect("persist");

        let claimed = state
            .claim_next_queued_routine_run()
            .await
            .expect("claimed run");
        assert_eq!(claimed.run_id, "run-early");
        assert_eq!(claimed.status, RoutineRunStatus::Running);
        assert!(claimed.started_at_ms.is_some());

        // The runs were persisted to a temp file above; remove it so tests do not leak files.
        let _ = tokio::fs::remove_file(runs_path).await;
    }

    #[tokio::test]
    async fn routine_session_policy_roundtrip_normalizes_tools() {
        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
        state
            .set_routine_session_policy(
                "session-routine-1".to_string(),
                "run-1".to_string(),
                "routine-1".to_string(),
                vec![
                    "read".to_string(),
                    " mcp.arcade.search ".to_string(),
                    "read".to_string(),
                    "".to_string(),
                ],
            )
            .await;

        let policy = state
            .routine_session_policy("session-routine-1")
            .await
            .expect("policy");
        // Expect whitespace trimmed, duplicates and empty entries dropped, first-seen order kept.
        assert_eq!(
            policy.allowed_tools,
            vec!["read".to_string(), "mcp.arcade.search".to_string()]
        );
    }

    #[test]
    fn routine_mission_prompt_includes_orchestrated_contract() {
        let run = RoutineRunRecord {
            run_id: "run-orchestrated-1".to_string(),
            routine_id: "automation-orchestrated".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms: 1_000,
            updated_at_ms: 1_000,
            fired_at_ms: Some(1_000),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: true,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({
                "prompt": "Coordinate a multi-step release readiness check.",
                "mode": "orchestrated",
                "success_criteria": ["All blockers listed", "Output artifact written"],
                "orchestrator_only_tool_calls": true
            }),
            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
            output_targets: vec!["file://reports/release-readiness.md".to_string()],
            artifacts: vec![],
        };

        let objective = routine_objective_from_args(&run).expect("objective");
        let prompt = build_routine_mission_prompt(&run, &objective);

        assert!(prompt.contains("Mode: orchestrated"));
        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
        assert!(prompt.contains("only the orchestrator may execute tools"));
        assert!(prompt.contains("Allowed Tools: read, webfetch"));
        assert!(prompt.contains("file://reports/release-readiness.md"));
    }

    #[test]
    fn routine_mission_prompt_includes_standalone_defaults() {
        let run = RoutineRunRecord {
            run_id: "run-standalone-1".to_string(),
            routine_id: "automation-standalone".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms: 2_000,
            updated_at_ms: 2_000,
            fired_at_ms: Some(2_000),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: false,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({
                "prompt": "Summarize top engineering updates.",
                "success_criteria": ["Three bullet summary"]
            }),
            allowed_tools: vec![],
            output_targets: vec![],
            artifacts: vec![],
        };

        let objective = routine_objective_from_args(&run).expect("objective");
        let prompt = build_routine_mission_prompt(&run, &objective);

        assert!(prompt.contains("Mode: standalone"));
        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
        assert!(prompt.contains("Allowed Tools: all available by current policy"));
        assert!(prompt.contains("Output Targets: none configured"));
    }
}