1#![recursion_limit = "256"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
12use tandem_orchestrator::MissionState;
13use tandem_types::{
14 EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
15 SendMessageRequest, Session, ShellFamily,
16};
17use tokio::fs;
18use tokio::sync::RwLock;
19
20use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
21use tandem_core::{
22 AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus, PermissionManager,
23 PluginRegistry, Storage,
24};
25use tandem_providers::ProviderRegistry;
26use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
27use tandem_tools::ToolRegistry;
28
29mod agent_teams;
30mod http;
31pub mod webui;
32
33pub use agent_teams::AgentTeamRuntime;
34pub use http::serve;
35
/// Status snapshot for one chat-channel integration.
///
/// `AppState::restart_channel_listeners` creates one entry per channel ("telegram",
/// "discord", "slack"). `AppState::channel_statuses` returns a copy of the current map.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// Whether the channel has a configuration section.
    pub enabled: bool,
    /// Set to `true` for enabled channels by `restart_channel_listeners` once listeners
    /// have been started.
    pub connected: bool,
    /// Most recent error, if any (reset to `None` on restart).
    pub last_error: Option<String>,
    /// Number of active sessions (reset to `0` on restart).
    pub active_sessions: u64,
    /// Free-form extra data; initialized to an empty JSON object on restart.
    pub meta: Value,
}
44
45#[derive(Debug, Clone, Serialize, Deserialize, Default)]
46pub struct WebUiConfig {
47 #[serde(default)]
48 pub enabled: bool,
49 #[serde(default = "default_web_ui_prefix")]
50 pub path_prefix: String,
51}
52
/// The `channels` section of the app configuration.
///
/// `restart_channel_listeners` treats a channel as enabled when its sub-section is
/// present (`Some`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    /// Telegram bot settings, if configured.
    pub telegram: Option<TelegramConfigFile>,
    /// Discord bot settings, if configured.
    pub discord: Option<DiscordConfigFile>,
    /// Slack bot settings, if configured.
    pub slack: Option<SlackConfigFile>,
    /// Tool policy applied to channel sessions; falls back to the type's `Default`
    /// when omitted.
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}
61
/// Telegram channel configuration as stored in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    /// Bot API token (required).
    pub bot_token: String,
    /// Users allowed to interact with the bot. When omitted it comes from
    /// `default_allow_all()` (defined outside this view; presumably an allow-all
    /// wildcard — confirm).
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Respond only when mentioned; defaults to `false`.
    #[serde(default)]
    pub mention_only: bool,
}
70
/// Discord channel configuration as stored in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    /// Bot token (required).
    pub bot_token: String,
    /// Optional guild (server) id; `None` when omitted.
    #[serde(default)]
    pub guild_id: Option<String>,
    /// Users allowed to interact with the bot; defaults to `default_allow_all()`.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Respond only when mentioned; the default comes from
    /// `default_discord_mention_only()` (defined outside this view).
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}
81
/// Slack channel configuration as stored in the config file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    /// Bot token (required).
    pub bot_token: String,
    /// Slack channel id the bot operates in (required).
    pub channel_id: String,
    /// Users allowed to interact with the bot; defaults to `default_allow_all()`.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
}
89
/// The subset of the effective engine configuration that this crate reads.
///
/// Every section is `#[serde(default)]`, so a missing section falls back to its
/// type's `Default`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    /// Chat-channel integrations.
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    /// Embedded web UI settings.
    #[serde(default)]
    pub web_ui: WebUiConfig,
    /// Memory consolidation settings (consumed outside this view).
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
99
/// Mutable state of the chat-channel subsystem, guarded by
/// `AppState::channels_runtime`.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Running listener tasks; `None` when no listeners are active.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Per-channel status, keyed by channel name ("telegram", "discord", "slack").
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
105
/// A time-limited lease on the engine held by a client.
///
/// The lease stays valid as long as it is renewed at least every `ttl_ms`
/// milliseconds. All timestamps are milliseconds since the Unix epoch.
#[derive(Debug, Clone)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Returns `true` once strictly more than `ttl_ms` has passed since the last renewal.
    ///
    /// A `now_ms` earlier than the last renewal (for example after a clock step
    /// backwards) counts as zero elapsed time, so the lease is not expired.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let idle_ms = now_ms.saturating_sub(self.last_renewed_at_ms);
        self.ttl_ms < idle_ms
    }
}
121
/// The run currently active for a session, as tracked by `RunRegistry`.
///
/// Serialized with camelCase keys (`runID`, `startedAtMs`, ...). Optional fields
/// are omitted when `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    /// Caller-supplied run identifier.
    #[serde(rename = "runID")]
    pub run_id: String,
    /// When the run was acquired (ms since epoch).
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// Last activity time (ms since epoch); refreshed by `RunRegistry::touch` and used
    /// by `RunRegistry::reap_stale`.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
137
/// Registry of active runs, allowing at most one active run per session id.
///
/// The map sits behind an `Arc`, so clones of the registry share the same state.
#[derive(Clone, Default)]
pub struct RunRegistry {
    /// Active runs keyed by session id.
    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
}
142
143impl RunRegistry {
144 pub fn new() -> Self {
145 Self::default()
146 }
147
148 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
149 self.active.read().await.get(session_id).cloned()
150 }
151
152 pub async fn acquire(
153 &self,
154 session_id: &str,
155 run_id: String,
156 client_id: Option<String>,
157 agent_id: Option<String>,
158 agent_profile: Option<String>,
159 ) -> std::result::Result<ActiveRun, ActiveRun> {
160 let mut guard = self.active.write().await;
161 if let Some(existing) = guard.get(session_id).cloned() {
162 return Err(existing);
163 }
164 let now = now_ms();
165 let run = ActiveRun {
166 run_id,
167 started_at_ms: now,
168 last_activity_at_ms: now,
169 client_id,
170 agent_id,
171 agent_profile,
172 };
173 guard.insert(session_id.to_string(), run.clone());
174 Ok(run)
175 }
176
177 pub async fn touch(&self, session_id: &str, run_id: &str) {
178 let mut guard = self.active.write().await;
179 if let Some(run) = guard.get_mut(session_id) {
180 if run.run_id == run_id {
181 run.last_activity_at_ms = now_ms();
182 }
183 }
184 }
185
186 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
187 let mut guard = self.active.write().await;
188 if let Some(run) = guard.get(session_id) {
189 if run.run_id == run_id {
190 return guard.remove(session_id);
191 }
192 }
193 None
194 }
195
196 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
197 self.active.write().await.remove(session_id)
198 }
199
200 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
201 let now = now_ms();
202 let mut guard = self.active.write().await;
203 let stale_ids = guard
204 .iter()
205 .filter_map(|(session_id, run)| {
206 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
207 Some(session_id.clone())
208 } else {
209 None
210 }
211 })
212 .collect::<Vec<_>>();
213 let mut out = Vec::with_capacity(stale_ids.len());
214 for session_id in stale_ids {
215 if let Some(run) = guard.remove(&session_id) {
216 out.push((session_id, run));
217 }
218 }
219 out
220 }
221}
222
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
229
230pub fn build_id() -> String {
231 if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
232 let trimmed = explicit.trim();
233 if !trimmed.is_empty() {
234 return trimmed.to_string();
235 }
236 }
237 if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
238 let trimmed = git_sha.trim();
239 if !trimmed.is_empty() {
240 return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
241 }
242 }
243 env!("CARGO_PKG_VERSION").to_string()
244}
245
246pub fn detect_host_runtime_context() -> HostRuntimeContext {
247 let os = if cfg!(target_os = "windows") {
248 HostOs::Windows
249 } else if cfg!(target_os = "macos") {
250 HostOs::Macos
251 } else {
252 HostOs::Linux
253 };
254 let (shell_family, path_style) = match os {
255 HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
256 HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
257 };
258 HostRuntimeContext {
259 os,
260 arch: std::env::consts::ARCH.to_string(),
261 shell_family,
262 path_style,
263 }
264}
265
/// Path of the running executable, for health reporting.
///
/// It is only exposed in debug builds (`debug_assertions`); release builds always
/// return `None`. The path is converted lossily to UTF-8, and `None` is also returned
/// if the executable path cannot be determined.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        std::env::current_exe()
            .ok()
            .map(|exe| exe.to_string_lossy().into_owned())
    } else {
        None
    }
}
278
/// Engine subsystems that exist once startup has completed.
///
/// `AppState::mark_ready` installs an instance into `AppState::runtime`. `storage`,
/// `auth` and `logs` are `Arc`-wrapped, so clones share them. How the other fields
/// behave when cloned depends on their own `Clone` impls (defined in other crates).
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    /// String-to-string auth map. NOTE(review): key/value meaning is not visible here.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// In-memory log entries as JSON values.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    /// Host context; `AppState::host_runtime_context` prefers this once ready.
    pub host_runtime_context: HostRuntimeContext,
}
299
/// A memory entry held in `AppState::memory_records`, tagged with its governance
/// metadata (partition, content kind, classification).
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    /// Run that produced this record.
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    /// References to related artifacts. NOTE(review): format (URI vs id) not visible here.
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    pub metadata: Option<Value>,
    /// Id of the record this one was derived from, if any (presumably set on
    /// promotion/copy — confirm).
    pub source_memory_id: Option<String>,
    /// Creation time, ms since epoch.
    pub created_at_ms: u64,
}
313
/// One entry in `AppState::memory_audit_log`, recording an action taken on
/// governed memory.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    /// Action name. NOTE(review): the set of values is defined by callers outside this view.
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    /// Destination tier for tier-changing actions, if any.
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    /// Optional free-text detail; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Event time, ms since epoch.
    pub created_at_ms: u64,
}
329
/// A versioned key/value entry in the shared-resource store
/// (`AppState::shared_resources`, persisted to `shared_resources_path`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    /// Resource key; must pass `is_valid_resource_key` on write.
    pub key: String,
    /// Arbitrary JSON payload.
    pub value: Value,
    /// Revision: `1` on first write, then incremented by each successful
    /// `put_shared_resource`. Used for `if_match_rev` preconditions.
    pub rev: u64,
    /// Time of the last write, ms since epoch.
    pub updated_at_ms: u64,
    /// Identifier of the last writer, as supplied by the caller.
    pub updated_by: String,
    /// Optional time-to-live in ms, stored as given. NOTE(review): expiry enforcement
    /// is not visible in this part of the file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
340
/// When a routine fires.
///
/// Externally tagged in JSON, e.g. `{"interval_seconds": {"seconds": 60}}` or
/// `{"cron": {"expression": "..."}}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    /// Fixed interval; `put_routine` rejects `seconds == 0`.
    IntervalSeconds { seconds: u64 },
    /// Cron expression (evaluated outside this view).
    Cron { expression: String },
}
347
/// What to do when one or more scheduled fire times have been missed.
///
/// Internally tagged by `type`, e.g. `{"type": "skip"}` or
/// `{"type": "catch_up", "max_runs": 3}`. The policy is interpreted by
/// `compute_misfire_plan` (outside this view).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    Skip,
    RunOnce,
    CatchUp { max_runs: u32 },
}
355
/// Lifecycle state of a routine definition (`"active"` / `"paused"` in JSON).
///
/// `evaluate_routine_misfires` only considers `Active` routines.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
362
/// A scheduled routine definition, stored in `AppState::routines` keyed by
/// `routine_id` and persisted to `routines_path`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    /// Unique id; `put_routine` rejects blank ids.
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    /// Timezone name. NOTE(review): format and use not visible here.
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    /// What the routine runs. NOTE(review): interpretation is outside this view.
    pub entrypoint: String,
    /// Arguments for the entrypoint; `null` when omitted.
    #[serde(default)]
    pub args: Value,
    /// Tools the routine may use; normalized by `put_routine`.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// Output destinations; normalized by `put_routine`.
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    /// Next scheduled fire time (ms since epoch). `put_routine` fills it in when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    /// Last fire time (ms since epoch), set by `mark_routine_fired`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
387
/// One history entry for a routine trigger, stored in `AppState::routine_history`
/// and persisted to `routine_history_path`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    /// How the routine was triggered. NOTE(review): value set defined outside this view.
    pub trigger_type: String,
    /// Number of runs this trigger represents.
    pub run_count: u32,
    /// Trigger time, ms since epoch.
    pub fired_at_ms: u64,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
398
/// State of an individual routine run; serialized in snake_case
/// (e.g. `"pending_approval"`, `"blocked_policy"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}
412
/// An artifact produced by a routine run (see `RoutineRunRecord::artifacts`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    /// Location of the artifact.
    pub uri: String,
    /// Artifact kind. NOTE(review): value set defined outside this view.
    pub kind: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    /// Creation time, ms since epoch.
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
424
/// Persistent record of one routine run, stored in `AppState::routine_runs` and
/// persisted to `routine_runs_path`.
///
/// All `*_at_ms` fields are ms since epoch. Optional fields are omitted from JSON
/// when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    /// Why approval is required, if applicable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    /// Why the run was denied, if applicable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    /// Why the run was paused, if applicable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // Snapshot of the routine's execution parameters for this run.
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
}
459
/// Tool allow-list tied to a session created for a routine run.
///
/// Stored in `AppState::routine_session_policies`. NOTE(review): presumably keyed by
/// `session_id` and consulted by the tool-policy hook — confirm.
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
467
/// A due trigger returned by `AppState::evaluate_routine_misfires`.
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    /// Number of runs to execute (always > 0 for returned plans).
    pub run_count: u32,
    /// The evaluation time passed to `evaluate_routine_misfires`.
    pub scheduled_at_ms: u64,
    /// The routine's updated next fire time.
    pub next_fire_at_ms: u64,
}
475
/// Details of a failed `if_match_rev` precondition on the shared-resource store.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    pub key: String,
    /// Revision the caller expected.
    pub expected_rev: Option<u64>,
    /// Revision actually stored (`None` if the key does not exist).
    pub current_rev: Option<u64>,
}
482
/// Errors from the shared-resource store. In JSON they are internally tagged by `type`
/// (`invalid_key`, `revision_conflict`, `persist_failed`).
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    /// The key was rejected by `is_valid_resource_key`.
    InvalidKey { key: String },
    /// The `if_match_rev` precondition did not hold.
    RevisionConflict(ResourceConflict),
    /// Writing the store to disk failed; the in-memory change was rolled back.
    PersistFailed { message: String },
}
490
/// Errors from the routine store. In JSON they are internally tagged by `type`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    /// The routine id was blank.
    InvalidRoutineId { routine_id: String },
    /// The schedule was rejected (e.g. a zero interval).
    InvalidSchedule { detail: String },
    /// Writing the store to disk failed; the in-memory change was rolled back.
    PersistFailed { message: String },
}
498
/// Coarse startup state of the server.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    /// Initial state set by `AppState::new_starting`.
    Starting,
    /// Set by `AppState::mark_ready`.
    Ready,
    /// Set by `AppState::mark_failed`.
    Failed,
}
505
/// Mutable startup progress, held in `AppState::startup`.
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Free-form phase label (`"boot"` initially, `"ready"` once ready).
    pub phase: String,
    /// When this startup attempt began, ms since epoch.
    pub started_at_ms: u64,
    pub attempt_id: String,
    /// Error recorded by `mark_failed`; cleared by `mark_ready`.
    pub last_error: Option<String>,
}
514
/// Point-in-time copy of `StartupState` plus elapsed time, as returned by
/// `AppState::startup_snapshot`.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    /// Milliseconds since `started_at_ms` at the time of the snapshot (saturating at 0).
    pub elapsed_ms: u64,
}
524
/// Shared server state.
///
/// Created in the `Starting` state by `AppState::new_starting`; the engine
/// `RuntimeState` is attached later by `AppState::mark_ready`. The struct derives
/// `Clone`, and its mutable state sits behind `Arc`s, so clones share the same stores.
#[derive(Clone)]
pub struct AppState {
    /// Engine runtime; empty until `mark_ready` succeeds (it can be set only once).
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup progress (status, phase, attempt id, last error).
    pub startup: Arc<RwLock<StartupState>>,
    /// `true` for in-process mode, `false` for sidecar mode (see `mode_label`).
    pub in_process_mode: Arc<AtomicBool>,
    /// Optional API token, read/written via `api_token` / `set_api_token`.
    pub api_token: Arc<RwLock<Option<String>>>,
    /// Engine leases held by clients. NOTE(review): key presumably `lease_id` — confirm.
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    /// Active runs, at most one per session.
    pub run_registry: RunRegistry,
    /// Idle threshold in ms from `resolve_run_stale_ms()`. NOTE(review): presumably
    /// passed to `RunRegistry::reap_stale` elsewhere.
    pub run_stale_ms: u64,
    /// Governed memory records. NOTE(review): key presumably the record id.
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    /// Audit trail of memory operations.
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    /// Mission states. NOTE(review): key presumably the mission id.
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// Shared-resource store keyed by resource key; persisted to `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    /// Routine definitions keyed by `routine_id`; persisted to `routines_path`.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    /// Routine history; persisted to `routine_history_path`. NOTE(review): key
    /// presumably the routine id.
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    /// Routine run records; persisted to `routine_runs_path`. NOTE(review): key
    /// presumably the run id.
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    /// Per-session tool policies for routine runs. NOTE(review): key presumably the
    /// session id.
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    /// Agent-team runtime.
    pub agent_teams: AgentTeamRuntime,
    // Web UI settings use `std::sync` primitives so the synchronous getters can read them.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    /// Base URL of this server (defaults to `http://127.0.0.1:39731`).
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    /// Chat-channel listeners and their statuses.
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host context detected at construction; `host_runtime_context()` falls back to
    /// it until the runtime is ready.
    pub host_runtime_context: HostRuntimeContext,
}
554
/// A single key/value update for a status index.
///
/// NOTE(review): not referenced in the visible part of this file; its consumer lives
/// elsewhere in the crate.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
560
561impl AppState {
562 pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
563 Self {
564 runtime: Arc::new(OnceLock::new()),
565 startup: Arc::new(RwLock::new(StartupState {
566 status: StartupStatus::Starting,
567 phase: "boot".to_string(),
568 started_at_ms: now_ms(),
569 attempt_id,
570 last_error: None,
571 })),
572 in_process_mode: Arc::new(AtomicBool::new(in_process)),
573 api_token: Arc::new(RwLock::new(None)),
574 engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
575 run_registry: RunRegistry::new(),
576 run_stale_ms: resolve_run_stale_ms(),
577 memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
578 memory_audit_log: Arc::new(RwLock::new(Vec::new())),
579 missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
580 shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
581 shared_resources_path: resolve_shared_resources_path(),
582 routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
583 routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
584 routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
585 routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
586 routines_path: resolve_routines_path(),
587 routine_history_path: resolve_routine_history_path(),
588 routine_runs_path: resolve_routine_runs_path(),
589 agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
590 web_ui_enabled: Arc::new(AtomicBool::new(false)),
591 web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
592 server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
593 channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
594 host_runtime_context: detect_host_runtime_context(),
595 }
596 }
597
598 pub fn is_ready(&self) -> bool {
599 self.runtime.get().is_some()
600 }
601
602 pub fn mode_label(&self) -> &'static str {
603 if self.in_process_mode.load(Ordering::Relaxed) {
604 "in-process"
605 } else {
606 "sidecar"
607 }
608 }
609
610 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
611 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
612 if let Ok(mut guard) = self.web_ui_prefix.write() {
613 *guard = normalize_web_ui_prefix(&prefix);
614 }
615 }
616
617 pub fn web_ui_enabled(&self) -> bool {
618 self.web_ui_enabled.load(Ordering::Relaxed)
619 }
620
621 pub fn web_ui_prefix(&self) -> String {
622 self.web_ui_prefix
623 .read()
624 .map(|v| v.clone())
625 .unwrap_or_else(|_| "/admin".to_string())
626 }
627
628 pub fn set_server_base_url(&self, base_url: String) {
629 if let Ok(mut guard) = self.server_base_url.write() {
630 *guard = base_url;
631 }
632 }
633
634 pub fn server_base_url(&self) -> String {
635 self.server_base_url
636 .read()
637 .map(|v| v.clone())
638 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
639 }
640
641 pub async fn api_token(&self) -> Option<String> {
642 self.api_token.read().await.clone()
643 }
644
645 pub async fn set_api_token(&self, token: Option<String>) {
646 *self.api_token.write().await = token;
647 }
648
649 pub async fn startup_snapshot(&self) -> StartupSnapshot {
650 let state = self.startup.read().await.clone();
651 StartupSnapshot {
652 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
653 status: state.status,
654 phase: state.phase,
655 started_at_ms: state.started_at_ms,
656 attempt_id: state.attempt_id,
657 last_error: state.last_error,
658 }
659 }
660
661 pub fn host_runtime_context(&self) -> HostRuntimeContext {
662 self.runtime
663 .get()
664 .map(|runtime| runtime.host_runtime_context.clone())
665 .unwrap_or_else(|| self.host_runtime_context.clone())
666 }
667
668 pub async fn set_phase(&self, phase: impl Into<String>) {
669 let mut startup = self.startup.write().await;
670 startup.phase = phase.into();
671 }
672
/// Installs the fully built runtime and moves startup to `Ready`.
///
/// Steps, in order:
/// 1. store `runtime` in the `OnceLock` (fails if a runtime is already set);
/// 2. register the agent-team spawn hook and tool-policy hook on the engine loop;
/// 3. best-effort load of persisted shared resources, routines, routine history,
///    routine runs and agent-team state for the workspace root (errors are discarded);
/// 4. set status `Ready`, phase `"ready"`, and clear `last_error`.
///
/// # Errors
/// Returns an error only when the runtime was already initialized.
pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
    self.runtime
        .set(runtime)
        .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
    // NOTE(review): `engine_loop` and `workspace_index` are not `AppState` fields. They
    // presumably resolve through a `Deref` impl (outside this view) onto the runtime
    // stored just above, so the `set` must come first.
    self.engine_loop
        .set_spawn_agent_hook(std::sync::Arc::new(
            crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
        ))
        .await;
    self.engine_loop
        .set_tool_policy_hook(std::sync::Arc::new(
            crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
        ))
        .await;
    // Persisted state is loaded best-effort: failures are ignored so startup still completes.
    let _ = self.load_shared_resources().await;
    let _ = self.load_routines().await;
    let _ = self.load_routine_history().await;
    let _ = self.load_routine_runs().await;
    let workspace_root = self.workspace_index.snapshot().await.root;
    let _ = self
        .agent_teams
        .ensure_loaded_for_workspace(&workspace_root)
        .await;
    let mut startup = self.startup.write().await;
    startup.status = StartupStatus::Ready;
    startup.phase = "ready".to_string();
    startup.last_error = None;
    Ok(())
}
702
703 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
704 let mut startup = self.startup.write().await;
705 startup.status = StartupStatus::Failed;
706 startup.phase = phase.into();
707 startup.last_error = Some(error.into());
708 }
709
710 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
711 let runtime = self.channels_runtime.lock().await;
712 runtime.statuses.clone()
713 }
714
715 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
716 let effective = self.config.get_effective_value().await;
717 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
718 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
719
720 let mut runtime = self.channels_runtime.lock().await;
721 if let Some(listeners) = runtime.listeners.as_mut() {
722 listeners.abort_all();
723 }
724 runtime.listeners = None;
725 runtime.statuses.clear();
726
727 let mut status_map = std::collections::HashMap::new();
728 status_map.insert(
729 "telegram".to_string(),
730 ChannelStatus {
731 enabled: parsed.channels.telegram.is_some(),
732 connected: false,
733 last_error: None,
734 active_sessions: 0,
735 meta: serde_json::json!({}),
736 },
737 );
738 status_map.insert(
739 "discord".to_string(),
740 ChannelStatus {
741 enabled: parsed.channels.discord.is_some(),
742 connected: false,
743 last_error: None,
744 active_sessions: 0,
745 meta: serde_json::json!({}),
746 },
747 );
748 status_map.insert(
749 "slack".to_string(),
750 ChannelStatus {
751 enabled: parsed.channels.slack.is_some(),
752 connected: false,
753 last_error: None,
754 active_sessions: 0,
755 meta: serde_json::json!({}),
756 },
757 );
758
759 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
760 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
761 runtime.listeners = Some(listeners);
762 for status in status_map.values_mut() {
763 if status.enabled {
764 status.connected = true;
765 }
766 }
767 }
768
769 runtime.statuses = status_map.clone();
770 drop(runtime);
771
772 self.event_bus.publish(EngineEvent::new(
773 "channel.status.changed",
774 serde_json::json!({ "channels": status_map }),
775 ));
776 Ok(())
777 }
778
779 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
780 if !self.shared_resources_path.exists() {
781 return Ok(());
782 }
783 let raw = fs::read_to_string(&self.shared_resources_path).await?;
784 let parsed =
785 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
786 .unwrap_or_default();
787 let mut guard = self.shared_resources.write().await;
788 *guard = parsed;
789 Ok(())
790 }
791
792 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
793 if let Some(parent) = self.shared_resources_path.parent() {
794 fs::create_dir_all(parent).await?;
795 }
796 let payload = {
797 let guard = self.shared_resources.read().await;
798 serde_json::to_string_pretty(&*guard)?
799 };
800 fs::write(&self.shared_resources_path, payload).await?;
801 Ok(())
802 }
803
804 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
805 self.shared_resources.read().await.get(key).cloned()
806 }
807
808 pub async fn list_shared_resources(
809 &self,
810 prefix: Option<&str>,
811 limit: usize,
812 ) -> Vec<SharedResourceRecord> {
813 let limit = limit.clamp(1, 500);
814 let mut rows = self
815 .shared_resources
816 .read()
817 .await
818 .values()
819 .filter(|record| {
820 if let Some(prefix) = prefix {
821 record.key.starts_with(prefix)
822 } else {
823 true
824 }
825 })
826 .cloned()
827 .collect::<Vec<_>>();
828 rows.sort_by(|a, b| a.key.cmp(&b.key));
829 rows.truncate(limit);
830 rows
831 }
832
833 pub async fn put_shared_resource(
834 &self,
835 key: String,
836 value: Value,
837 if_match_rev: Option<u64>,
838 updated_by: String,
839 ttl_ms: Option<u64>,
840 ) -> Result<SharedResourceRecord, ResourceStoreError> {
841 if !is_valid_resource_key(&key) {
842 return Err(ResourceStoreError::InvalidKey { key });
843 }
844
845 let now = now_ms();
846 let mut guard = self.shared_resources.write().await;
847 let existing = guard.get(&key).cloned();
848
849 if let Some(expected) = if_match_rev {
850 let current = existing.as_ref().map(|row| row.rev);
851 if current != Some(expected) {
852 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
853 key,
854 expected_rev: Some(expected),
855 current_rev: current,
856 }));
857 }
858 }
859
860 let next_rev = existing
861 .as_ref()
862 .map(|row| row.rev.saturating_add(1))
863 .unwrap_or(1);
864
865 let record = SharedResourceRecord {
866 key: key.clone(),
867 value,
868 rev: next_rev,
869 updated_at_ms: now,
870 updated_by,
871 ttl_ms,
872 };
873
874 let previous = guard.insert(key.clone(), record.clone());
875 drop(guard);
876
877 if let Err(error) = self.persist_shared_resources().await {
878 let mut rollback = self.shared_resources.write().await;
879 if let Some(previous) = previous {
880 rollback.insert(key, previous);
881 } else {
882 rollback.remove(&key);
883 }
884 return Err(ResourceStoreError::PersistFailed {
885 message: error.to_string(),
886 });
887 }
888
889 Ok(record)
890 }
891
892 pub async fn delete_shared_resource(
893 &self,
894 key: &str,
895 if_match_rev: Option<u64>,
896 ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
897 if !is_valid_resource_key(key) {
898 return Err(ResourceStoreError::InvalidKey {
899 key: key.to_string(),
900 });
901 }
902
903 let mut guard = self.shared_resources.write().await;
904 let current = guard.get(key).cloned();
905 if let Some(expected) = if_match_rev {
906 let current_rev = current.as_ref().map(|row| row.rev);
907 if current_rev != Some(expected) {
908 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
909 key: key.to_string(),
910 expected_rev: Some(expected),
911 current_rev,
912 }));
913 }
914 }
915
916 let removed = guard.remove(key);
917 drop(guard);
918
919 if let Err(error) = self.persist_shared_resources().await {
920 if let Some(record) = removed.clone() {
921 self.shared_resources
922 .write()
923 .await
924 .insert(record.key.clone(), record);
925 }
926 return Err(ResourceStoreError::PersistFailed {
927 message: error.to_string(),
928 });
929 }
930
931 Ok(removed)
932 }
933
934 pub async fn load_routines(&self) -> anyhow::Result<()> {
935 if !self.routines_path.exists() {
936 return Ok(());
937 }
938 let raw = fs::read_to_string(&self.routines_path).await?;
939 let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
940 .unwrap_or_default();
941 let mut guard = self.routines.write().await;
942 *guard = parsed;
943 Ok(())
944 }
945
946 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
947 if !self.routine_history_path.exists() {
948 return Ok(());
949 }
950 let raw = fs::read_to_string(&self.routine_history_path).await?;
951 let parsed = serde_json::from_str::<
952 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
953 >(&raw)
954 .unwrap_or_default();
955 let mut guard = self.routine_history.write().await;
956 *guard = parsed;
957 Ok(())
958 }
959
960 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
961 if !self.routine_runs_path.exists() {
962 return Ok(());
963 }
964 let raw = fs::read_to_string(&self.routine_runs_path).await?;
965 let parsed =
966 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
967 .unwrap_or_default();
968 let mut guard = self.routine_runs.write().await;
969 *guard = parsed;
970 Ok(())
971 }
972
973 pub async fn persist_routines(&self) -> anyhow::Result<()> {
974 if let Some(parent) = self.routines_path.parent() {
975 fs::create_dir_all(parent).await?;
976 }
977 let payload = {
978 let guard = self.routines.read().await;
979 serde_json::to_string_pretty(&*guard)?
980 };
981 fs::write(&self.routines_path, payload).await?;
982 Ok(())
983 }
984
985 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
986 if let Some(parent) = self.routine_history_path.parent() {
987 fs::create_dir_all(parent).await?;
988 }
989 let payload = {
990 let guard = self.routine_history.read().await;
991 serde_json::to_string_pretty(&*guard)?
992 };
993 fs::write(&self.routine_history_path, payload).await?;
994 Ok(())
995 }
996
997 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
998 if let Some(parent) = self.routine_runs_path.parent() {
999 fs::create_dir_all(parent).await?;
1000 }
1001 let payload = {
1002 let guard = self.routine_runs.read().await;
1003 serde_json::to_string_pretty(&*guard)?
1004 };
1005 fs::write(&self.routine_runs_path, payload).await?;
1006 Ok(())
1007 }
1008
1009 pub async fn put_routine(
1010 &self,
1011 mut routine: RoutineSpec,
1012 ) -> Result<RoutineSpec, RoutineStoreError> {
1013 if routine.routine_id.trim().is_empty() {
1014 return Err(RoutineStoreError::InvalidRoutineId {
1015 routine_id: routine.routine_id,
1016 });
1017 }
1018
1019 routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1020 routine.output_targets = normalize_non_empty_list(routine.output_targets);
1021
1022 let interval = match routine.schedule {
1023 RoutineSchedule::IntervalSeconds { seconds } => {
1024 if seconds == 0 {
1025 return Err(RoutineStoreError::InvalidSchedule {
1026 detail: "interval_seconds must be > 0".to_string(),
1027 });
1028 }
1029 Some(seconds)
1030 }
1031 RoutineSchedule::Cron { .. } => None,
1032 };
1033 if routine.next_fire_at_ms.is_none() {
1034 routine.next_fire_at_ms = Some(now_ms().saturating_add(interval.unwrap_or(60) * 1000));
1035 }
1036
1037 let mut guard = self.routines.write().await;
1038 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1039 drop(guard);
1040
1041 if let Err(error) = self.persist_routines().await {
1042 let mut rollback = self.routines.write().await;
1043 if let Some(previous) = previous {
1044 rollback.insert(previous.routine_id.clone(), previous);
1045 } else {
1046 rollback.remove(&routine.routine_id);
1047 }
1048 return Err(RoutineStoreError::PersistFailed {
1049 message: error.to_string(),
1050 });
1051 }
1052
1053 Ok(routine)
1054 }
1055
1056 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1057 let mut rows = self
1058 .routines
1059 .read()
1060 .await
1061 .values()
1062 .cloned()
1063 .collect::<Vec<_>>();
1064 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1065 rows
1066 }
1067
1068 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1069 self.routines.read().await.get(routine_id).cloned()
1070 }
1071
1072 pub async fn delete_routine(
1073 &self,
1074 routine_id: &str,
1075 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1076 let mut guard = self.routines.write().await;
1077 let removed = guard.remove(routine_id);
1078 drop(guard);
1079
1080 if let Err(error) = self.persist_routines().await {
1081 if let Some(removed) = removed.clone() {
1082 self.routines
1083 .write()
1084 .await
1085 .insert(removed.routine_id.clone(), removed);
1086 }
1087 return Err(RoutineStoreError::PersistFailed {
1088 message: error.to_string(),
1089 });
1090 }
1091 Ok(removed)
1092 }
1093
1094 pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1095 let mut plans = Vec::new();
1096 let mut guard = self.routines.write().await;
1097 for routine in guard.values_mut() {
1098 if routine.status != RoutineStatus::Active {
1099 continue;
1100 }
1101 let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1102 continue;
1103 };
1104 let Some(interval_ms) = routine_interval_ms(&routine.schedule) else {
1105 continue;
1106 };
1107 if now_ms < next_fire_at_ms {
1108 continue;
1109 }
1110 let (run_count, next_fire_at_ms) = compute_misfire_plan(
1111 now_ms,
1112 next_fire_at_ms,
1113 interval_ms,
1114 &routine.misfire_policy,
1115 );
1116 routine.next_fire_at_ms = Some(next_fire_at_ms);
1117 if run_count == 0 {
1118 continue;
1119 }
1120 plans.push(RoutineTriggerPlan {
1121 routine_id: routine.routine_id.clone(),
1122 run_count,
1123 scheduled_at_ms: now_ms,
1124 next_fire_at_ms,
1125 });
1126 }
1127 drop(guard);
1128 let _ = self.persist_routines().await;
1129 plans
1130 }
1131
1132 pub async fn mark_routine_fired(
1133 &self,
1134 routine_id: &str,
1135 fired_at_ms: u64,
1136 ) -> Option<RoutineSpec> {
1137 let mut guard = self.routines.write().await;
1138 let routine = guard.get_mut(routine_id)?;
1139 routine.last_fired_at_ms = Some(fired_at_ms);
1140 let updated = routine.clone();
1141 drop(guard);
1142 let _ = self.persist_routines().await;
1143 Some(updated)
1144 }
1145
1146 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1147 let mut history = self.routine_history.write().await;
1148 history
1149 .entry(event.routine_id.clone())
1150 .or_default()
1151 .push(event);
1152 drop(history);
1153 let _ = self.persist_routine_history().await;
1154 }
1155
1156 pub async fn list_routine_history(
1157 &self,
1158 routine_id: &str,
1159 limit: usize,
1160 ) -> Vec<RoutineHistoryEvent> {
1161 let limit = limit.clamp(1, 500);
1162 let mut rows = self
1163 .routine_history
1164 .read()
1165 .await
1166 .get(routine_id)
1167 .cloned()
1168 .unwrap_or_default();
1169 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1170 rows.truncate(limit);
1171 rows
1172 }
1173
1174 pub async fn create_routine_run(
1175 &self,
1176 routine: &RoutineSpec,
1177 trigger_type: &str,
1178 run_count: u32,
1179 status: RoutineRunStatus,
1180 detail: Option<String>,
1181 ) -> RoutineRunRecord {
1182 let now = now_ms();
1183 let record = RoutineRunRecord {
1184 run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1185 routine_id: routine.routine_id.clone(),
1186 trigger_type: trigger_type.to_string(),
1187 run_count,
1188 status,
1189 created_at_ms: now,
1190 updated_at_ms: now,
1191 fired_at_ms: Some(now),
1192 started_at_ms: None,
1193 finished_at_ms: None,
1194 requires_approval: routine.requires_approval,
1195 approval_reason: None,
1196 denial_reason: None,
1197 paused_reason: None,
1198 detail,
1199 entrypoint: routine.entrypoint.clone(),
1200 args: routine.args.clone(),
1201 allowed_tools: routine.allowed_tools.clone(),
1202 output_targets: routine.output_targets.clone(),
1203 artifacts: Vec::new(),
1204 };
1205 self.routine_runs
1206 .write()
1207 .await
1208 .insert(record.run_id.clone(), record.clone());
1209 let _ = self.persist_routine_runs().await;
1210 record
1211 }
1212
1213 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1214 self.routine_runs.read().await.get(run_id).cloned()
1215 }
1216
1217 pub async fn list_routine_runs(
1218 &self,
1219 routine_id: Option<&str>,
1220 limit: usize,
1221 ) -> Vec<RoutineRunRecord> {
1222 let mut rows = self
1223 .routine_runs
1224 .read()
1225 .await
1226 .values()
1227 .filter(|row| {
1228 if let Some(id) = routine_id {
1229 row.routine_id == id
1230 } else {
1231 true
1232 }
1233 })
1234 .cloned()
1235 .collect::<Vec<_>>();
1236 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1237 rows.truncate(limit.clamp(1, 500));
1238 rows
1239 }
1240
1241 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1242 let mut guard = self.routine_runs.write().await;
1243 let next_run_id = guard
1244 .values()
1245 .filter(|row| row.status == RoutineRunStatus::Queued)
1246 .min_by(|a, b| {
1247 a.created_at_ms
1248 .cmp(&b.created_at_ms)
1249 .then_with(|| a.run_id.cmp(&b.run_id))
1250 })
1251 .map(|row| row.run_id.clone())?;
1252 let now = now_ms();
1253 let row = guard.get_mut(&next_run_id)?;
1254 row.status = RoutineRunStatus::Running;
1255 row.updated_at_ms = now;
1256 row.started_at_ms = Some(now);
1257 let claimed = row.clone();
1258 drop(guard);
1259 let _ = self.persist_routine_runs().await;
1260 Some(claimed)
1261 }
1262
1263 pub async fn set_routine_session_policy(
1264 &self,
1265 session_id: String,
1266 run_id: String,
1267 routine_id: String,
1268 allowed_tools: Vec<String>,
1269 ) {
1270 let policy = RoutineSessionPolicy {
1271 session_id: session_id.clone(),
1272 run_id,
1273 routine_id,
1274 allowed_tools: normalize_allowed_tools(allowed_tools),
1275 };
1276 self.routine_session_policies
1277 .write()
1278 .await
1279 .insert(session_id, policy);
1280 }
1281
1282 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1283 self.routine_session_policies
1284 .read()
1285 .await
1286 .get(session_id)
1287 .cloned()
1288 }
1289
1290 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1291 self.routine_session_policies
1292 .write()
1293 .await
1294 .remove(session_id);
1295 }
1296
1297 pub async fn update_routine_run_status(
1298 &self,
1299 run_id: &str,
1300 status: RoutineRunStatus,
1301 reason: Option<String>,
1302 ) -> Option<RoutineRunRecord> {
1303 let mut guard = self.routine_runs.write().await;
1304 let row = guard.get_mut(run_id)?;
1305 row.status = status.clone();
1306 row.updated_at_ms = now_ms();
1307 match status {
1308 RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1309 RoutineRunStatus::Running => {
1310 row.started_at_ms.get_or_insert_with(now_ms);
1311 if let Some(detail) = reason {
1312 row.detail = Some(detail);
1313 }
1314 }
1315 RoutineRunStatus::Denied => row.denial_reason = reason,
1316 RoutineRunStatus::Paused => row.paused_reason = reason,
1317 RoutineRunStatus::Completed
1318 | RoutineRunStatus::Failed
1319 | RoutineRunStatus::Cancelled => {
1320 row.finished_at_ms = Some(now_ms());
1321 if let Some(detail) = reason {
1322 row.detail = Some(detail);
1323 }
1324 }
1325 _ => {
1326 if let Some(detail) = reason {
1327 row.detail = Some(detail);
1328 }
1329 }
1330 }
1331 let updated = row.clone();
1332 drop(guard);
1333 let _ = self.persist_routine_runs().await;
1334 Some(updated)
1335 }
1336
1337 pub async fn append_routine_run_artifact(
1338 &self,
1339 run_id: &str,
1340 artifact: RoutineRunArtifact,
1341 ) -> Option<RoutineRunRecord> {
1342 let mut guard = self.routine_runs.write().await;
1343 let row = guard.get_mut(run_id)?;
1344 row.updated_at_ms = now_ms();
1345 row.artifacts.push(artifact);
1346 let updated = row.clone();
1347 drop(guard);
1348 let _ = self.persist_routine_runs().await;
1349 Some(updated)
1350 }
1351}
1352
1353async fn build_channels_config(
1354 state: &AppState,
1355 channels: &ChannelsConfigFile,
1356) -> Option<ChannelsConfig> {
1357 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1358 return None;
1359 }
1360 Some(ChannelsConfig {
1361 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1362 bot_token: cfg.bot_token,
1363 allowed_users: cfg.allowed_users,
1364 mention_only: cfg.mention_only,
1365 }),
1366 discord: channels.discord.clone().map(|cfg| DiscordConfig {
1367 bot_token: cfg.bot_token,
1368 guild_id: cfg.guild_id,
1369 allowed_users: cfg.allowed_users,
1370 mention_only: cfg.mention_only,
1371 }),
1372 slack: channels.slack.clone().map(|cfg| SlackConfig {
1373 bot_token: cfg.bot_token,
1374 channel_id: cfg.channel_id,
1375 allowed_users: cfg.allowed_users,
1376 }),
1377 server_base_url: state.server_base_url(),
1378 api_token: state.api_token().await.unwrap_or_default(),
1379 tool_policy: channels.tool_policy.clone(),
1380 })
1381}
1382
/// Normalizes a configured web UI mount prefix to the form `/segment[/segment...]`.
///
/// Surrounding whitespace and trailing slashes are removed, and a leading `/` is added
/// when missing. Input that is empty, or made only of slashes (`""`, `"/"`, `"//"`, ...),
/// falls back to `/admin`. Previously, inputs like `"//"` normalized to an empty prefix.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let trimmed = prefix.trim().trim_end_matches('/');
    if trimmed.is_empty() {
        return "/admin".to_string();
    }
    if trimmed.starts_with('/') {
        trimmed.to_string()
    } else {
        format!("/{trimmed}")
    }
}
1395
/// Serde default for `WebUiConfig::path_prefix`: the `/admin` mount point.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
1399
/// Serde default for allow-list fields (used by `TelegramConfigFile`): a list containing
/// the single wildcard entry `"*"`.
fn default_allow_all() -> Vec<String> {
    let wildcard = String::from("*");
    vec![wildcard]
}
1403
/// Default for the Discord `mention_only` flag: enabled.
/// Presumably wired up as a serde default on `DiscordConfigFile` (not visible here).
fn default_discord_mention_only() -> bool {
    const MENTION_ONLY_BY_DEFAULT: bool = true;
    MENTION_ONLY_BY_DEFAULT
}
1407
1408fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
1409 normalize_non_empty_list(raw)
1410}
1411
/// Trims every entry, drops entries that end up empty, and removes duplicates while
/// keeping the first occurrence and the original relative order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_kept = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        .filter(|entry| already_kept.insert(entry.clone()))
        .collect()
}
1426
/// Reads the run staleness threshold (milliseconds) from `TANDEM_RUN_STALE_MS`.
///
/// A missing or unparsable value (after trimming) falls back to 120 000 ms. The result
/// is always clamped to the range 30 000 ..= 600 000 ms.
fn resolve_run_stale_ms() -> u64 {
    const FALLBACK_MS: u64 = 120_000;
    let requested = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(FALLBACK_MS),
        Err(_) => FALLBACK_MS,
    };
    requested.clamp(30_000, 600_000)
}
1434
/// Location of `shared_resources.json`.
///
/// Uses the trimmed `TANDEM_STATE_DIR` when it is set and non-blank; otherwise the
/// relative `.tandem` directory.
fn resolve_shared_resources_path() -> PathBuf {
    let state_dir = std::env::var("TANDEM_STATE_DIR")
        .ok()
        .map(|raw| raw.trim().to_string())
        .filter(|dir| !dir.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(".tandem"));
    state_dir.join("shared_resources.json")
}
1444
/// Location of `routines.json`.
///
/// Uses the trimmed `TANDEM_STATE_DIR` when it is set and non-blank; otherwise the
/// relative `.tandem` directory.
fn resolve_routines_path() -> PathBuf {
    let state_dir = std::env::var("TANDEM_STATE_DIR")
        .ok()
        .map(|raw| raw.trim().to_string())
        .filter(|dir| !dir.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(".tandem"));
    state_dir.join("routines.json")
}
1454
/// Location of `routine_history.json`.
///
/// Uses the trimmed `TANDEM_STORAGE_DIR` when it is set and non-blank; otherwise the
/// relative `.tandem` directory.
///
/// NOTE(review): the sibling resolvers (routines, routine runs, shared resources) read
/// `TANDEM_STATE_DIR`, while this one reads `TANDEM_STORAGE_DIR`. Confirm this split is
/// intentional; otherwise history can land in a different directory from the other
/// routine files.
fn resolve_routine_history_path() -> PathBuf {
    let storage_dir = std::env::var("TANDEM_STORAGE_DIR")
        .ok()
        .map(|raw| raw.trim().to_string())
        .filter(|dir| !dir.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(".tandem"));
    storage_dir.join("routine_history.json")
}
1464
/// Location of `routine_runs.json`.
///
/// Uses the trimmed `TANDEM_STATE_DIR` when it is set and non-blank; otherwise the
/// relative `.tandem` directory.
fn resolve_routine_runs_path() -> PathBuf {
    let state_dir = std::env::var("TANDEM_STATE_DIR")
        .ok()
        .map(|raw| raw.trim().to_string())
        .filter(|dir| !dir.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(".tandem"));
    state_dir.join("routine_runs.json")
}
1474
/// Location of the agent-team audit log, `agent-team/audit.log.jsonl`.
///
/// The path sits under the trimmed `TANDEM_STATE_DIR` when it is set and non-blank;
/// otherwise under the relative `.tandem` directory.
fn resolve_agent_team_audit_path() -> PathBuf {
    let state_dir = std::env::var("TANDEM_STATE_DIR")
        .ok()
        .map(|raw| raw.trim().to_string())
        .filter(|dir| !dir.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from(".tandem"));
    state_dir.join("agent-team").join("audit.log.jsonl")
}
1488
1489fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
1490 match schedule {
1491 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
1492 RoutineSchedule::Cron { .. } => None,
1493 }
1494}
1495
1496fn compute_misfire_plan(
1497 now_ms: u64,
1498 next_fire_at_ms: u64,
1499 interval_ms: u64,
1500 policy: &RoutineMisfirePolicy,
1501) -> (u32, u64) {
1502 if now_ms < next_fire_at_ms || interval_ms == 0 {
1503 return (0, next_fire_at_ms);
1504 }
1505 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
1506 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
1507 match policy {
1508 RoutineMisfirePolicy::Skip => (0, aligned_next),
1509 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
1510 RoutineMisfirePolicy::CatchUp { max_runs } => {
1511 let count = missed.min(u64::from(*max_runs)) as u32;
1512 (count, aligned_next)
1513 }
1514 }
1515}
1516
/// Outcome of evaluating a routine against the execution policy before it runs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run only after a manual approval; `reason` explains why.
    RequiresApproval {
        /// Human-readable explanation for the approval gate.
        reason: String,
    },
    /// The routine must not run; `reason` explains which policy blocked it.
    Blocked {
        /// Human-readable explanation for the block.
        reason: String,
    },
}
1523
1524pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
1525 let entrypoint = routine.entrypoint.to_ascii_lowercase();
1526 if entrypoint.starts_with("connector.")
1527 || entrypoint.starts_with("integration.")
1528 || entrypoint.contains("external")
1529 {
1530 return true;
1531 }
1532 routine
1533 .args
1534 .get("uses_external_integrations")
1535 .and_then(|v| v.as_bool())
1536 .unwrap_or(false)
1537 || routine
1538 .args
1539 .get("connector_id")
1540 .and_then(|v| v.as_str())
1541 .is_some()
1542}
1543
1544pub fn evaluate_routine_execution_policy(
1545 routine: &RoutineSpec,
1546 trigger_type: &str,
1547) -> RoutineExecutionDecision {
1548 if !routine_uses_external_integrations(routine) {
1549 return RoutineExecutionDecision::Allowed;
1550 }
1551 if !routine.external_integrations_allowed {
1552 return RoutineExecutionDecision::Blocked {
1553 reason: "external integrations are disabled by policy".to_string(),
1554 };
1555 }
1556 if routine.requires_approval {
1557 return RoutineExecutionDecision::RequiresApproval {
1558 reason: format!(
1559 "manual approval required before external side effects ({})",
1560 trigger_type
1561 ),
1562 };
1563 }
1564 RoutineExecutionDecision::Allowed
1565}
1566
/// Validates a shared-resource key.
///
/// After trimming surrounding whitespace, a key is valid when all of these hold:
/// * it is non-empty;
/// * it starts with `run/`, `mission/`, `project/` or `team/`;
/// * it contains no empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    let key = key.trim();
    let has_known_namespace = ALLOWED_PREFIXES.iter().any(|prefix| key.starts_with(prefix));
    !key.is_empty() && has_known_namespace && !key.contains("//")
}
1581
1582impl Deref for AppState {
1583 type Target = RuntimeState;
1584
1585 fn deref(&self) -> &Self::Target {
1586 self.runtime
1587 .get()
1588 .expect("runtime accessed before startup completion")
1589 }
1590}
1591
1592fn extract_event_session_id(properties: &Value) -> Option<String> {
1593 properties
1594 .get("sessionID")
1595 .or_else(|| properties.get("sessionId"))
1596 .or_else(|| properties.get("id"))
1597 .and_then(|v| v.as_str())
1598 .map(|s| s.to_string())
1599}
1600
1601fn extract_event_run_id(properties: &Value) -> Option<String> {
1602 properties
1603 .get("runID")
1604 .or_else(|| properties.get("run_id"))
1605 .and_then(|v| v.as_str())
1606 .map(|s| s.to_string())
1607}
1608
1609fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1610 let session_id = extract_event_session_id(&event.properties)?;
1611 let run_id = extract_event_run_id(&event.properties);
1612 let key = format!("run/{session_id}/status");
1613
1614 let mut base = serde_json::Map::new();
1615 base.insert("sessionID".to_string(), Value::String(session_id));
1616 if let Some(run_id) = run_id {
1617 base.insert("runID".to_string(), Value::String(run_id));
1618 }
1619
1620 match event.event_type.as_str() {
1621 "session.run.started" => {
1622 base.insert("state".to_string(), Value::String("running".to_string()));
1623 base.insert("phase".to_string(), Value::String("run".to_string()));
1624 base.insert(
1625 "eventType".to_string(),
1626 Value::String("session.run.started".to_string()),
1627 );
1628 Some(StatusIndexUpdate {
1629 key,
1630 value: Value::Object(base),
1631 })
1632 }
1633 "session.run.finished" => {
1634 base.insert("state".to_string(), Value::String("finished".to_string()));
1635 base.insert("phase".to_string(), Value::String("run".to_string()));
1636 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1637 base.insert("result".to_string(), Value::String(status.to_string()));
1638 }
1639 base.insert(
1640 "eventType".to_string(),
1641 Value::String("session.run.finished".to_string()),
1642 );
1643 Some(StatusIndexUpdate {
1644 key,
1645 value: Value::Object(base),
1646 })
1647 }
1648 "message.part.updated" => {
1649 let part_type = event
1650 .properties
1651 .get("part")
1652 .and_then(|v| v.get("type"))
1653 .and_then(|v| v.as_str())?;
1654 let (phase, tool_active) = match part_type {
1655 "tool-invocation" => ("tool", true),
1656 "tool-result" => ("run", false),
1657 _ => return None,
1658 };
1659 base.insert("state".to_string(), Value::String("running".to_string()));
1660 base.insert("phase".to_string(), Value::String(phase.to_string()));
1661 base.insert("toolActive".to_string(), Value::Bool(tool_active));
1662 if let Some(tool_name) = event
1663 .properties
1664 .get("part")
1665 .and_then(|v| v.get("tool"))
1666 .and_then(|v| v.as_str())
1667 {
1668 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1669 }
1670 base.insert(
1671 "eventType".to_string(),
1672 Value::String("message.part.updated".to_string()),
1673 );
1674 Some(StatusIndexUpdate {
1675 key,
1676 value: Value::Object(base),
1677 })
1678 }
1679 _ => None,
1680 }
1681}
1682
1683pub async fn run_status_indexer(state: AppState) {
1684 let mut rx = state.event_bus.subscribe();
1685 loop {
1686 match rx.recv().await {
1687 Ok(event) => {
1688 if let Some(update) = derive_status_index_update(&event) {
1689 if let Err(error) = state
1690 .put_shared_resource(
1691 update.key,
1692 update.value,
1693 None,
1694 "system.status_indexer".to_string(),
1695 None,
1696 )
1697 .await
1698 {
1699 tracing::warn!("status indexer failed to persist update: {error:?}");
1700 }
1701 }
1702 }
1703 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1704 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1705 }
1706 }
1707}
1708
1709pub async fn run_agent_team_supervisor(state: AppState) {
1710 let mut rx = state.event_bus.subscribe();
1711 loop {
1712 match rx.recv().await {
1713 Ok(event) => {
1714 state.agent_teams.handle_engine_event(&state, &event).await;
1715 }
1716 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1717 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1718 }
1719 }
1720}
1721
1722pub async fn run_routine_scheduler(state: AppState) {
1723 loop {
1724 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1725 let now = now_ms();
1726 let plans = state.evaluate_routine_misfires(now).await;
1727 for plan in plans {
1728 let Some(routine) = state.get_routine(&plan.routine_id).await else {
1729 continue;
1730 };
1731 match evaluate_routine_execution_policy(&routine, "scheduled") {
1732 RoutineExecutionDecision::Allowed => {
1733 let _ = state.mark_routine_fired(&plan.routine_id, now).await;
1734 let run = state
1735 .create_routine_run(
1736 &routine,
1737 "scheduled",
1738 plan.run_count,
1739 RoutineRunStatus::Queued,
1740 None,
1741 )
1742 .await;
1743 state
1744 .append_routine_history(RoutineHistoryEvent {
1745 routine_id: plan.routine_id.clone(),
1746 trigger_type: "scheduled".to_string(),
1747 run_count: plan.run_count,
1748 fired_at_ms: now,
1749 status: "queued".to_string(),
1750 detail: None,
1751 })
1752 .await;
1753 state.event_bus.publish(EngineEvent::new(
1754 "routine.fired",
1755 serde_json::json!({
1756 "routineID": plan.routine_id,
1757 "runID": run.run_id,
1758 "runCount": plan.run_count,
1759 "scheduledAtMs": plan.scheduled_at_ms,
1760 "nextFireAtMs": plan.next_fire_at_ms,
1761 }),
1762 ));
1763 state.event_bus.publish(EngineEvent::new(
1764 "routine.run.created",
1765 serde_json::json!({
1766 "run": run,
1767 }),
1768 ));
1769 }
1770 RoutineExecutionDecision::RequiresApproval { reason } => {
1771 let run = state
1772 .create_routine_run(
1773 &routine,
1774 "scheduled",
1775 plan.run_count,
1776 RoutineRunStatus::PendingApproval,
1777 Some(reason.clone()),
1778 )
1779 .await;
1780 state
1781 .append_routine_history(RoutineHistoryEvent {
1782 routine_id: plan.routine_id.clone(),
1783 trigger_type: "scheduled".to_string(),
1784 run_count: plan.run_count,
1785 fired_at_ms: now,
1786 status: "pending_approval".to_string(),
1787 detail: Some(reason.clone()),
1788 })
1789 .await;
1790 state.event_bus.publish(EngineEvent::new(
1791 "routine.approval_required",
1792 serde_json::json!({
1793 "routineID": plan.routine_id,
1794 "runID": run.run_id,
1795 "runCount": plan.run_count,
1796 "triggerType": "scheduled",
1797 "reason": reason,
1798 }),
1799 ));
1800 state.event_bus.publish(EngineEvent::new(
1801 "routine.run.created",
1802 serde_json::json!({
1803 "run": run,
1804 }),
1805 ));
1806 }
1807 RoutineExecutionDecision::Blocked { reason } => {
1808 let run = state
1809 .create_routine_run(
1810 &routine,
1811 "scheduled",
1812 plan.run_count,
1813 RoutineRunStatus::BlockedPolicy,
1814 Some(reason.clone()),
1815 )
1816 .await;
1817 state
1818 .append_routine_history(RoutineHistoryEvent {
1819 routine_id: plan.routine_id.clone(),
1820 trigger_type: "scheduled".to_string(),
1821 run_count: plan.run_count,
1822 fired_at_ms: now,
1823 status: "blocked_policy".to_string(),
1824 detail: Some(reason.clone()),
1825 })
1826 .await;
1827 state.event_bus.publish(EngineEvent::new(
1828 "routine.blocked",
1829 serde_json::json!({
1830 "routineID": plan.routine_id,
1831 "runID": run.run_id,
1832 "runCount": plan.run_count,
1833 "triggerType": "scheduled",
1834 "reason": reason,
1835 }),
1836 ));
1837 state.event_bus.publish(EngineEvent::new(
1838 "routine.run.created",
1839 serde_json::json!({
1840 "run": run,
1841 }),
1842 ));
1843 }
1844 }
1845 }
1846 }
1847}
1848
/// Background task that executes queued routine runs, one at a time.
///
/// On each one-second tick it claims at most one `Queued` run via
/// `claim_next_queued_routine_run`, then:
/// 1. publishes `routine.run.started`;
/// 2. creates and saves a new session rooted at the current workspace index root (on
///    failure, the run is marked `Failed` and `routine.run.failed` is published);
/// 3. applies the run's tool allow-list, both as a routine session policy and on the
///    engine loop;
/// 4. resolves a model (publishing `routine.run.model_selected` when one is found) and
///    sends the routine prompt through the engine loop;
/// 5. clears the session policy and allow-list;
/// 6. marks the run `Completed` (after attaching configured output artifacts) or
///    `Failed`, and publishes the matching `routine.run.*` event.
///
/// The loop never exits. Because each run is awaited inline before the next sleep, runs
/// execute sequentially.
pub async fn run_routine_executor(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let Some(run) = state.claim_next_queued_routine_run().await else {
            continue;
        };

        state.event_bus.publish(EngineEvent::new(
            "routine.run.started",
            serde_json::json!({
                "runID": run.run_id,
                "routineID": run.routine_id,
                "triggerType": run.trigger_type,
                "startedAtMs": now_ms(),
            }),
        ));

        // Each run gets a dedicated session rooted at the current workspace.
        let workspace_root = state.workspace_index.snapshot().await.root;
        let mut session = Session::new(
            Some(format!("Routine {}", run.routine_id)),
            Some(workspace_root.clone()),
        );
        let session_id = session.id.clone();
        session.workspace_root = Some(workspace_root);

        if let Err(error) = state.storage.save_session(session).await {
            // No session means nothing can run; record the failure and move on.
            let detail = format!("failed to create routine session: {error}");
            let _ = state
                .update_routine_run_status(
                    &run.run_id,
                    RoutineRunStatus::Failed,
                    Some(detail.clone()),
                )
                .await;
            state.event_bus.publish(EngineEvent::new(
                "routine.run.failed",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "reason": detail,
                }),
            ));
            continue;
        }

        // Restrict the session's tools to the run's allow-list before any prompt runs.
        state
            .set_routine_session_policy(
                session_id.clone(),
                run.run_id.clone(),
                run.routine_id.clone(),
                run.allowed_tools.clone(),
            )
            .await;
        state
            .engine_loop
            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
            .await;

        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
        if let Some(spec) = selected_model.as_ref() {
            state.event_bus.publish(EngineEvent::new(
                "routine.run.model_selected",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "providerID": spec.provider_id,
                    "modelID": spec.model_id,
                    "source": model_source,
                }),
            ));
        }

        let request = SendMessageRequest {
            parts: vec![MessagePartInput::Text {
                text: build_routine_prompt(&state, &run).await,
            }],
            model: selected_model,
            agent: None,
        };

        let run_result = state
            .engine_loop
            .run_prompt_async_with_context(
                session_id.clone(),
                request,
                Some(format!("routine:{}", run.run_id)),
            )
            .await;

        // Tear down the per-session restrictions whether the prompt succeeded or not.
        state.clear_routine_session_policy(&session_id).await;
        state
            .engine_loop
            .clear_session_allowed_tools(&session_id)
            .await;

        match run_result {
            Ok(()) => {
                // Artifacts are attached before the run is marked completed.
                append_configured_output_artifacts(&state, &run).await;
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Completed,
                        Some("routine run completed".to_string()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.completed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
            Err(error) => {
                // Error text is capped at 500 bytes before storing and publishing.
                let detail = truncate_text(&error.to_string(), 500);
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Failed,
                        Some(detail.clone()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.failed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "reason": detail,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
        }
    }
}
1987
1988async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
1989 let normalized_entrypoint = run.entrypoint.trim();
1990 let known_tool = state
1991 .tools
1992 .list()
1993 .await
1994 .into_iter()
1995 .any(|schema| schema.name == normalized_entrypoint);
1996 if known_tool {
1997 let args = if run.args.is_object() {
1998 run.args.clone()
1999 } else {
2000 serde_json::json!({})
2001 };
2002 return format!("/tool {} {}", normalized_entrypoint, args);
2003 }
2004
2005 if let Some(objective) = routine_objective_from_args(run) {
2006 return build_routine_mission_prompt(run, &objective);
2007 }
2008
2009 format!(
2010 "Execute routine '{}' using entrypoint '{}' with args: {}",
2011 run.routine_id, run.entrypoint, run.args
2012 )
2013}
2014
2015fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
2016 run.args
2017 .get("prompt")
2018 .and_then(|v| v.as_str())
2019 .map(str::trim)
2020 .filter(|v| !v.is_empty())
2021 .map(ToString::to_string)
2022}
2023
2024fn routine_mode_from_args(args: &Value) -> &str {
2025 args.get("mode")
2026 .and_then(|v| v.as_str())
2027 .map(str::trim)
2028 .filter(|v| !v.is_empty())
2029 .unwrap_or("standalone")
2030}
2031
2032fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
2033 args.get("success_criteria")
2034 .and_then(|v| v.as_array())
2035 .map(|rows| {
2036 rows.iter()
2037 .filter_map(|row| row.as_str())
2038 .map(str::trim)
2039 .filter(|row| !row.is_empty())
2040 .map(ToString::to_string)
2041 .collect::<Vec<_>>()
2042 })
2043 .unwrap_or_default()
2044}
2045
2046fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
2047 let mode = routine_mode_from_args(&run.args);
2048 let success_criteria = routine_success_criteria_from_args(&run.args);
2049 let orchestrator_only_tool_calls = run
2050 .args
2051 .get("orchestrator_only_tool_calls")
2052 .and_then(|v| v.as_bool())
2053 .unwrap_or(false);
2054
2055 let mut lines = vec![
2056 format!("Automation ID: {}", run.routine_id),
2057 format!("Run ID: {}", run.run_id),
2058 format!("Mode: {}", mode),
2059 format!("Mission Objective: {}", objective),
2060 ];
2061
2062 if !success_criteria.is_empty() {
2063 lines.push("Success Criteria:".to_string());
2064 for criterion in success_criteria {
2065 lines.push(format!("- {}", criterion));
2066 }
2067 }
2068
2069 if run.allowed_tools.is_empty() {
2070 lines.push("Allowed Tools: all available by current policy".to_string());
2071 } else {
2072 lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
2073 }
2074
2075 if run.output_targets.is_empty() {
2076 lines.push("Output Targets: none configured".to_string());
2077 } else {
2078 lines.push("Output Targets:".to_string());
2079 for target in &run.output_targets {
2080 lines.push(format!("- {}", target));
2081 }
2082 }
2083
2084 if mode.eq_ignore_ascii_case("orchestrated") {
2085 lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
2086 lines
2087 .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
2088 if orchestrator_only_tool_calls {
2089 lines.push(
2090 "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
2091 .to_string(),
2092 );
2093 }
2094 } else {
2095 lines.push("Execution Pattern: Standalone mission run".to_string());
2096 }
2097
2098 lines.push(
2099 "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
2100 .to_string(),
2101 );
2102
2103 lines.join("\n")
2104}
2105
/// Truncates `input` to at most `max_len` bytes and appends `...<truncated>` when
/// anything was cut. Input that already fits is returned unchanged.
///
/// `max_len` is a byte budget. When it falls inside a multi-byte UTF-8 character, the cut
/// backs off to the previous character boundary. Slicing at `max_len` directly (as
/// before) panics on such input, e.g. non-ASCII error messages.
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";
    if input.len() <= max_len {
        return input.to_string();
    }
    // `is_char_boundary(0)` is always true, so this loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
2114
2115async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
2116 if run.output_targets.is_empty() {
2117 return;
2118 }
2119 for target in &run.output_targets {
2120 let artifact = RoutineRunArtifact {
2121 artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
2122 uri: target.clone(),
2123 kind: "output_target".to_string(),
2124 label: Some("configured output target".to_string()),
2125 created_at_ms: now_ms(),
2126 metadata: Some(serde_json::json!({
2127 "source": "routine.output_targets",
2128 "runID": run.run_id,
2129 "routineID": run.routine_id,
2130 })),
2131 };
2132 let _ = state
2133 .append_routine_run_artifact(&run.run_id, artifact.clone())
2134 .await;
2135 state.event_bus.publish(EngineEvent::new(
2136 "routine.run.artifact_added",
2137 serde_json::json!({
2138 "runID": run.run_id,
2139 "routineID": run.routine_id,
2140 "artifact": artifact,
2141 }),
2142 ));
2143 }
2144}
2145
2146fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
2147 let obj = value.as_object()?;
2148 let provider_id = obj.get("provider_id")?.as_str()?.trim();
2149 let model_id = obj.get("model_id")?.as_str()?.trim();
2150 if provider_id.is_empty() || model_id.is_empty() {
2151 return None;
2152 }
2153 Some(ModelSpec {
2154 provider_id: provider_id.to_string(),
2155 model_id: model_id.to_string(),
2156 })
2157}
2158
2159fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
2160 args.get("model_policy")
2161 .and_then(|v| v.get("role_models"))
2162 .and_then(|v| v.get(role))
2163 .and_then(parse_model_spec)
2164}
2165
2166fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
2167 args.get("model_policy")
2168 .and_then(|v| v.get("default_model"))
2169 .and_then(parse_model_spec)
2170}
2171
2172fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
2173 providers.iter().any(|provider| {
2174 provider.id == spec.provider_id
2175 && provider
2176 .models
2177 .iter()
2178 .any(|model| model.id == spec.model_id)
2179 })
2180}
2181
2182async fn resolve_routine_model_spec_for_run(
2183 state: &AppState,
2184 run: &RoutineRunRecord,
2185) -> (Option<ModelSpec>, String) {
2186 let providers = state.providers.list().await;
2187 let mode = routine_mode_from_args(&run.args);
2188 let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
2189
2190 if mode.eq_ignore_ascii_case("orchestrated") {
2191 if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
2192 requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
2193 }
2194 }
2195 if let Some(default_model) = default_model_spec_from_args(&run.args) {
2196 requested.push((default_model, "args.model_policy.default_model"));
2197 }
2198
2199 for (candidate, source) in requested {
2200 if provider_catalog_has_model(&providers, &candidate) {
2201 return (Some(candidate), source.to_string());
2202 }
2203 }
2204
2205 let fallback = providers
2206 .into_iter()
2207 .find(|provider| !provider.models.is_empty())
2208 .and_then(|provider| {
2209 let model = provider.models.first()?;
2210 Some(ModelSpec {
2211 provider_id: provider.id,
2212 model_id: model.id.clone(),
2213 })
2214 });
2215
2216 (fallback, "provider_catalog_fallback".to_string())
2217}
2218
/// Unit tests covering the shared-resource store, status-index derivation, routine
/// misfire planning, routine persistence, routine execution policy, routine-run
/// claiming, routine session policies, and routine mission prompt construction.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a starting `AppState` whose shared-resource store is `path`, and whose
    /// routine, routine-history, and routine-run files point at fresh unique temp
    /// paths so tests do not touch each other's files.
    fn test_state_with_path(path: PathBuf) -> AppState {
        let mut state = AppState::new_starting("test-attempt".to_string(), true);
        state.shared_resources_path = path;
        state.routines_path = tmp_routines_file("shared-state");
        state.routine_history_path = tmp_routines_file("routine-history");
        state.routine_runs_path = tmp_routines_file("routine-runs");
        state
    }

    /// Returns a unique (UUID-suffixed) JSON path in the system temp dir for a
    /// shared-resource store.
    fn tmp_resource_file(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "tandem-server-{name}-{}.json",
            uuid::Uuid::new_v4()
        ))
    }

    /// Returns a unique (UUID-suffixed) JSON path in the system temp dir for
    /// routine-related persistence.
    fn tmp_routines_file(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "tandem-server-routines-{name}-{}.json",
            uuid::Uuid::new_v4()
        ))
    }

    /// A first put creates revision 1; a put with the matching expected revision
    /// bumps it to 2, records the new author and TTL, and is persisted to disk.
    #[tokio::test]
    async fn shared_resource_put_increments_revision() {
        let path = tmp_resource_file("shared-resource-put");
        let state = test_state_with_path(path.clone());

        let first = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"todo"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("first put");
        assert_eq!(first.rev, 1);

        let second = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"doing"}),
                Some(1),
                "agent-2".to_string(),
                Some(60_000),
            )
            .await
            .expect("second put");
        assert_eq!(second.rev, 2);
        assert_eq!(second.updated_by, "agent-2");
        assert_eq!(second.ttl_ms, Some(60_000));

        let raw = tokio::fs::read_to_string(&path)
            .await
            .expect("persisted");
        assert!(raw.contains("\"rev\": 2"));
        let _ = tokio::fs::remove_file(path).await;
    }

    /// A put whose expected revision does not match the stored one fails with
    /// `RevisionConflict`, reporting both the expected and the current revision.
    #[tokio::test]
    async fn shared_resource_put_detects_revision_conflict() {
        let path = tmp_resource_file("shared-resource-conflict");
        let state = test_state_with_path(path.clone());

        let _ = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("seed put");

        let conflict = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1 edited"}),
                Some(99),
                "agent-2".to_string(),
                None,
            )
            .await
            .expect_err("expected conflict");

        match conflict {
            ResourceStoreError::RevisionConflict(conflict) => {
                assert_eq!(conflict.expected_rev, Some(99));
                assert_eq!(conflict.current_rev, Some(1));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        let _ = tokio::fs::remove_file(path).await;
    }

    /// A key under the `global/` namespace is rejected as `InvalidKey`, and the
    /// store file is never created.
    #[tokio::test]
    async fn shared_resource_rejects_invalid_namespace_key() {
        let path = tmp_resource_file("shared-resource-invalid-key");
        let state = test_state_with_path(path.clone());

        let error = state
            .put_shared_resource(
                "global/demo/key".to_string(),
                serde_json::json!({"x":1}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect_err("invalid key should fail");

        match error {
            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
            other => panic!("unexpected error: {other:?}"),
        }

        assert!(!path.exists());
    }

    /// `session.run.started` maps to the `run/<sessionID>/status` key with
    /// state `running` and phase `run`.
    #[test]
    fn derive_status_index_update_for_run_started() {
        let event = EngineEvent::new(
            "session.run.started",
            serde_json::json!({
                "sessionID": "s-1",
                "runID": "r-1"
            }),
        );
        let update = derive_status_index_update(&event).expect("update");
        assert_eq!(update.key, "run/s-1/status");
        assert_eq!(
            update.value.get("state").and_then(|v| v.as_str()),
            Some("running")
        );
        assert_eq!(
            update.value.get("phase").and_then(|v| v.as_str()),
            Some("run")
        );
    }

    /// A `message.part.updated` event carrying a `tool-invocation` part maps to
    /// phase `tool`, with `toolActive` set and the tool name recorded.
    #[test]
    fn derive_status_index_update_for_tool_invocation() {
        let event = EngineEvent::new(
            "message.part.updated",
            serde_json::json!({
                "sessionID": "s-2",
                "runID": "r-2",
                "part": { "type": "tool-invocation", "tool": "todo_write" }
            }),
        );
        let update = derive_status_index_update(&event).expect("update");
        assert_eq!(update.key, "run/s-2/status");
        assert_eq!(
            update.value.get("phase").and_then(|v| v.as_str()),
            Some("tool")
        );
        assert_eq!(
            update.value.get("toolActive").and_then(|v| v.as_bool()),
            Some(true)
        );
        assert_eq!(
            update.value.get("tool").and_then(|v| v.as_str()),
            Some("todo_write")
        );
    }

    /// The `Skip` misfire policy schedules no catch-up runs and advances the next
    /// fire time to 11_000.
    #[test]
    fn misfire_skip_drops_runs_and_advances_next_fire() {
        let (count, next_fire) =
            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
        assert_eq!(count, 0);
        assert_eq!(next_fire, 11_000);
    }

    /// The `RunOnce` misfire policy schedules exactly one run and advances the next
    /// fire time to 11_000.
    #[test]
    fn misfire_run_once_emits_single_trigger() {
        let (count, next_fire) =
            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
        assert_eq!(count, 1);
        assert_eq!(next_fire, 11_000);
    }

    /// The `CatchUp` misfire policy limits the number of scheduled runs to
    /// `max_runs`.
    #[test]
    fn misfire_catch_up_caps_trigger_count() {
        let (count, next_fire) = compute_misfire_plan(
            25_000,
            5_000,
            1_000,
            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
        );
        assert_eq!(count, 3);
        assert_eq!(next_fire, 26_000);
    }

    /// A routine stored via `put_routine` can be read back by a separate `AppState`
    /// that loads the same routines file.
    #[tokio::test]
    async fn routine_put_persists_and_loads() {
        let routines_path = tmp_routines_file("persist-load");
        let mut state = AppState::new_starting("routines-put".to_string(), true);
        state.routines_path = routines_path.clone();

        let routine = RoutineSpec {
            routine_id: "routine-1".to_string(),
            name: "Digest".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({"topic":"status"}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "user-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state.put_routine(routine).await.expect("store routine");

        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
        reloaded.routines_path = routines_path.clone();
        reloaded.load_routines().await.expect("load routines");
        let list = reloaded.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
    }

    /// Across three overdue routines with the Skip, RunOnce, and CatchUp policies:
    /// Skip produces no plan but still advances its next fire time, RunOnce plans a
    /// single run, and CatchUp plans up to its cap.
    #[tokio::test]
    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
        let routines_path = tmp_routines_file("misfire-eval");
        let mut state = AppState::new_starting("routines-eval".to_string(), true);
        state.routines_path = routines_path.clone();

        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
            routine_id: id.to_string(),
            name: id.to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
            timezone: "UTC".to_string(),
            misfire_policy: policy,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: false,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state
            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
            .await
            .expect("put skip");
        state
            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
            .await
            .expect("put once");
        state
            .put_routine(base(
                "routine-catch",
                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
            ))
            .await
            .expect("put catch");

        let plans = state.evaluate_routine_misfires(10_500).await;
        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");

        assert!(plan_skip.is_none());
        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));

        let stored = state.list_routines().await;
        let skip_next = stored
            .iter()
            .find(|r| r.routine_id == "routine-skip")
            .and_then(|r| r.next_fire_at_ms)
            .expect("skip next");
        assert!(skip_next > 10_500);

        let _ = tokio::fs::remove_file(routines_path).await;
    }

    /// A `connector.*` entrypoint is blocked when external integrations are not
    /// allowed on the routine.
    #[test]
    fn routine_policy_blocks_external_side_effects_by_default() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-1".to_string(),
            name: "Connector routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "connector.email.reply".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
    }

    /// With external integrations allowed and approval required, a `connector.*`
    /// entrypoint needs approval instead of being blocked.
    #[test]
    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-2".to_string(),
            name: "Connector routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "connector.email.reply".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: true,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(
            decision,
            RoutineExecutionDecision::RequiresApproval { .. }
        ));
    }

    /// A non-connector entrypoint (`mission.default`) is allowed even when
    /// `requires_approval` is set and external integrations are disabled.
    #[test]
    fn routine_policy_allows_non_external_entrypoints() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-3".to_string(),
            name: "Internal mission routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert_eq!(decision, RoutineExecutionDecision::Allowed);
    }

    /// The queued run with the smallest `created_at_ms` is claimed first, even though
    /// the newer run was inserted first. The claimed run becomes `Running` and gets a
    /// start timestamp.
    #[tokio::test]
    async fn claim_next_queued_routine_run_marks_oldest_running() {
        let runs_path = tmp_routines_file("routine-claim-runs");
        let mut state = AppState::new_starting("routine-claim".to_string(), true);
        state.routine_runs_path = runs_path.clone();

        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
            run_id: run_id.to_string(),
            routine_id: "routine-claim".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms,
            updated_at_ms: created_at_ms,
            fired_at_ms: Some(created_at_ms),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: false,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            artifacts: vec![],
        };

        {
            let mut guard = state.routine_runs.write().await;
            guard.insert("run-late".to_string(), mk("run-late", 2_000));
            guard.insert("run-early".to_string(), mk("run-early", 1_000));
        }
        state.persist_routine_runs().await.expect("persist");

        let claimed = state
            .claim_next_queued_routine_run()
            .await
            .expect("claimed run");
        assert_eq!(claimed.run_id, "run-early");
        assert_eq!(claimed.status, RoutineRunStatus::Running);
        assert!(claimed.started_at_ms.is_some());

        // `persist_routine_runs` wrote this temp file; remove it so repeated test
        // runs do not accumulate files in the system temp directory.
        let _ = tokio::fs::remove_file(runs_path).await;
    }

    /// Storing a routine session policy trims tool names, drops empty entries and
    /// duplicates, and keeps first-seen order.
    #[tokio::test]
    async fn routine_session_policy_roundtrip_normalizes_tools() {
        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
        state
            .set_routine_session_policy(
                "session-routine-1".to_string(),
                "run-1".to_string(),
                "routine-1".to_string(),
                vec![
                    "read".to_string(),
                    " mcp.arcade.search ".to_string(),
                    "read".to_string(),
                    "".to_string(),
                ],
            )
            .await;

        let policy = state
            .routine_session_policy("session-routine-1")
            .await
            .expect("policy");
        assert_eq!(
            policy.allowed_tools,
            vec!["read".to_string(), "mcp.arcade.search".to_string()]
        );
    }

    /// An `orchestrated` run's mission prompt contains the mode, the
    /// Plan/Do/Verify/Notify pattern, the orchestrator-only tool rule, the allowed
    /// tools, and the output target.
    #[test]
    fn routine_mission_prompt_includes_orchestrated_contract() {
        let run = RoutineRunRecord {
            run_id: "run-orchestrated-1".to_string(),
            routine_id: "automation-orchestrated".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms: 1_000,
            updated_at_ms: 1_000,
            fired_at_ms: Some(1_000),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: true,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({
                "prompt": "Coordinate a multi-step release readiness check.",
                "mode": "orchestrated",
                "success_criteria": ["All blockers listed", "Output artifact written"],
                "orchestrator_only_tool_calls": true
            }),
            allowed_tools: vec!["read".to_string(), "webfetch_document".to_string()],
            output_targets: vec!["file://reports/release-readiness.md".to_string()],
            artifacts: vec![],
        };

        let objective = routine_objective_from_args(&run).expect("objective");
        let prompt = build_routine_mission_prompt(&run, &objective);

        assert!(prompt.contains("Mode: orchestrated"));
        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
        assert!(prompt.contains("only the orchestrator may execute tools"));
        assert!(prompt.contains("Allowed Tools: read, webfetch_document"));
        assert!(prompt.contains("file://reports/release-readiness.md"));
    }

    /// A run with no `mode`, tools, or output targets produces a standalone prompt
    /// with the default tool and output-target wording.
    #[test]
    fn routine_mission_prompt_includes_standalone_defaults() {
        let run = RoutineRunRecord {
            run_id: "run-standalone-1".to_string(),
            routine_id: "automation-standalone".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms: 2_000,
            updated_at_ms: 2_000,
            fired_at_ms: Some(2_000),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: false,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({
                "prompt": "Summarize top engineering updates.",
                "success_criteria": ["Three bullet summary"]
            }),
            allowed_tools: vec![],
            output_targets: vec![],
            artifacts: vec![],
        };

        let objective = routine_objective_from_args(&run).expect("objective");
        let prompt = build_routine_mission_prompt(&run, &objective);

        assert!(prompt.contains("Mode: standalone"));
        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
        assert!(prompt.contains("Allowed Tools: all available by current policy"));
        assert!(prompt.contains("Output Targets: none configured"));
    }
}