1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use futures::future::BoxFuture;
10use serde::{Deserialize, Serialize};
11use serde_json::{json, Value};
12use sha2::{Digest, Sha256};
13use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
14use tandem_orchestrator::MissionState;
15use tandem_types::{
16 EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
17 SendMessageRequest, Session, ShellFamily,
18};
19use tokio::fs;
20use tokio::sync::RwLock;
21
22use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
23use tandem_core::{
24 resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
25 PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
26};
27use tandem_memory::db::MemoryDatabase;
28use tandem_providers::ChatMessage;
29use tandem_providers::ProviderRegistry;
30use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
31use tandem_tools::ToolRegistry;
32
33mod agent_teams;
34mod http;
35pub mod webui;
36
37pub use agent_teams::AgentTeamRuntime;
38pub use http::serve;
39
40#[derive(Debug, Clone, Serialize, Deserialize, Default)]
41pub struct ChannelStatus {
42 pub enabled: bool,
43 pub connected: bool,
44 pub last_error: Option<String>,
45 pub active_sessions: u64,
46 pub meta: Value,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, Default)]
50pub struct WebUiConfig {
51 #[serde(default)]
52 pub enabled: bool,
53 #[serde(default = "default_web_ui_prefix")]
54 pub path_prefix: String,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, Default)]
58pub struct ChannelsConfigFile {
59 pub telegram: Option<TelegramConfigFile>,
60 pub discord: Option<DiscordConfigFile>,
61 pub slack: Option<SlackConfigFile>,
62 #[serde(default)]
63 pub tool_policy: tandem_channels::config::ChannelToolPolicy,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct TelegramConfigFile {
68 pub bot_token: String,
69 #[serde(default = "default_allow_all")]
70 pub allowed_users: Vec<String>,
71 #[serde(default)]
72 pub mention_only: bool,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct DiscordConfigFile {
77 pub bot_token: String,
78 #[serde(default)]
79 pub guild_id: Option<String>,
80 #[serde(default = "default_allow_all")]
81 pub allowed_users: Vec<String>,
82 #[serde(default = "default_discord_mention_only")]
83 pub mention_only: bool,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct SlackConfigFile {
88 pub bot_token: String,
89 pub channel_id: String,
90 #[serde(default = "default_allow_all")]
91 pub allowed_users: Vec<String>,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize, Default)]
95struct EffectiveAppConfig {
96 #[serde(default)]
97 pub channels: ChannelsConfigFile,
98 #[serde(default)]
99 pub web_ui: WebUiConfig,
100 #[serde(default)]
101 pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
102}
103
104#[derive(Default)]
105pub struct ChannelRuntime {
106 pub listeners: Option<tokio::task::JoinSet<()>>,
107 pub statuses: std::collections::HashMap<String, ChannelStatus>,
108}
109
/// A lease held by a client, with millisecond timestamps and a time-to-live.
#[derive(Clone, Debug)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Returns `true` when strictly more than `ttl_ms` milliseconds have
    /// passed between `last_renewed_at_ms` and `now_ms`.
    ///
    /// A `now_ms` earlier than the last renewal counts as zero elapsed time.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let elapsed_ms = now_ms.saturating_sub(self.last_renewed_at_ms);
        elapsed_ms > self.ttl_ms
    }
}
125
126#[derive(Debug, Clone, Serialize)]
127pub struct ActiveRun {
128 #[serde(rename = "runID")]
129 pub run_id: String,
130 #[serde(rename = "startedAtMs")]
131 pub started_at_ms: u64,
132 #[serde(rename = "lastActivityAtMs")]
133 pub last_activity_at_ms: u64,
134 #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
135 pub client_id: Option<String>,
136 #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
137 pub agent_id: Option<String>,
138 #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
139 pub agent_profile: Option<String>,
140}
141
142#[derive(Clone, Default)]
143pub struct RunRegistry {
144 active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
145}
146
147impl RunRegistry {
148 pub fn new() -> Self {
149 Self::default()
150 }
151
152 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
153 self.active.read().await.get(session_id).cloned()
154 }
155
156 pub async fn acquire(
157 &self,
158 session_id: &str,
159 run_id: String,
160 client_id: Option<String>,
161 agent_id: Option<String>,
162 agent_profile: Option<String>,
163 ) -> std::result::Result<ActiveRun, ActiveRun> {
164 let mut guard = self.active.write().await;
165 if let Some(existing) = guard.get(session_id).cloned() {
166 return Err(existing);
167 }
168 let now = now_ms();
169 let run = ActiveRun {
170 run_id,
171 started_at_ms: now,
172 last_activity_at_ms: now,
173 client_id,
174 agent_id,
175 agent_profile,
176 };
177 guard.insert(session_id.to_string(), run.clone());
178 Ok(run)
179 }
180
181 pub async fn touch(&self, session_id: &str, run_id: &str) {
182 let mut guard = self.active.write().await;
183 if let Some(run) = guard.get_mut(session_id) {
184 if run.run_id == run_id {
185 run.last_activity_at_ms = now_ms();
186 }
187 }
188 }
189
190 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
191 let mut guard = self.active.write().await;
192 if let Some(run) = guard.get(session_id) {
193 if run.run_id == run_id {
194 return guard.remove(session_id);
195 }
196 }
197 None
198 }
199
200 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
201 self.active.write().await.remove(session_id)
202 }
203
204 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
205 let now = now_ms();
206 let mut guard = self.active.write().await;
207 let stale_ids = guard
208 .iter()
209 .filter_map(|(session_id, run)| {
210 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
211 Some(session_id.clone())
212 } else {
213 None
214 }
215 })
216 .collect::<Vec<_>>();
217 let mut out = Vec::with_capacity(stale_ids.len());
218 for session_id in stale_ids {
219 if let Some(run) = guard.remove(&session_id) {
220 out.push((session_id, run));
221 }
222 }
223 out
224 }
225}
226
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
233
234pub fn build_id() -> String {
235 if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
236 let trimmed = explicit.trim();
237 if !trimmed.is_empty() {
238 return trimmed.to_string();
239 }
240 }
241 if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
242 let trimmed = git_sha.trim();
243 if !trimmed.is_empty() {
244 return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
245 }
246 }
247 env!("CARGO_PKG_VERSION").to_string()
248}
249
250pub fn detect_host_runtime_context() -> HostRuntimeContext {
251 let os = if cfg!(target_os = "windows") {
252 HostOs::Windows
253 } else if cfg!(target_os = "macos") {
254 HostOs::Macos
255 } else {
256 HostOs::Linux
257 };
258 let (shell_family, path_style) = match os {
259 HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
260 HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
261 };
262 HostRuntimeContext {
263 os,
264 arch: std::env::consts::ARCH.to_string(),
265 shell_family,
266 path_style,
267 }
268}
269
/// Path of the running executable, reported only in builds with debug
/// assertions enabled; always `None` otherwise.
///
/// Non-UTF-8 path components are converted lossily.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        match std::env::current_exe() {
            Ok(exe) => Some(exe.to_string_lossy().into_owned()),
            Err(_) => None,
        }
    } else {
        None
    }
}
282
283#[derive(Clone)]
284pub struct RuntimeState {
285 pub storage: Arc<Storage>,
286 pub config: ConfigStore,
287 pub event_bus: EventBus,
288 pub providers: ProviderRegistry,
289 pub plugins: PluginRegistry,
290 pub agents: AgentRegistry,
291 pub tools: ToolRegistry,
292 pub permissions: PermissionManager,
293 pub mcp: McpRegistry,
294 pub pty: PtyManager,
295 pub lsp: LspManager,
296 pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
297 pub logs: Arc<RwLock<Vec<Value>>>,
298 pub workspace_index: WorkspaceIndex,
299 pub cancellations: CancellationRegistry,
300 pub engine_loop: EngineLoop,
301 pub host_runtime_context: HostRuntimeContext,
302}
303
304#[derive(Debug, Clone)]
305pub struct GovernedMemoryRecord {
306 pub id: String,
307 pub run_id: String,
308 pub partition: MemoryPartition,
309 pub kind: MemoryContentKind,
310 pub content: String,
311 pub artifact_refs: Vec<String>,
312 pub classification: MemoryClassification,
313 pub metadata: Option<Value>,
314 pub source_memory_id: Option<String>,
315 pub created_at_ms: u64,
316}
317
318#[derive(Debug, Clone, Serialize)]
319pub struct MemoryAuditEvent {
320 pub audit_id: String,
321 pub action: String,
322 pub run_id: String,
323 pub memory_id: Option<String>,
324 pub source_memory_id: Option<String>,
325 pub to_tier: Option<GovernedMemoryTier>,
326 pub partition_key: String,
327 pub actor: String,
328 pub status: String,
329 #[serde(skip_serializing_if = "Option::is_none")]
330 pub detail: Option<String>,
331 pub created_at_ms: u64,
332}
333
334#[derive(Debug, Clone, Serialize, Deserialize)]
335pub struct SharedResourceRecord {
336 pub key: String,
337 pub value: Value,
338 pub rev: u64,
339 pub updated_at_ms: u64,
340 pub updated_by: String,
341 #[serde(skip_serializing_if = "Option::is_none")]
342 pub ttl_ms: Option<u64>,
343}
344
345#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
346#[serde(rename_all = "snake_case")]
347pub enum RoutineSchedule {
348 IntervalSeconds { seconds: u64 },
349 Cron { expression: String },
350}
351
352#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
353#[serde(rename_all = "snake_case", tag = "type")]
354pub enum RoutineMisfirePolicy {
355 Skip,
356 RunOnce,
357 CatchUp { max_runs: u32 },
358}
359
360#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
361#[serde(rename_all = "snake_case")]
362pub enum RoutineStatus {
363 Active,
364 Paused,
365}
366
367#[derive(Debug, Clone, Serialize, Deserialize)]
368pub struct RoutineSpec {
369 pub routine_id: String,
370 pub name: String,
371 pub status: RoutineStatus,
372 pub schedule: RoutineSchedule,
373 pub timezone: String,
374 pub misfire_policy: RoutineMisfirePolicy,
375 pub entrypoint: String,
376 #[serde(default)]
377 pub args: Value,
378 #[serde(default)]
379 pub allowed_tools: Vec<String>,
380 #[serde(default)]
381 pub output_targets: Vec<String>,
382 pub creator_type: String,
383 pub creator_id: String,
384 pub requires_approval: bool,
385 pub external_integrations_allowed: bool,
386 #[serde(default, skip_serializing_if = "Option::is_none")]
387 pub next_fire_at_ms: Option<u64>,
388 #[serde(default, skip_serializing_if = "Option::is_none")]
389 pub last_fired_at_ms: Option<u64>,
390}
391
392#[derive(Debug, Clone, Serialize, Deserialize)]
393pub struct RoutineHistoryEvent {
394 pub routine_id: String,
395 pub trigger_type: String,
396 pub run_count: u32,
397 pub fired_at_ms: u64,
398 pub status: String,
399 #[serde(default, skip_serializing_if = "Option::is_none")]
400 pub detail: Option<String>,
401}
402
403#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
404#[serde(rename_all = "snake_case")]
405pub enum RoutineRunStatus {
406 Queued,
407 PendingApproval,
408 Running,
409 Paused,
410 BlockedPolicy,
411 Denied,
412 Completed,
413 Failed,
414 Cancelled,
415}
416
417#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct RoutineRunArtifact {
419 pub artifact_id: String,
420 pub uri: String,
421 pub kind: String,
422 #[serde(default, skip_serializing_if = "Option::is_none")]
423 pub label: Option<String>,
424 pub created_at_ms: u64,
425 #[serde(default, skip_serializing_if = "Option::is_none")]
426 pub metadata: Option<Value>,
427}
428
429#[derive(Debug, Clone, Serialize, Deserialize)]
430pub struct RoutineRunRecord {
431 pub run_id: String,
432 pub routine_id: String,
433 pub trigger_type: String,
434 pub run_count: u32,
435 pub status: RoutineRunStatus,
436 pub created_at_ms: u64,
437 pub updated_at_ms: u64,
438 #[serde(default, skip_serializing_if = "Option::is_none")]
439 pub fired_at_ms: Option<u64>,
440 #[serde(default, skip_serializing_if = "Option::is_none")]
441 pub started_at_ms: Option<u64>,
442 #[serde(default, skip_serializing_if = "Option::is_none")]
443 pub finished_at_ms: Option<u64>,
444 pub requires_approval: bool,
445 #[serde(default, skip_serializing_if = "Option::is_none")]
446 pub approval_reason: Option<String>,
447 #[serde(default, skip_serializing_if = "Option::is_none")]
448 pub denial_reason: Option<String>,
449 #[serde(default, skip_serializing_if = "Option::is_none")]
450 pub paused_reason: Option<String>,
451 #[serde(default, skip_serializing_if = "Option::is_none")]
452 pub detail: Option<String>,
453 pub entrypoint: String,
454 #[serde(default)]
455 pub args: Value,
456 #[serde(default)]
457 pub allowed_tools: Vec<String>,
458 #[serde(default)]
459 pub output_targets: Vec<String>,
460 #[serde(default)]
461 pub artifacts: Vec<RoutineRunArtifact>,
462}
463
/// Value type of `AppState::routine_session_policies`: ties a session to a
/// routine run and a list of allowed tool names.
#[derive(Clone, Debug)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
471
472#[derive(Debug, Clone, Serialize)]
473pub struct RoutineTriggerPlan {
474 pub routine_id: String,
475 pub run_count: u32,
476 pub scheduled_at_ms: u64,
477 pub next_fire_at_ms: u64,
478}
479
480#[derive(Debug, Clone, Serialize)]
481pub struct ResourceConflict {
482 pub key: String,
483 pub expected_rev: Option<u64>,
484 pub current_rev: Option<u64>,
485}
486
487#[derive(Debug, Clone, Serialize)]
488#[serde(tag = "type", rename_all = "snake_case")]
489pub enum ResourceStoreError {
490 InvalidKey { key: String },
491 RevisionConflict(ResourceConflict),
492 PersistFailed { message: String },
493}
494
495#[derive(Debug, Clone, Serialize)]
496#[serde(tag = "type", rename_all = "snake_case")]
497pub enum RoutineStoreError {
498 InvalidRoutineId { routine_id: String },
499 InvalidSchedule { detail: String },
500 PersistFailed { message: String },
501}
502
/// Coarse startup status tracked in [`StartupState`].
#[derive(Clone, Debug)]
pub enum StartupStatus {
    /// Initial status set by `AppState::new_starting`.
    Starting,
    /// Set by `AppState::mark_ready`.
    Ready,
    /// Set by `AppState::mark_failed`.
    Failed,
}
509
510#[derive(Debug, Clone)]
511pub struct StartupState {
512 pub status: StartupStatus,
513 pub phase: String,
514 pub started_at_ms: u64,
515 pub attempt_id: String,
516 pub last_error: Option<String>,
517}
518
519#[derive(Debug, Clone)]
520pub struct StartupSnapshot {
521 pub status: StartupStatus,
522 pub phase: String,
523 pub started_at_ms: u64,
524 pub attempt_id: String,
525 pub last_error: Option<String>,
526 pub elapsed_ms: u64,
527}
528
529#[derive(Clone)]
530pub struct AppState {
531 pub runtime: Arc<OnceLock<RuntimeState>>,
532 pub startup: Arc<RwLock<StartupState>>,
533 pub in_process_mode: Arc<AtomicBool>,
534 pub api_token: Arc<RwLock<Option<String>>>,
535 pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
536 pub run_registry: RunRegistry,
537 pub run_stale_ms: u64,
538 pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
539 pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
540 pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
541 pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
542 pub shared_resources_path: PathBuf,
543 pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
544 pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
545 pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
546 pub routine_session_policies:
547 Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
548 pub routines_path: PathBuf,
549 pub routine_history_path: PathBuf,
550 pub routine_runs_path: PathBuf,
551 pub agent_teams: AgentTeamRuntime,
552 pub web_ui_enabled: Arc<AtomicBool>,
553 pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
554 pub server_base_url: Arc<std::sync::RwLock<String>>,
555 pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
556 pub host_runtime_context: HostRuntimeContext,
557}
558
559#[derive(Debug, Clone)]
560struct StatusIndexUpdate {
561 key: String,
562 value: Value,
563}
564
565impl AppState {
566 pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
567 Self {
568 runtime: Arc::new(OnceLock::new()),
569 startup: Arc::new(RwLock::new(StartupState {
570 status: StartupStatus::Starting,
571 phase: "boot".to_string(),
572 started_at_ms: now_ms(),
573 attempt_id,
574 last_error: None,
575 })),
576 in_process_mode: Arc::new(AtomicBool::new(in_process)),
577 api_token: Arc::new(RwLock::new(None)),
578 engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
579 run_registry: RunRegistry::new(),
580 run_stale_ms: resolve_run_stale_ms(),
581 memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
582 memory_audit_log: Arc::new(RwLock::new(Vec::new())),
583 missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
584 shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
585 shared_resources_path: resolve_shared_resources_path(),
586 routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
587 routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
588 routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
589 routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
590 routines_path: resolve_routines_path(),
591 routine_history_path: resolve_routine_history_path(),
592 routine_runs_path: resolve_routine_runs_path(),
593 agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
594 web_ui_enabled: Arc::new(AtomicBool::new(false)),
595 web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
596 server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
597 channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
598 host_runtime_context: detect_host_runtime_context(),
599 }
600 }
601
602 pub fn is_ready(&self) -> bool {
603 self.runtime.get().is_some()
604 }
605
606 pub fn mode_label(&self) -> &'static str {
607 if self.in_process_mode.load(Ordering::Relaxed) {
608 "in-process"
609 } else {
610 "sidecar"
611 }
612 }
613
614 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
615 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
616 if let Ok(mut guard) = self.web_ui_prefix.write() {
617 *guard = normalize_web_ui_prefix(&prefix);
618 }
619 }
620
621 pub fn web_ui_enabled(&self) -> bool {
622 self.web_ui_enabled.load(Ordering::Relaxed)
623 }
624
625 pub fn web_ui_prefix(&self) -> String {
626 self.web_ui_prefix
627 .read()
628 .map(|v| v.clone())
629 .unwrap_or_else(|_| "/admin".to_string())
630 }
631
632 pub fn set_server_base_url(&self, base_url: String) {
633 if let Ok(mut guard) = self.server_base_url.write() {
634 *guard = base_url;
635 }
636 }
637
638 pub fn server_base_url(&self) -> String {
639 self.server_base_url
640 .read()
641 .map(|v| v.clone())
642 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
643 }
644
645 pub async fn api_token(&self) -> Option<String> {
646 self.api_token.read().await.clone()
647 }
648
649 pub async fn set_api_token(&self, token: Option<String>) {
650 *self.api_token.write().await = token;
651 }
652
653 pub async fn startup_snapshot(&self) -> StartupSnapshot {
654 let state = self.startup.read().await.clone();
655 StartupSnapshot {
656 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
657 status: state.status,
658 phase: state.phase,
659 started_at_ms: state.started_at_ms,
660 attempt_id: state.attempt_id,
661 last_error: state.last_error,
662 }
663 }
664
665 pub fn host_runtime_context(&self) -> HostRuntimeContext {
666 self.runtime
667 .get()
668 .map(|runtime| runtime.host_runtime_context.clone())
669 .unwrap_or_else(|| self.host_runtime_context.clone())
670 }
671
672 pub async fn set_phase(&self, phase: impl Into<String>) {
673 let mut startup = self.startup.write().await;
674 startup.phase = phase.into();
675 }
676
677 pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
678 self.runtime
679 .set(runtime)
680 .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
681 self.engine_loop
682 .set_spawn_agent_hook(std::sync::Arc::new(
683 crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
684 ))
685 .await;
686 self.engine_loop
687 .set_tool_policy_hook(std::sync::Arc::new(
688 crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
689 ))
690 .await;
691 self.engine_loop
692 .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
693 self.clone(),
694 )))
695 .await;
696 let _ = self.load_shared_resources().await;
697 let _ = self.load_routines().await;
698 let _ = self.load_routine_history().await;
699 let _ = self.load_routine_runs().await;
700 let workspace_root = self.workspace_index.snapshot().await.root;
701 let _ = self
702 .agent_teams
703 .ensure_loaded_for_workspace(&workspace_root)
704 .await;
705 let mut startup = self.startup.write().await;
706 startup.status = StartupStatus::Ready;
707 startup.phase = "ready".to_string();
708 startup.last_error = None;
709 Ok(())
710 }
711
712 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
713 let mut startup = self.startup.write().await;
714 startup.status = StartupStatus::Failed;
715 startup.phase = phase.into();
716 startup.last_error = Some(error.into());
717 }
718
719 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
720 let runtime = self.channels_runtime.lock().await;
721 runtime.statuses.clone()
722 }
723
724 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
725 let effective = self.config.get_effective_value().await;
726 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
727 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
728
729 let mut runtime = self.channels_runtime.lock().await;
730 if let Some(listeners) = runtime.listeners.as_mut() {
731 listeners.abort_all();
732 }
733 runtime.listeners = None;
734 runtime.statuses.clear();
735
736 let mut status_map = std::collections::HashMap::new();
737 status_map.insert(
738 "telegram".to_string(),
739 ChannelStatus {
740 enabled: parsed.channels.telegram.is_some(),
741 connected: false,
742 last_error: None,
743 active_sessions: 0,
744 meta: serde_json::json!({}),
745 },
746 );
747 status_map.insert(
748 "discord".to_string(),
749 ChannelStatus {
750 enabled: parsed.channels.discord.is_some(),
751 connected: false,
752 last_error: None,
753 active_sessions: 0,
754 meta: serde_json::json!({}),
755 },
756 );
757 status_map.insert(
758 "slack".to_string(),
759 ChannelStatus {
760 enabled: parsed.channels.slack.is_some(),
761 connected: false,
762 last_error: None,
763 active_sessions: 0,
764 meta: serde_json::json!({}),
765 },
766 );
767
768 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
769 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
770 runtime.listeners = Some(listeners);
771 for status in status_map.values_mut() {
772 if status.enabled {
773 status.connected = true;
774 }
775 }
776 }
777
778 runtime.statuses = status_map.clone();
779 drop(runtime);
780
781 self.event_bus.publish(EngineEvent::new(
782 "channel.status.changed",
783 serde_json::json!({ "channels": status_map }),
784 ));
785 Ok(())
786 }
787
788 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
789 if !self.shared_resources_path.exists() {
790 return Ok(());
791 }
792 let raw = fs::read_to_string(&self.shared_resources_path).await?;
793 let parsed =
794 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
795 .unwrap_or_default();
796 let mut guard = self.shared_resources.write().await;
797 *guard = parsed;
798 Ok(())
799 }
800
801 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
802 if let Some(parent) = self.shared_resources_path.parent() {
803 fs::create_dir_all(parent).await?;
804 }
805 let payload = {
806 let guard = self.shared_resources.read().await;
807 serde_json::to_string_pretty(&*guard)?
808 };
809 fs::write(&self.shared_resources_path, payload).await?;
810 Ok(())
811 }
812
813 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
814 self.shared_resources.read().await.get(key).cloned()
815 }
816
817 pub async fn list_shared_resources(
818 &self,
819 prefix: Option<&str>,
820 limit: usize,
821 ) -> Vec<SharedResourceRecord> {
822 let limit = limit.clamp(1, 500);
823 let mut rows = self
824 .shared_resources
825 .read()
826 .await
827 .values()
828 .filter(|record| {
829 if let Some(prefix) = prefix {
830 record.key.starts_with(prefix)
831 } else {
832 true
833 }
834 })
835 .cloned()
836 .collect::<Vec<_>>();
837 rows.sort_by(|a, b| a.key.cmp(&b.key));
838 rows.truncate(limit);
839 rows
840 }
841
842 pub async fn put_shared_resource(
843 &self,
844 key: String,
845 value: Value,
846 if_match_rev: Option<u64>,
847 updated_by: String,
848 ttl_ms: Option<u64>,
849 ) -> Result<SharedResourceRecord, ResourceStoreError> {
850 if !is_valid_resource_key(&key) {
851 return Err(ResourceStoreError::InvalidKey { key });
852 }
853
854 let now = now_ms();
855 let mut guard = self.shared_resources.write().await;
856 let existing = guard.get(&key).cloned();
857
858 if let Some(expected) = if_match_rev {
859 let current = existing.as_ref().map(|row| row.rev);
860 if current != Some(expected) {
861 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
862 key,
863 expected_rev: Some(expected),
864 current_rev: current,
865 }));
866 }
867 }
868
869 let next_rev = existing
870 .as_ref()
871 .map(|row| row.rev.saturating_add(1))
872 .unwrap_or(1);
873
874 let record = SharedResourceRecord {
875 key: key.clone(),
876 value,
877 rev: next_rev,
878 updated_at_ms: now,
879 updated_by,
880 ttl_ms,
881 };
882
883 let previous = guard.insert(key.clone(), record.clone());
884 drop(guard);
885
886 if let Err(error) = self.persist_shared_resources().await {
887 let mut rollback = self.shared_resources.write().await;
888 if let Some(previous) = previous {
889 rollback.insert(key, previous);
890 } else {
891 rollback.remove(&key);
892 }
893 return Err(ResourceStoreError::PersistFailed {
894 message: error.to_string(),
895 });
896 }
897
898 Ok(record)
899 }
900
901 pub async fn delete_shared_resource(
902 &self,
903 key: &str,
904 if_match_rev: Option<u64>,
905 ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
906 if !is_valid_resource_key(key) {
907 return Err(ResourceStoreError::InvalidKey {
908 key: key.to_string(),
909 });
910 }
911
912 let mut guard = self.shared_resources.write().await;
913 let current = guard.get(key).cloned();
914 if let Some(expected) = if_match_rev {
915 let current_rev = current.as_ref().map(|row| row.rev);
916 if current_rev != Some(expected) {
917 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
918 key: key.to_string(),
919 expected_rev: Some(expected),
920 current_rev,
921 }));
922 }
923 }
924
925 let removed = guard.remove(key);
926 drop(guard);
927
928 if let Err(error) = self.persist_shared_resources().await {
929 if let Some(record) = removed.clone() {
930 self.shared_resources
931 .write()
932 .await
933 .insert(record.key.clone(), record);
934 }
935 return Err(ResourceStoreError::PersistFailed {
936 message: error.to_string(),
937 });
938 }
939
940 Ok(removed)
941 }
942
943 pub async fn load_routines(&self) -> anyhow::Result<()> {
944 if !self.routines_path.exists() {
945 return Ok(());
946 }
947 let raw = fs::read_to_string(&self.routines_path).await?;
948 let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
949 .unwrap_or_default();
950 let mut guard = self.routines.write().await;
951 *guard = parsed;
952 Ok(())
953 }
954
955 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
956 if !self.routine_history_path.exists() {
957 return Ok(());
958 }
959 let raw = fs::read_to_string(&self.routine_history_path).await?;
960 let parsed = serde_json::from_str::<
961 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
962 >(&raw)
963 .unwrap_or_default();
964 let mut guard = self.routine_history.write().await;
965 *guard = parsed;
966 Ok(())
967 }
968
969 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
970 if !self.routine_runs_path.exists() {
971 return Ok(());
972 }
973 let raw = fs::read_to_string(&self.routine_runs_path).await?;
974 let parsed =
975 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
976 .unwrap_or_default();
977 let mut guard = self.routine_runs.write().await;
978 *guard = parsed;
979 Ok(())
980 }
981
982 pub async fn persist_routines(&self) -> anyhow::Result<()> {
983 if let Some(parent) = self.routines_path.parent() {
984 fs::create_dir_all(parent).await?;
985 }
986 let payload = {
987 let guard = self.routines.read().await;
988 serde_json::to_string_pretty(&*guard)?
989 };
990 fs::write(&self.routines_path, payload).await?;
991 Ok(())
992 }
993
994 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
995 if let Some(parent) = self.routine_history_path.parent() {
996 fs::create_dir_all(parent).await?;
997 }
998 let payload = {
999 let guard = self.routine_history.read().await;
1000 serde_json::to_string_pretty(&*guard)?
1001 };
1002 fs::write(&self.routine_history_path, payload).await?;
1003 Ok(())
1004 }
1005
1006 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1007 if let Some(parent) = self.routine_runs_path.parent() {
1008 fs::create_dir_all(parent).await?;
1009 }
1010 let payload = {
1011 let guard = self.routine_runs.read().await;
1012 serde_json::to_string_pretty(&*guard)?
1013 };
1014 fs::write(&self.routine_runs_path, payload).await?;
1015 Ok(())
1016 }
1017
1018 pub async fn put_routine(
1019 &self,
1020 mut routine: RoutineSpec,
1021 ) -> Result<RoutineSpec, RoutineStoreError> {
1022 if routine.routine_id.trim().is_empty() {
1023 return Err(RoutineStoreError::InvalidRoutineId {
1024 routine_id: routine.routine_id,
1025 });
1026 }
1027
1028 routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1029 routine.output_targets = normalize_non_empty_list(routine.output_targets);
1030
1031 let interval = match routine.schedule {
1032 RoutineSchedule::IntervalSeconds { seconds } => {
1033 if seconds == 0 {
1034 return Err(RoutineStoreError::InvalidSchedule {
1035 detail: "interval_seconds must be > 0".to_string(),
1036 });
1037 }
1038 Some(seconds)
1039 }
1040 RoutineSchedule::Cron { .. } => None,
1041 };
1042 if routine.next_fire_at_ms.is_none() {
1043 routine.next_fire_at_ms = Some(now_ms().saturating_add(interval.unwrap_or(60) * 1000));
1044 }
1045
1046 let mut guard = self.routines.write().await;
1047 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1048 drop(guard);
1049
1050 if let Err(error) = self.persist_routines().await {
1051 let mut rollback = self.routines.write().await;
1052 if let Some(previous) = previous {
1053 rollback.insert(previous.routine_id.clone(), previous);
1054 } else {
1055 rollback.remove(&routine.routine_id);
1056 }
1057 return Err(RoutineStoreError::PersistFailed {
1058 message: error.to_string(),
1059 });
1060 }
1061
1062 Ok(routine)
1063 }
1064
1065 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1066 let mut rows = self
1067 .routines
1068 .read()
1069 .await
1070 .values()
1071 .cloned()
1072 .collect::<Vec<_>>();
1073 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1074 rows
1075 }
1076
1077 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1078 self.routines.read().await.get(routine_id).cloned()
1079 }
1080
1081 pub async fn delete_routine(
1082 &self,
1083 routine_id: &str,
1084 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1085 let mut guard = self.routines.write().await;
1086 let removed = guard.remove(routine_id);
1087 drop(guard);
1088
1089 if let Err(error) = self.persist_routines().await {
1090 if let Some(removed) = removed.clone() {
1091 self.routines
1092 .write()
1093 .await
1094 .insert(removed.routine_id.clone(), removed);
1095 }
1096 return Err(RoutineStoreError::PersistFailed {
1097 message: error.to_string(),
1098 });
1099 }
1100 Ok(removed)
1101 }
1102
1103 pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1104 let mut plans = Vec::new();
1105 let mut guard = self.routines.write().await;
1106 for routine in guard.values_mut() {
1107 if routine.status != RoutineStatus::Active {
1108 continue;
1109 }
1110 let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1111 continue;
1112 };
1113 let Some(interval_ms) = routine_interval_ms(&routine.schedule) else {
1114 continue;
1115 };
1116 if now_ms < next_fire_at_ms {
1117 continue;
1118 }
1119 let (run_count, next_fire_at_ms) = compute_misfire_plan(
1120 now_ms,
1121 next_fire_at_ms,
1122 interval_ms,
1123 &routine.misfire_policy,
1124 );
1125 routine.next_fire_at_ms = Some(next_fire_at_ms);
1126 if run_count == 0 {
1127 continue;
1128 }
1129 plans.push(RoutineTriggerPlan {
1130 routine_id: routine.routine_id.clone(),
1131 run_count,
1132 scheduled_at_ms: now_ms,
1133 next_fire_at_ms,
1134 });
1135 }
1136 drop(guard);
1137 let _ = self.persist_routines().await;
1138 plans
1139 }
1140
1141 pub async fn mark_routine_fired(
1142 &self,
1143 routine_id: &str,
1144 fired_at_ms: u64,
1145 ) -> Option<RoutineSpec> {
1146 let mut guard = self.routines.write().await;
1147 let routine = guard.get_mut(routine_id)?;
1148 routine.last_fired_at_ms = Some(fired_at_ms);
1149 let updated = routine.clone();
1150 drop(guard);
1151 let _ = self.persist_routines().await;
1152 Some(updated)
1153 }
1154
1155 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1156 let mut history = self.routine_history.write().await;
1157 history
1158 .entry(event.routine_id.clone())
1159 .or_default()
1160 .push(event);
1161 drop(history);
1162 let _ = self.persist_routine_history().await;
1163 }
1164
1165 pub async fn list_routine_history(
1166 &self,
1167 routine_id: &str,
1168 limit: usize,
1169 ) -> Vec<RoutineHistoryEvent> {
1170 let limit = limit.clamp(1, 500);
1171 let mut rows = self
1172 .routine_history
1173 .read()
1174 .await
1175 .get(routine_id)
1176 .cloned()
1177 .unwrap_or_default();
1178 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1179 rows.truncate(limit);
1180 rows
1181 }
1182
1183 pub async fn create_routine_run(
1184 &self,
1185 routine: &RoutineSpec,
1186 trigger_type: &str,
1187 run_count: u32,
1188 status: RoutineRunStatus,
1189 detail: Option<String>,
1190 ) -> RoutineRunRecord {
1191 let now = now_ms();
1192 let record = RoutineRunRecord {
1193 run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1194 routine_id: routine.routine_id.clone(),
1195 trigger_type: trigger_type.to_string(),
1196 run_count,
1197 status,
1198 created_at_ms: now,
1199 updated_at_ms: now,
1200 fired_at_ms: Some(now),
1201 started_at_ms: None,
1202 finished_at_ms: None,
1203 requires_approval: routine.requires_approval,
1204 approval_reason: None,
1205 denial_reason: None,
1206 paused_reason: None,
1207 detail,
1208 entrypoint: routine.entrypoint.clone(),
1209 args: routine.args.clone(),
1210 allowed_tools: routine.allowed_tools.clone(),
1211 output_targets: routine.output_targets.clone(),
1212 artifacts: Vec::new(),
1213 };
1214 self.routine_runs
1215 .write()
1216 .await
1217 .insert(record.run_id.clone(), record.clone());
1218 let _ = self.persist_routine_runs().await;
1219 record
1220 }
1221
1222 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1223 self.routine_runs.read().await.get(run_id).cloned()
1224 }
1225
1226 pub async fn list_routine_runs(
1227 &self,
1228 routine_id: Option<&str>,
1229 limit: usize,
1230 ) -> Vec<RoutineRunRecord> {
1231 let mut rows = self
1232 .routine_runs
1233 .read()
1234 .await
1235 .values()
1236 .filter(|row| {
1237 if let Some(id) = routine_id {
1238 row.routine_id == id
1239 } else {
1240 true
1241 }
1242 })
1243 .cloned()
1244 .collect::<Vec<_>>();
1245 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1246 rows.truncate(limit.clamp(1, 500));
1247 rows
1248 }
1249
1250 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1251 let mut guard = self.routine_runs.write().await;
1252 let next_run_id = guard
1253 .values()
1254 .filter(|row| row.status == RoutineRunStatus::Queued)
1255 .min_by(|a, b| {
1256 a.created_at_ms
1257 .cmp(&b.created_at_ms)
1258 .then_with(|| a.run_id.cmp(&b.run_id))
1259 })
1260 .map(|row| row.run_id.clone())?;
1261 let now = now_ms();
1262 let row = guard.get_mut(&next_run_id)?;
1263 row.status = RoutineRunStatus::Running;
1264 row.updated_at_ms = now;
1265 row.started_at_ms = Some(now);
1266 let claimed = row.clone();
1267 drop(guard);
1268 let _ = self.persist_routine_runs().await;
1269 Some(claimed)
1270 }
1271
1272 pub async fn set_routine_session_policy(
1273 &self,
1274 session_id: String,
1275 run_id: String,
1276 routine_id: String,
1277 allowed_tools: Vec<String>,
1278 ) {
1279 let policy = RoutineSessionPolicy {
1280 session_id: session_id.clone(),
1281 run_id,
1282 routine_id,
1283 allowed_tools: normalize_allowed_tools(allowed_tools),
1284 };
1285 self.routine_session_policies
1286 .write()
1287 .await
1288 .insert(session_id, policy);
1289 }
1290
1291 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1292 self.routine_session_policies
1293 .read()
1294 .await
1295 .get(session_id)
1296 .cloned()
1297 }
1298
1299 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1300 self.routine_session_policies
1301 .write()
1302 .await
1303 .remove(session_id);
1304 }
1305
1306 pub async fn update_routine_run_status(
1307 &self,
1308 run_id: &str,
1309 status: RoutineRunStatus,
1310 reason: Option<String>,
1311 ) -> Option<RoutineRunRecord> {
1312 let mut guard = self.routine_runs.write().await;
1313 let row = guard.get_mut(run_id)?;
1314 row.status = status.clone();
1315 row.updated_at_ms = now_ms();
1316 match status {
1317 RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1318 RoutineRunStatus::Running => {
1319 row.started_at_ms.get_or_insert_with(now_ms);
1320 if let Some(detail) = reason {
1321 row.detail = Some(detail);
1322 }
1323 }
1324 RoutineRunStatus::Denied => row.denial_reason = reason,
1325 RoutineRunStatus::Paused => row.paused_reason = reason,
1326 RoutineRunStatus::Completed
1327 | RoutineRunStatus::Failed
1328 | RoutineRunStatus::Cancelled => {
1329 row.finished_at_ms = Some(now_ms());
1330 if let Some(detail) = reason {
1331 row.detail = Some(detail);
1332 }
1333 }
1334 _ => {
1335 if let Some(detail) = reason {
1336 row.detail = Some(detail);
1337 }
1338 }
1339 }
1340 let updated = row.clone();
1341 drop(guard);
1342 let _ = self.persist_routine_runs().await;
1343 Some(updated)
1344 }
1345
1346 pub async fn append_routine_run_artifact(
1347 &self,
1348 run_id: &str,
1349 artifact: RoutineRunArtifact,
1350 ) -> Option<RoutineRunRecord> {
1351 let mut guard = self.routine_runs.write().await;
1352 let row = guard.get_mut(run_id)?;
1353 row.updated_at_ms = now_ms();
1354 row.artifacts.push(artifact);
1355 let updated = row.clone();
1356 drop(guard);
1357 let _ = self.persist_routine_runs().await;
1358 Some(updated)
1359 }
1360}
1361
1362async fn build_channels_config(
1363 state: &AppState,
1364 channels: &ChannelsConfigFile,
1365) -> Option<ChannelsConfig> {
1366 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1367 return None;
1368 }
1369 Some(ChannelsConfig {
1370 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1371 bot_token: cfg.bot_token,
1372 allowed_users: cfg.allowed_users,
1373 mention_only: cfg.mention_only,
1374 }),
1375 discord: channels.discord.clone().map(|cfg| DiscordConfig {
1376 bot_token: cfg.bot_token,
1377 guild_id: cfg.guild_id,
1378 allowed_users: cfg.allowed_users,
1379 mention_only: cfg.mention_only,
1380 }),
1381 slack: channels.slack.clone().map(|cfg| SlackConfig {
1382 bot_token: cfg.bot_token,
1383 channel_id: cfg.channel_id,
1384 allowed_users: cfg.allowed_users,
1385 }),
1386 server_base_url: state.server_base_url(),
1387 api_token: state.api_token().await.unwrap_or_default(),
1388 tool_policy: channels.tool_policy.clone(),
1389 })
1390}
1391
/// Normalizes a configured web UI path prefix.
///
/// Surrounding whitespace and trailing slashes are removed, and a leading `/`
/// is added when missing. Input that is empty or consists only of slashes
/// (`""`, `"/"`, `"//"`, ...) falls back to `"/admin"`, so the result is never
/// an empty string.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    const DEFAULT_PREFIX: &str = "/admin";

    let without_trailing = prefix.trim().trim_end_matches('/');
    if without_trailing.is_empty() {
        // Covers blank input and slash-only input, which would otherwise
        // collapse to an empty prefix.
        return DEFAULT_PREFIX.to_string();
    }
    if without_trailing.starts_with('/') {
        without_trailing.to_string()
    } else {
        format!("/{without_trailing}")
    }
}
1404
/// Default path prefix for the web UI (`/admin`).
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
1408
/// Default allow-list: a single `"*"` entry.
fn default_allow_all() -> Vec<String> {
    let wildcard = String::from("*");
    vec![wildcard]
}
1412
/// Default for Discord's `mention_only` setting (`true`).
fn default_discord_mention_only() -> bool {
    const MENTION_ONLY_BY_DEFAULT: bool = true;
    MENTION_ONLY_BY_DEFAULT
}
1416
1417fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
1418 normalize_non_empty_list(raw)
1419}
1420
/// Trims every entry, drops entries that are blank after trimming, and removes
/// duplicates while keeping the first occurrence of each value in its original
/// position.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for a value that is already present.
        .filter(|entry| already_seen.insert(entry.clone()))
        .collect()
}
1435
/// Reads the run staleness threshold in milliseconds from
/// `TANDEM_RUN_STALE_MS`.
///
/// A missing or unparsable value falls back to 120 000 ms; the result is
/// always clamped to `30_000..=600_000`.
fn resolve_run_stale_ms() -> u64 {
    const DEFAULT_STALE_MS: u64 = 120_000;

    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().ok(),
        Err(_) => None,
    };
    configured
        .unwrap_or(DEFAULT_STALE_MS)
        .clamp(30_000, 600_000)
}
1443
1444fn resolve_shared_resources_path() -> PathBuf {
1445 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
1446 let trimmed = dir.trim();
1447 if !trimmed.is_empty() {
1448 return PathBuf::from(trimmed).join("shared_resources.json");
1449 }
1450 }
1451 default_state_dir().join("shared_resources.json")
1452}
1453
1454fn resolve_routines_path() -> PathBuf {
1455 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
1456 let trimmed = dir.trim();
1457 if !trimmed.is_empty() {
1458 return PathBuf::from(trimmed).join("routines.json");
1459 }
1460 }
1461 default_state_dir().join("routines.json")
1462}
1463
1464fn resolve_routine_history_path() -> PathBuf {
1465 if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
1466 let trimmed = root.trim();
1467 if !trimmed.is_empty() {
1468 return PathBuf::from(trimmed).join("routine_history.json");
1469 }
1470 }
1471 default_state_dir().join("routine_history.json")
1472}
1473
1474fn resolve_routine_runs_path() -> PathBuf {
1475 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
1476 let trimmed = root.trim();
1477 if !trimmed.is_empty() {
1478 return PathBuf::from(trimmed).join("routine_runs.json");
1479 }
1480 }
1481 default_state_dir().join("routine_runs.json")
1482}
1483
1484fn resolve_agent_team_audit_path() -> PathBuf {
1485 if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
1486 let trimmed = base.trim();
1487 if !trimmed.is_empty() {
1488 return PathBuf::from(trimmed)
1489 .join("agent-team")
1490 .join("audit.log.jsonl");
1491 }
1492 }
1493 default_state_dir()
1494 .join("agent-team")
1495 .join("audit.log.jsonl")
1496}
1497
1498fn default_state_dir() -> PathBuf {
1499 if let Ok(paths) = resolve_shared_paths() {
1500 return paths.engine_state_dir;
1501 }
1502 if let Some(data_dir) = dirs::data_dir() {
1503 return data_dir.join("tandem").join("data");
1504 }
1505 dirs::home_dir()
1506 .map(|home| home.join(".tandem").join("data"))
1507 .unwrap_or_else(|| PathBuf::from(".tandem"))
1508}
1509
1510fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
1511 match schedule {
1512 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
1513 RoutineSchedule::Cron { .. } => None,
1514 }
1515}
1516
1517fn compute_misfire_plan(
1518 now_ms: u64,
1519 next_fire_at_ms: u64,
1520 interval_ms: u64,
1521 policy: &RoutineMisfirePolicy,
1522) -> (u32, u64) {
1523 if now_ms < next_fire_at_ms || interval_ms == 0 {
1524 return (0, next_fire_at_ms);
1525 }
1526 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
1527 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
1528 match policy {
1529 RoutineMisfirePolicy::Skip => (0, aligned_next),
1530 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
1531 RoutineMisfirePolicy::CatchUp { max_runs } => {
1532 let count = missed.min(u64::from(*max_runs)) as u32;
1533 (count, aligned_next)
1534 }
1535 }
1536}
1537
/// Outcome of checking a routine against the execution policy.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine needs manual approval before it runs; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
1544
1545pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
1546 let entrypoint = routine.entrypoint.to_ascii_lowercase();
1547 if entrypoint.starts_with("connector.")
1548 || entrypoint.starts_with("integration.")
1549 || entrypoint.contains("external")
1550 {
1551 return true;
1552 }
1553 routine
1554 .args
1555 .get("uses_external_integrations")
1556 .and_then(|v| v.as_bool())
1557 .unwrap_or(false)
1558 || routine
1559 .args
1560 .get("connector_id")
1561 .and_then(|v| v.as_str())
1562 .is_some()
1563}
1564
1565pub fn evaluate_routine_execution_policy(
1566 routine: &RoutineSpec,
1567 trigger_type: &str,
1568) -> RoutineExecutionDecision {
1569 if !routine_uses_external_integrations(routine) {
1570 return RoutineExecutionDecision::Allowed;
1571 }
1572 if !routine.external_integrations_allowed {
1573 return RoutineExecutionDecision::Blocked {
1574 reason: "external integrations are disabled by policy".to_string(),
1575 };
1576 }
1577 if routine.requires_approval {
1578 return RoutineExecutionDecision::RequiresApproval {
1579 reason: format!(
1580 "manual approval required before external side effects ({})",
1581 trigger_type
1582 ),
1583 };
1584 }
1585 RoutineExecutionDecision::Allowed
1586}
1587
/// Checks a shared-resource key.
///
/// After trimming, a valid key is non-empty, starts with one of `run/`,
/// `mission/`, `project/` or `team/`, and contains no `//`.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    let candidate = key.trim();
    let has_allowed_prefix = ALLOWED_PREFIXES
        .iter()
        .any(|prefix| candidate.starts_with(prefix));
    !candidate.is_empty() && has_allowed_prefix && !candidate.contains("//")
}
1602
1603impl Deref for AppState {
1604 type Target = RuntimeState;
1605
1606 fn deref(&self) -> &Self::Target {
1607 self.runtime
1608 .get()
1609 .expect("runtime accessed before startup completion")
1610 }
1611}
1612
1613#[derive(Clone)]
1614struct ServerPromptContextHook {
1615 state: AppState,
1616}
1617
1618impl ServerPromptContextHook {
1619 fn new(state: AppState) -> Self {
1620 Self { state }
1621 }
1622
1623 async fn open_memory_db(&self) -> Option<MemoryDatabase> {
1624 let paths = resolve_shared_paths().ok()?;
1625 MemoryDatabase::new(&paths.memory_db_path).await.ok()
1626 }
1627
1628 fn hash_query(input: &str) -> String {
1629 let mut hasher = Sha256::new();
1630 hasher.update(input.as_bytes());
1631 format!("{:x}", hasher.finalize())
1632 }
1633
1634 fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
1635 let mut out = vec!["<memory_context>".to_string()];
1636 let mut used = 0usize;
1637 for hit in hits {
1638 let text = hit
1639 .record
1640 .content
1641 .split_whitespace()
1642 .take(60)
1643 .collect::<Vec<_>>()
1644 .join(" ");
1645 let line = format!(
1646 "- [{:.3}] {} (source={}, run={})",
1647 hit.score, text, hit.record.source_type, hit.record.run_id
1648 );
1649 used = used.saturating_add(line.len());
1650 if used > 2200 {
1651 break;
1652 }
1653 out.push(line);
1654 }
1655 out.push("</memory_context>".to_string());
1656 out.join("\n")
1657 }
1658}
1659
1660impl PromptContextHook for ServerPromptContextHook {
1661 fn augment_provider_messages(
1662 &self,
1663 ctx: PromptContextHookContext,
1664 mut messages: Vec<ChatMessage>,
1665 ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
1666 let this = self.clone();
1667 Box::pin(async move {
1668 let run = this.state.run_registry.get(&ctx.session_id).await;
1669 let Some(run) = run else {
1670 return Ok(messages);
1671 };
1672 let run_id = run.run_id;
1673 let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
1674 let query = messages
1675 .iter()
1676 .rev()
1677 .find(|m| m.role == "user")
1678 .map(|m| m.content.clone())
1679 .unwrap_or_default();
1680 if query.trim().is_empty() {
1681 return Ok(messages);
1682 }
1683
1684 let Some(db) = this.open_memory_db().await else {
1685 return Ok(messages);
1686 };
1687 let started = now_ms();
1688 let hits = db
1689 .search_global_memory(&user_id, &query, 8, None, None, None)
1690 .await
1691 .unwrap_or_default();
1692 let latency_ms = now_ms().saturating_sub(started);
1693 let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
1694 this.state.event_bus.publish(EngineEvent::new(
1695 "memory.search.performed",
1696 json!({
1697 "runID": run_id,
1698 "sessionID": ctx.session_id,
1699 "messageID": ctx.message_id,
1700 "providerID": ctx.provider_id,
1701 "modelID": ctx.model_id,
1702 "iteration": ctx.iteration,
1703 "queryHash": Self::hash_query(&query),
1704 "resultCount": hits.len(),
1705 "scoreMin": scores.iter().copied().reduce(f64::min),
1706 "scoreMax": scores.iter().copied().reduce(f64::max),
1707 "scores": scores,
1708 "latencyMs": latency_ms,
1709 "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
1710 }),
1711 ));
1712
1713 if hits.is_empty() {
1714 return Ok(messages);
1715 }
1716
1717 let memory_block = Self::build_memory_block(&hits);
1718 messages.push(ChatMessage {
1719 role: "system".to_string(),
1720 content: memory_block.clone(),
1721 });
1722 this.state.event_bus.publish(EngineEvent::new(
1723 "memory.context.injected",
1724 json!({
1725 "runID": run_id,
1726 "sessionID": ctx.session_id,
1727 "messageID": ctx.message_id,
1728 "iteration": ctx.iteration,
1729 "count": hits.len(),
1730 "tokenSizeApprox": memory_block.split_whitespace().count(),
1731 }),
1732 ));
1733 Ok(messages)
1734 })
1735 }
1736}
1737
1738fn extract_event_session_id(properties: &Value) -> Option<String> {
1739 properties
1740 .get("sessionID")
1741 .or_else(|| properties.get("sessionId"))
1742 .or_else(|| properties.get("id"))
1743 .and_then(|v| v.as_str())
1744 .map(|s| s.to_string())
1745}
1746
1747fn extract_event_run_id(properties: &Value) -> Option<String> {
1748 properties
1749 .get("runID")
1750 .or_else(|| properties.get("run_id"))
1751 .and_then(|v| v.as_str())
1752 .map(|s| s.to_string())
1753}
1754
1755fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
1756 let session_id = extract_event_session_id(&event.properties)?;
1757 let run_id = extract_event_run_id(&event.properties);
1758 let key = format!("run/{session_id}/status");
1759
1760 let mut base = serde_json::Map::new();
1761 base.insert("sessionID".to_string(), Value::String(session_id));
1762 if let Some(run_id) = run_id {
1763 base.insert("runID".to_string(), Value::String(run_id));
1764 }
1765
1766 match event.event_type.as_str() {
1767 "session.run.started" => {
1768 base.insert("state".to_string(), Value::String("running".to_string()));
1769 base.insert("phase".to_string(), Value::String("run".to_string()));
1770 base.insert(
1771 "eventType".to_string(),
1772 Value::String("session.run.started".to_string()),
1773 );
1774 Some(StatusIndexUpdate {
1775 key,
1776 value: Value::Object(base),
1777 })
1778 }
1779 "session.run.finished" => {
1780 base.insert("state".to_string(), Value::String("finished".to_string()));
1781 base.insert("phase".to_string(), Value::String("run".to_string()));
1782 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
1783 base.insert("result".to_string(), Value::String(status.to_string()));
1784 }
1785 base.insert(
1786 "eventType".to_string(),
1787 Value::String("session.run.finished".to_string()),
1788 );
1789 Some(StatusIndexUpdate {
1790 key,
1791 value: Value::Object(base),
1792 })
1793 }
1794 "message.part.updated" => {
1795 let part_type = event
1796 .properties
1797 .get("part")
1798 .and_then(|v| v.get("type"))
1799 .and_then(|v| v.as_str())?;
1800 let (phase, tool_active) = match part_type {
1801 "tool-invocation" => ("tool", true),
1802 "tool-result" => ("run", false),
1803 _ => return None,
1804 };
1805 base.insert("state".to_string(), Value::String("running".to_string()));
1806 base.insert("phase".to_string(), Value::String(phase.to_string()));
1807 base.insert("toolActive".to_string(), Value::Bool(tool_active));
1808 if let Some(tool_name) = event
1809 .properties
1810 .get("part")
1811 .and_then(|v| v.get("tool"))
1812 .and_then(|v| v.as_str())
1813 {
1814 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
1815 }
1816 base.insert(
1817 "eventType".to_string(),
1818 Value::String("message.part.updated".to_string()),
1819 );
1820 Some(StatusIndexUpdate {
1821 key,
1822 value: Value::Object(base),
1823 })
1824 }
1825 _ => None,
1826 }
1827}
1828
1829pub async fn run_status_indexer(state: AppState) {
1830 let mut rx = state.event_bus.subscribe();
1831 loop {
1832 match rx.recv().await {
1833 Ok(event) => {
1834 if let Some(update) = derive_status_index_update(&event) {
1835 if let Err(error) = state
1836 .put_shared_resource(
1837 update.key,
1838 update.value,
1839 None,
1840 "system.status_indexer".to_string(),
1841 None,
1842 )
1843 .await
1844 {
1845 tracing::warn!("status indexer failed to persist update: {error:?}");
1846 }
1847 }
1848 }
1849 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1850 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1851 }
1852 }
1853}
1854
1855pub async fn run_agent_team_supervisor(state: AppState) {
1856 let mut rx = state.event_bus.subscribe();
1857 loop {
1858 match rx.recv().await {
1859 Ok(event) => {
1860 state.agent_teams.handle_engine_event(&state, &event).await;
1861 }
1862 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1863 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
1864 }
1865 }
1866}
1867
1868pub async fn run_routine_scheduler(state: AppState) {
1869 loop {
1870 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1871 let now = now_ms();
1872 let plans = state.evaluate_routine_misfires(now).await;
1873 for plan in plans {
1874 let Some(routine) = state.get_routine(&plan.routine_id).await else {
1875 continue;
1876 };
1877 match evaluate_routine_execution_policy(&routine, "scheduled") {
1878 RoutineExecutionDecision::Allowed => {
1879 let _ = state.mark_routine_fired(&plan.routine_id, now).await;
1880 let run = state
1881 .create_routine_run(
1882 &routine,
1883 "scheduled",
1884 plan.run_count,
1885 RoutineRunStatus::Queued,
1886 None,
1887 )
1888 .await;
1889 state
1890 .append_routine_history(RoutineHistoryEvent {
1891 routine_id: plan.routine_id.clone(),
1892 trigger_type: "scheduled".to_string(),
1893 run_count: plan.run_count,
1894 fired_at_ms: now,
1895 status: "queued".to_string(),
1896 detail: None,
1897 })
1898 .await;
1899 state.event_bus.publish(EngineEvent::new(
1900 "routine.fired",
1901 serde_json::json!({
1902 "routineID": plan.routine_id,
1903 "runID": run.run_id,
1904 "runCount": plan.run_count,
1905 "scheduledAtMs": plan.scheduled_at_ms,
1906 "nextFireAtMs": plan.next_fire_at_ms,
1907 }),
1908 ));
1909 state.event_bus.publish(EngineEvent::new(
1910 "routine.run.created",
1911 serde_json::json!({
1912 "run": run,
1913 }),
1914 ));
1915 }
1916 RoutineExecutionDecision::RequiresApproval { reason } => {
1917 let run = state
1918 .create_routine_run(
1919 &routine,
1920 "scheduled",
1921 plan.run_count,
1922 RoutineRunStatus::PendingApproval,
1923 Some(reason.clone()),
1924 )
1925 .await;
1926 state
1927 .append_routine_history(RoutineHistoryEvent {
1928 routine_id: plan.routine_id.clone(),
1929 trigger_type: "scheduled".to_string(),
1930 run_count: plan.run_count,
1931 fired_at_ms: now,
1932 status: "pending_approval".to_string(),
1933 detail: Some(reason.clone()),
1934 })
1935 .await;
1936 state.event_bus.publish(EngineEvent::new(
1937 "routine.approval_required",
1938 serde_json::json!({
1939 "routineID": plan.routine_id,
1940 "runID": run.run_id,
1941 "runCount": plan.run_count,
1942 "triggerType": "scheduled",
1943 "reason": reason,
1944 }),
1945 ));
1946 state.event_bus.publish(EngineEvent::new(
1947 "routine.run.created",
1948 serde_json::json!({
1949 "run": run,
1950 }),
1951 ));
1952 }
1953 RoutineExecutionDecision::Blocked { reason } => {
1954 let run = state
1955 .create_routine_run(
1956 &routine,
1957 "scheduled",
1958 plan.run_count,
1959 RoutineRunStatus::BlockedPolicy,
1960 Some(reason.clone()),
1961 )
1962 .await;
1963 state
1964 .append_routine_history(RoutineHistoryEvent {
1965 routine_id: plan.routine_id.clone(),
1966 trigger_type: "scheduled".to_string(),
1967 run_count: plan.run_count,
1968 fired_at_ms: now,
1969 status: "blocked_policy".to_string(),
1970 detail: Some(reason.clone()),
1971 })
1972 .await;
1973 state.event_bus.publish(EngineEvent::new(
1974 "routine.blocked",
1975 serde_json::json!({
1976 "routineID": plan.routine_id,
1977 "runID": run.run_id,
1978 "runCount": plan.run_count,
1979 "triggerType": "scheduled",
1980 "reason": reason,
1981 }),
1982 ));
1983 state.event_bus.publish(EngineEvent::new(
1984 "routine.run.created",
1985 serde_json::json!({
1986 "run": run,
1987 }),
1988 ));
1989 }
1990 }
1991 }
1992 }
1993}
1994
1995pub async fn run_routine_executor(state: AppState) {
1996 loop {
1997 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1998 let Some(run) = state.claim_next_queued_routine_run().await else {
1999 continue;
2000 };
2001
2002 state.event_bus.publish(EngineEvent::new(
2003 "routine.run.started",
2004 serde_json::json!({
2005 "runID": run.run_id,
2006 "routineID": run.routine_id,
2007 "triggerType": run.trigger_type,
2008 "startedAtMs": now_ms(),
2009 }),
2010 ));
2011
2012 let workspace_root = state.workspace_index.snapshot().await.root;
2013 let mut session = Session::new(
2014 Some(format!("Routine {}", run.routine_id)),
2015 Some(workspace_root.clone()),
2016 );
2017 let session_id = session.id.clone();
2018 session.workspace_root = Some(workspace_root);
2019
2020 if let Err(error) = state.storage.save_session(session).await {
2021 let detail = format!("failed to create routine session: {error}");
2022 let _ = state
2023 .update_routine_run_status(
2024 &run.run_id,
2025 RoutineRunStatus::Failed,
2026 Some(detail.clone()),
2027 )
2028 .await;
2029 state.event_bus.publish(EngineEvent::new(
2030 "routine.run.failed",
2031 serde_json::json!({
2032 "runID": run.run_id,
2033 "routineID": run.routine_id,
2034 "reason": detail,
2035 }),
2036 ));
2037 continue;
2038 }
2039
2040 state
2041 .set_routine_session_policy(
2042 session_id.clone(),
2043 run.run_id.clone(),
2044 run.routine_id.clone(),
2045 run.allowed_tools.clone(),
2046 )
2047 .await;
2048 state
2049 .engine_loop
2050 .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
2051 .await;
2052
2053 let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
2054 if let Some(spec) = selected_model.as_ref() {
2055 state.event_bus.publish(EngineEvent::new(
2056 "routine.run.model_selected",
2057 serde_json::json!({
2058 "runID": run.run_id,
2059 "routineID": run.routine_id,
2060 "providerID": spec.provider_id,
2061 "modelID": spec.model_id,
2062 "source": model_source,
2063 }),
2064 ));
2065 }
2066
2067 let request = SendMessageRequest {
2068 parts: vec![MessagePartInput::Text {
2069 text: build_routine_prompt(&state, &run).await,
2070 }],
2071 model: selected_model,
2072 agent: None,
2073 };
2074
2075 let run_result = state
2076 .engine_loop
2077 .run_prompt_async_with_context(
2078 session_id.clone(),
2079 request,
2080 Some(format!("routine:{}", run.run_id)),
2081 )
2082 .await;
2083
2084 state.clear_routine_session_policy(&session_id).await;
2085 state
2086 .engine_loop
2087 .clear_session_allowed_tools(&session_id)
2088 .await;
2089
2090 match run_result {
2091 Ok(()) => {
2092 append_configured_output_artifacts(&state, &run).await;
2093 let _ = state
2094 .update_routine_run_status(
2095 &run.run_id,
2096 RoutineRunStatus::Completed,
2097 Some("routine run completed".to_string()),
2098 )
2099 .await;
2100 state.event_bus.publish(EngineEvent::new(
2101 "routine.run.completed",
2102 serde_json::json!({
2103 "runID": run.run_id,
2104 "routineID": run.routine_id,
2105 "sessionID": session_id,
2106 "finishedAtMs": now_ms(),
2107 }),
2108 ));
2109 }
2110 Err(error) => {
2111 let detail = truncate_text(&error.to_string(), 500);
2112 let _ = state
2113 .update_routine_run_status(
2114 &run.run_id,
2115 RoutineRunStatus::Failed,
2116 Some(detail.clone()),
2117 )
2118 .await;
2119 state.event_bus.publish(EngineEvent::new(
2120 "routine.run.failed",
2121 serde_json::json!({
2122 "runID": run.run_id,
2123 "routineID": run.routine_id,
2124 "sessionID": session_id,
2125 "reason": detail,
2126 "finishedAtMs": now_ms(),
2127 }),
2128 ));
2129 }
2130 }
2131 }
2132}
2133
2134async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
2135 let normalized_entrypoint = run.entrypoint.trim();
2136 let known_tool = state
2137 .tools
2138 .list()
2139 .await
2140 .into_iter()
2141 .any(|schema| schema.name == normalized_entrypoint);
2142 if known_tool {
2143 let args = if run.args.is_object() {
2144 run.args.clone()
2145 } else {
2146 serde_json::json!({})
2147 };
2148 return format!("/tool {} {}", normalized_entrypoint, args);
2149 }
2150
2151 if let Some(objective) = routine_objective_from_args(run) {
2152 return build_routine_mission_prompt(run, &objective);
2153 }
2154
2155 format!(
2156 "Execute routine '{}' using entrypoint '{}' with args: {}",
2157 run.routine_id, run.entrypoint, run.args
2158 )
2159}
2160
2161fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
2162 run.args
2163 .get("prompt")
2164 .and_then(|v| v.as_str())
2165 .map(str::trim)
2166 .filter(|v| !v.is_empty())
2167 .map(ToString::to_string)
2168}
2169
2170fn routine_mode_from_args(args: &Value) -> &str {
2171 args.get("mode")
2172 .and_then(|v| v.as_str())
2173 .map(str::trim)
2174 .filter(|v| !v.is_empty())
2175 .unwrap_or("standalone")
2176}
2177
2178fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
2179 args.get("success_criteria")
2180 .and_then(|v| v.as_array())
2181 .map(|rows| {
2182 rows.iter()
2183 .filter_map(|row| row.as_str())
2184 .map(str::trim)
2185 .filter(|row| !row.is_empty())
2186 .map(ToString::to_string)
2187 .collect::<Vec<_>>()
2188 })
2189 .unwrap_or_default()
2190}
2191
2192fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
2193 let mode = routine_mode_from_args(&run.args);
2194 let success_criteria = routine_success_criteria_from_args(&run.args);
2195 let orchestrator_only_tool_calls = run
2196 .args
2197 .get("orchestrator_only_tool_calls")
2198 .and_then(|v| v.as_bool())
2199 .unwrap_or(false);
2200
2201 let mut lines = vec![
2202 format!("Automation ID: {}", run.routine_id),
2203 format!("Run ID: {}", run.run_id),
2204 format!("Mode: {}", mode),
2205 format!("Mission Objective: {}", objective),
2206 ];
2207
2208 if !success_criteria.is_empty() {
2209 lines.push("Success Criteria:".to_string());
2210 for criterion in success_criteria {
2211 lines.push(format!("- {}", criterion));
2212 }
2213 }
2214
2215 if run.allowed_tools.is_empty() {
2216 lines.push("Allowed Tools: all available by current policy".to_string());
2217 } else {
2218 lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
2219 }
2220
2221 if run.output_targets.is_empty() {
2222 lines.push("Output Targets: none configured".to_string());
2223 } else {
2224 lines.push("Output Targets:".to_string());
2225 for target in &run.output_targets {
2226 lines.push(format!("- {}", target));
2227 }
2228 }
2229
2230 if mode.eq_ignore_ascii_case("orchestrated") {
2231 lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
2232 lines
2233 .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
2234 if orchestrator_only_tool_calls {
2235 lines.push(
2236 "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
2237 .to_string(),
2238 );
2239 }
2240 } else {
2241 lines.push("Execution Pattern: Standalone mission run".to_string());
2242 }
2243
2244 lines.push(
2245 "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
2246 .to_string(),
2247 );
2248
2249 lines.join("\n")
2250}
2251
/// Truncates `input` to at most `max_len` bytes and appends
/// `"...<truncated>"` when anything was cut off.
///
/// Inputs whose byte length is already within `max_len` are returned
/// unchanged. When `max_len` falls inside a multi-byte UTF-8 character the
/// cut is moved back to the previous character boundary, so the function
/// never panics on non-ASCII text and the retained prefix never exceeds
/// `max_len` bytes.
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // Slicing a `str` at a non-boundary byte index panics; back off to the
    // nearest boundary at or below `max_len`. Index 0 is always a boundary,
    // so the loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }

    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
2260
2261async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
2262 if run.output_targets.is_empty() {
2263 return;
2264 }
2265 for target in &run.output_targets {
2266 let artifact = RoutineRunArtifact {
2267 artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
2268 uri: target.clone(),
2269 kind: "output_target".to_string(),
2270 label: Some("configured output target".to_string()),
2271 created_at_ms: now_ms(),
2272 metadata: Some(serde_json::json!({
2273 "source": "routine.output_targets",
2274 "runID": run.run_id,
2275 "routineID": run.routine_id,
2276 })),
2277 };
2278 let _ = state
2279 .append_routine_run_artifact(&run.run_id, artifact.clone())
2280 .await;
2281 state.event_bus.publish(EngineEvent::new(
2282 "routine.run.artifact_added",
2283 serde_json::json!({
2284 "runID": run.run_id,
2285 "routineID": run.routine_id,
2286 "artifact": artifact,
2287 }),
2288 ));
2289 }
2290}
2291
2292fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
2293 let obj = value.as_object()?;
2294 let provider_id = obj.get("provider_id")?.as_str()?.trim();
2295 let model_id = obj.get("model_id")?.as_str()?.trim();
2296 if provider_id.is_empty() || model_id.is_empty() {
2297 return None;
2298 }
2299 Some(ModelSpec {
2300 provider_id: provider_id.to_string(),
2301 model_id: model_id.to_string(),
2302 })
2303}
2304
2305fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
2306 args.get("model_policy")
2307 .and_then(|v| v.get("role_models"))
2308 .and_then(|v| v.get(role))
2309 .and_then(parse_model_spec)
2310}
2311
2312fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
2313 args.get("model_policy")
2314 .and_then(|v| v.get("default_model"))
2315 .and_then(parse_model_spec)
2316}
2317
2318fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
2319 providers.iter().any(|provider| {
2320 provider.id == spec.provider_id
2321 && provider
2322 .models
2323 .iter()
2324 .any(|model| model.id == spec.model_id)
2325 })
2326}
2327
2328async fn resolve_routine_model_spec_for_run(
2329 state: &AppState,
2330 run: &RoutineRunRecord,
2331) -> (Option<ModelSpec>, String) {
2332 let providers = state.providers.list().await;
2333 let mode = routine_mode_from_args(&run.args);
2334 let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
2335
2336 if mode.eq_ignore_ascii_case("orchestrated") {
2337 if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
2338 requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
2339 }
2340 }
2341 if let Some(default_model) = default_model_spec_from_args(&run.args) {
2342 requested.push((default_model, "args.model_policy.default_model"));
2343 }
2344
2345 for (candidate, source) in requested {
2346 if provider_catalog_has_model(&providers, &candidate) {
2347 return (Some(candidate), source.to_string());
2348 }
2349 }
2350
2351 let fallback = providers
2352 .into_iter()
2353 .find(|provider| !provider.models.is_empty())
2354 .and_then(|provider| {
2355 let model = provider.models.first()?;
2356 Some(ModelSpec {
2357 provider_id: provider.id,
2358 model_id: model.id.clone(),
2359 })
2360 });
2361
2362 (fallback, "provider_catalog_fallback".to_string())
2363}
2364
2365#[cfg(test)]
2366mod tests {
2367 use super::*;
2368
2369 fn test_state_with_path(path: PathBuf) -> AppState {
2370 let mut state = AppState::new_starting("test-attempt".to_string(), true);
2371 state.shared_resources_path = path;
2372 state.routines_path = tmp_routines_file("shared-state");
2373 state.routine_history_path = tmp_routines_file("routine-history");
2374 state.routine_runs_path = tmp_routines_file("routine-runs");
2375 state
2376 }
2377
2378 fn tmp_resource_file(name: &str) -> PathBuf {
2379 std::env::temp_dir().join(format!(
2380 "tandem-server-{name}-{}.json",
2381 uuid::Uuid::new_v4()
2382 ))
2383 }
2384
2385 fn tmp_routines_file(name: &str) -> PathBuf {
2386 std::env::temp_dir().join(format!(
2387 "tandem-server-routines-{name}-{}.json",
2388 uuid::Uuid::new_v4()
2389 ))
2390 }
2391
2392 #[tokio::test]
2393 async fn shared_resource_put_increments_revision() {
2394 let path = tmp_resource_file("shared-resource-put");
2395 let state = test_state_with_path(path.clone());
2396
2397 let first = state
2398 .put_shared_resource(
2399 "project/demo/board".to_string(),
2400 serde_json::json!({"status":"todo"}),
2401 None,
2402 "agent-1".to_string(),
2403 None,
2404 )
2405 .await
2406 .expect("first put");
2407 assert_eq!(first.rev, 1);
2408
2409 let second = state
2410 .put_shared_resource(
2411 "project/demo/board".to_string(),
2412 serde_json::json!({"status":"doing"}),
2413 Some(1),
2414 "agent-2".to_string(),
2415 Some(60_000),
2416 )
2417 .await
2418 .expect("second put");
2419 assert_eq!(second.rev, 2);
2420 assert_eq!(second.updated_by, "agent-2");
2421 assert_eq!(second.ttl_ms, Some(60_000));
2422
2423 let raw = tokio::fs::read_to_string(path.clone())
2424 .await
2425 .expect("persisted");
2426 assert!(raw.contains("\"rev\": 2"));
2427 let _ = tokio::fs::remove_file(path).await;
2428 }
2429
2430 #[tokio::test]
2431 async fn shared_resource_put_detects_revision_conflict() {
2432 let path = tmp_resource_file("shared-resource-conflict");
2433 let state = test_state_with_path(path.clone());
2434
2435 let _ = state
2436 .put_shared_resource(
2437 "mission/demo/card-1".to_string(),
2438 serde_json::json!({"title":"Card 1"}),
2439 None,
2440 "agent-1".to_string(),
2441 None,
2442 )
2443 .await
2444 .expect("seed put");
2445
2446 let conflict = state
2447 .put_shared_resource(
2448 "mission/demo/card-1".to_string(),
2449 serde_json::json!({"title":"Card 1 edited"}),
2450 Some(99),
2451 "agent-2".to_string(),
2452 None,
2453 )
2454 .await
2455 .expect_err("expected conflict");
2456
2457 match conflict {
2458 ResourceStoreError::RevisionConflict(conflict) => {
2459 assert_eq!(conflict.expected_rev, Some(99));
2460 assert_eq!(conflict.current_rev, Some(1));
2461 }
2462 other => panic!("unexpected error: {other:?}"),
2463 }
2464
2465 let _ = tokio::fs::remove_file(path).await;
2466 }
2467
2468 #[tokio::test]
2469 async fn shared_resource_rejects_invalid_namespace_key() {
2470 let path = tmp_resource_file("shared-resource-invalid-key");
2471 let state = test_state_with_path(path.clone());
2472
2473 let error = state
2474 .put_shared_resource(
2475 "global/demo/key".to_string(),
2476 serde_json::json!({"x":1}),
2477 None,
2478 "agent-1".to_string(),
2479 None,
2480 )
2481 .await
2482 .expect_err("invalid key should fail");
2483
2484 match error {
2485 ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
2486 other => panic!("unexpected error: {other:?}"),
2487 }
2488
2489 assert!(!path.exists());
2490 }
2491
2492 #[test]
2493 fn derive_status_index_update_for_run_started() {
2494 let event = EngineEvent::new(
2495 "session.run.started",
2496 serde_json::json!({
2497 "sessionID": "s-1",
2498 "runID": "r-1"
2499 }),
2500 );
2501 let update = derive_status_index_update(&event).expect("update");
2502 assert_eq!(update.key, "run/s-1/status");
2503 assert_eq!(
2504 update.value.get("state").and_then(|v| v.as_str()),
2505 Some("running")
2506 );
2507 assert_eq!(
2508 update.value.get("phase").and_then(|v| v.as_str()),
2509 Some("run")
2510 );
2511 }
2512
2513 #[test]
2514 fn derive_status_index_update_for_tool_invocation() {
2515 let event = EngineEvent::new(
2516 "message.part.updated",
2517 serde_json::json!({
2518 "sessionID": "s-2",
2519 "runID": "r-2",
2520 "part": { "type": "tool-invocation", "tool": "todo_write" }
2521 }),
2522 );
2523 let update = derive_status_index_update(&event).expect("update");
2524 assert_eq!(update.key, "run/s-2/status");
2525 assert_eq!(
2526 update.value.get("phase").and_then(|v| v.as_str()),
2527 Some("tool")
2528 );
2529 assert_eq!(
2530 update.value.get("toolActive").and_then(|v| v.as_bool()),
2531 Some(true)
2532 );
2533 assert_eq!(
2534 update.value.get("tool").and_then(|v| v.as_str()),
2535 Some("todo_write")
2536 );
2537 }
2538
2539 #[test]
2540 fn misfire_skip_drops_runs_and_advances_next_fire() {
2541 let (count, next_fire) =
2542 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
2543 assert_eq!(count, 0);
2544 assert_eq!(next_fire, 11_000);
2545 }
2546
2547 #[test]
2548 fn misfire_run_once_emits_single_trigger() {
2549 let (count, next_fire) =
2550 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
2551 assert_eq!(count, 1);
2552 assert_eq!(next_fire, 11_000);
2553 }
2554
2555 #[test]
2556 fn misfire_catch_up_caps_trigger_count() {
2557 let (count, next_fire) = compute_misfire_plan(
2558 25_000,
2559 5_000,
2560 1_000,
2561 &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
2562 );
2563 assert_eq!(count, 3);
2564 assert_eq!(next_fire, 26_000);
2565 }
2566
2567 #[tokio::test]
2568 async fn routine_put_persists_and_loads() {
2569 let routines_path = tmp_routines_file("persist-load");
2570 let mut state = AppState::new_starting("routines-put".to_string(), true);
2571 state.routines_path = routines_path.clone();
2572
2573 let routine = RoutineSpec {
2574 routine_id: "routine-1".to_string(),
2575 name: "Digest".to_string(),
2576 status: RoutineStatus::Active,
2577 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2578 timezone: "UTC".to_string(),
2579 misfire_policy: RoutineMisfirePolicy::RunOnce,
2580 entrypoint: "mission.default".to_string(),
2581 args: serde_json::json!({"topic":"status"}),
2582 allowed_tools: vec![],
2583 output_targets: vec![],
2584 creator_type: "user".to_string(),
2585 creator_id: "user-1".to_string(),
2586 requires_approval: true,
2587 external_integrations_allowed: false,
2588 next_fire_at_ms: Some(5_000),
2589 last_fired_at_ms: None,
2590 };
2591
2592 state.put_routine(routine).await.expect("store routine");
2593
2594 let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
2595 reloaded.routines_path = routines_path.clone();
2596 reloaded.load_routines().await.expect("load routines");
2597 let list = reloaded.list_routines().await;
2598 assert_eq!(list.len(), 1);
2599 assert_eq!(list[0].routine_id, "routine-1");
2600
2601 let _ = tokio::fs::remove_file(routines_path).await;
2602 }
2603
2604 #[tokio::test]
2605 async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
2606 let routines_path = tmp_routines_file("misfire-eval");
2607 let mut state = AppState::new_starting("routines-eval".to_string(), true);
2608 state.routines_path = routines_path.clone();
2609
2610 let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
2611 routine_id: id.to_string(),
2612 name: id.to_string(),
2613 status: RoutineStatus::Active,
2614 schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
2615 timezone: "UTC".to_string(),
2616 misfire_policy: policy,
2617 entrypoint: "mission.default".to_string(),
2618 args: serde_json::json!({}),
2619 allowed_tools: vec![],
2620 output_targets: vec![],
2621 creator_type: "user".to_string(),
2622 creator_id: "u-1".to_string(),
2623 requires_approval: false,
2624 external_integrations_allowed: false,
2625 next_fire_at_ms: Some(5_000),
2626 last_fired_at_ms: None,
2627 };
2628
2629 state
2630 .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
2631 .await
2632 .expect("put skip");
2633 state
2634 .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
2635 .await
2636 .expect("put once");
2637 state
2638 .put_routine(base(
2639 "routine-catch",
2640 RoutineMisfirePolicy::CatchUp { max_runs: 3 },
2641 ))
2642 .await
2643 .expect("put catch");
2644
2645 let plans = state.evaluate_routine_misfires(10_500).await;
2646 let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
2647 let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
2648 let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
2649
2650 assert!(plan_skip.is_none());
2651 assert_eq!(plan_once.map(|p| p.run_count), Some(1));
2652 assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
2653
2654 let stored = state.list_routines().await;
2655 let skip_next = stored
2656 .iter()
2657 .find(|r| r.routine_id == "routine-skip")
2658 .and_then(|r| r.next_fire_at_ms)
2659 .expect("skip next");
2660 assert!(skip_next > 10_500);
2661
2662 let _ = tokio::fs::remove_file(routines_path).await;
2663 }
2664
2665 #[test]
2666 fn routine_policy_blocks_external_side_effects_by_default() {
2667 let routine = RoutineSpec {
2668 routine_id: "routine-policy-1".to_string(),
2669 name: "Connector routine".to_string(),
2670 status: RoutineStatus::Active,
2671 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2672 timezone: "UTC".to_string(),
2673 misfire_policy: RoutineMisfirePolicy::RunOnce,
2674 entrypoint: "connector.email.reply".to_string(),
2675 args: serde_json::json!({}),
2676 allowed_tools: vec![],
2677 output_targets: vec![],
2678 creator_type: "user".to_string(),
2679 creator_id: "u-1".to_string(),
2680 requires_approval: true,
2681 external_integrations_allowed: false,
2682 next_fire_at_ms: None,
2683 last_fired_at_ms: None,
2684 };
2685
2686 let decision = evaluate_routine_execution_policy(&routine, "manual");
2687 assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
2688 }
2689
2690 #[test]
2691 fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
2692 let routine = RoutineSpec {
2693 routine_id: "routine-policy-2".to_string(),
2694 name: "Connector routine".to_string(),
2695 status: RoutineStatus::Active,
2696 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2697 timezone: "UTC".to_string(),
2698 misfire_policy: RoutineMisfirePolicy::RunOnce,
2699 entrypoint: "connector.email.reply".to_string(),
2700 args: serde_json::json!({}),
2701 allowed_tools: vec![],
2702 output_targets: vec![],
2703 creator_type: "user".to_string(),
2704 creator_id: "u-1".to_string(),
2705 requires_approval: true,
2706 external_integrations_allowed: true,
2707 next_fire_at_ms: None,
2708 last_fired_at_ms: None,
2709 };
2710
2711 let decision = evaluate_routine_execution_policy(&routine, "manual");
2712 assert!(matches!(
2713 decision,
2714 RoutineExecutionDecision::RequiresApproval { .. }
2715 ));
2716 }
2717
2718 #[test]
2719 fn routine_policy_allows_non_external_entrypoints() {
2720 let routine = RoutineSpec {
2721 routine_id: "routine-policy-3".to_string(),
2722 name: "Internal mission routine".to_string(),
2723 status: RoutineStatus::Active,
2724 schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
2725 timezone: "UTC".to_string(),
2726 misfire_policy: RoutineMisfirePolicy::RunOnce,
2727 entrypoint: "mission.default".to_string(),
2728 args: serde_json::json!({}),
2729 allowed_tools: vec![],
2730 output_targets: vec![],
2731 creator_type: "user".to_string(),
2732 creator_id: "u-1".to_string(),
2733 requires_approval: true,
2734 external_integrations_allowed: false,
2735 next_fire_at_ms: None,
2736 last_fired_at_ms: None,
2737 };
2738
2739 let decision = evaluate_routine_execution_policy(&routine, "manual");
2740 assert_eq!(decision, RoutineExecutionDecision::Allowed);
2741 }
2742
2743 #[tokio::test]
2744 async fn claim_next_queued_routine_run_marks_oldest_running() {
2745 let mut state = AppState::new_starting("routine-claim".to_string(), true);
2746 state.routine_runs_path = tmp_routines_file("routine-claim-runs");
2747
2748 let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
2749 run_id: run_id.to_string(),
2750 routine_id: "routine-claim".to_string(),
2751 trigger_type: "manual".to_string(),
2752 run_count: 1,
2753 status: RoutineRunStatus::Queued,
2754 created_at_ms,
2755 updated_at_ms: created_at_ms,
2756 fired_at_ms: Some(created_at_ms),
2757 started_at_ms: None,
2758 finished_at_ms: None,
2759 requires_approval: false,
2760 approval_reason: None,
2761 denial_reason: None,
2762 paused_reason: None,
2763 detail: None,
2764 entrypoint: "mission.default".to_string(),
2765 args: serde_json::json!({}),
2766 allowed_tools: vec![],
2767 output_targets: vec![],
2768 artifacts: vec![],
2769 };
2770
2771 {
2772 let mut guard = state.routine_runs.write().await;
2773 guard.insert("run-late".to_string(), mk("run-late", 2_000));
2774 guard.insert("run-early".to_string(), mk("run-early", 1_000));
2775 }
2776 state.persist_routine_runs().await.expect("persist");
2777
2778 let claimed = state
2779 .claim_next_queued_routine_run()
2780 .await
2781 .expect("claimed run");
2782 assert_eq!(claimed.run_id, "run-early");
2783 assert_eq!(claimed.status, RoutineRunStatus::Running);
2784 assert!(claimed.started_at_ms.is_some());
2785 }
2786
2787 #[tokio::test]
2788 async fn routine_session_policy_roundtrip_normalizes_tools() {
2789 let state = AppState::new_starting("routine-policy-hook".to_string(), true);
2790 state
2791 .set_routine_session_policy(
2792 "session-routine-1".to_string(),
2793 "run-1".to_string(),
2794 "routine-1".to_string(),
2795 vec![
2796 "read".to_string(),
2797 " mcp.arcade.search ".to_string(),
2798 "read".to_string(),
2799 "".to_string(),
2800 ],
2801 )
2802 .await;
2803
2804 let policy = state
2805 .routine_session_policy("session-routine-1")
2806 .await
2807 .expect("policy");
2808 assert_eq!(
2809 policy.allowed_tools,
2810 vec!["read".to_string(), "mcp.arcade.search".to_string()]
2811 );
2812 }
2813
2814 #[test]
2815 fn routine_mission_prompt_includes_orchestrated_contract() {
2816 let run = RoutineRunRecord {
2817 run_id: "run-orchestrated-1".to_string(),
2818 routine_id: "automation-orchestrated".to_string(),
2819 trigger_type: "manual".to_string(),
2820 run_count: 1,
2821 status: RoutineRunStatus::Queued,
2822 created_at_ms: 1_000,
2823 updated_at_ms: 1_000,
2824 fired_at_ms: Some(1_000),
2825 started_at_ms: None,
2826 finished_at_ms: None,
2827 requires_approval: true,
2828 approval_reason: None,
2829 denial_reason: None,
2830 paused_reason: None,
2831 detail: None,
2832 entrypoint: "mission.default".to_string(),
2833 args: serde_json::json!({
2834 "prompt": "Coordinate a multi-step release readiness check.",
2835 "mode": "orchestrated",
2836 "success_criteria": ["All blockers listed", "Output artifact written"],
2837 "orchestrator_only_tool_calls": true
2838 }),
2839 allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
2840 output_targets: vec!["file://reports/release-readiness.md".to_string()],
2841 artifacts: vec![],
2842 };
2843
2844 let objective = routine_objective_from_args(&run).expect("objective");
2845 let prompt = build_routine_mission_prompt(&run, &objective);
2846
2847 assert!(prompt.contains("Mode: orchestrated"));
2848 assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
2849 assert!(prompt.contains("only the orchestrator may execute tools"));
2850 assert!(prompt.contains("Allowed Tools: read, webfetch"));
2851 assert!(prompt.contains("file://reports/release-readiness.md"));
2852 }
2853
2854 #[test]
2855 fn routine_mission_prompt_includes_standalone_defaults() {
2856 let run = RoutineRunRecord {
2857 run_id: "run-standalone-1".to_string(),
2858 routine_id: "automation-standalone".to_string(),
2859 trigger_type: "manual".to_string(),
2860 run_count: 1,
2861 status: RoutineRunStatus::Queued,
2862 created_at_ms: 2_000,
2863 updated_at_ms: 2_000,
2864 fired_at_ms: Some(2_000),
2865 started_at_ms: None,
2866 finished_at_ms: None,
2867 requires_approval: false,
2868 approval_reason: None,
2869 denial_reason: None,
2870 paused_reason: None,
2871 detail: None,
2872 entrypoint: "mission.default".to_string(),
2873 args: serde_json::json!({
2874 "prompt": "Summarize top engineering updates.",
2875 "success_criteria": ["Three bullet summary"]
2876 }),
2877 allowed_tools: vec![],
2878 output_targets: vec![],
2879 artifacts: vec![],
2880 };
2881
2882 let objective = routine_objective_from_args(&run).expect("objective");
2883 let prompt = build_routine_mission_prompt(&run, &objective);
2884
2885 assert!(prompt.contains("Mode: standalone"));
2886 assert!(prompt.contains("Execution Pattern: Standalone mission run"));
2887 assert!(prompt.contains("Allowed Tools: all available by current policy"));
2888 assert!(prompt.contains("Output Targets: none configured"));
2889 }
2890}