1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22 EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
23 SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30 resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31 PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38
39mod agent_teams;
40mod capability_resolver;
41mod http;
42mod mcp_catalog;
43mod pack_manager;
44mod preset_composer;
45mod preset_registry;
46mod preset_summary;
47pub mod webui;
48
49pub use agent_teams::AgentTeamRuntime;
50pub use capability_resolver::CapabilityResolver;
51pub use http::serve;
52pub use pack_manager::PackManager;
53pub use preset_composer::PromptComposeInput;
54pub use preset_registry::PresetRegistry;
55
/// Status snapshot for a single messaging channel.
///
/// `AppState::restart_channel_listeners` builds one of these per channel name
/// (`"telegram"`, `"discord"`, `"slack"`) and publishes the resulting map in a
/// `channel.status.changed` event.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// True when a config section for the channel is present.
    pub enabled: bool,
    /// Set to true for enabled channels once listeners have been started.
    pub connected: bool,
    /// Most recent error text, if any (initialised to `None` on restart).
    pub last_error: Option<String>,
    /// Session counter (initialised to 0 on restart).
    pub active_sessions: u64,
    /// Free-form JSON metadata (initialised to `{}` on restart).
    pub meta: Value,
}
64
/// `web_ui` section of the effective application config.
///
/// NOTE(review): the derived `Default` yields an empty `path_prefix`, whereas
/// deserializing a `web_ui` object that merely omits the field yields
/// `default_web_ui_prefix()`. Callers in this file pass the prefix through
/// `normalize_web_ui_prefix`; confirm that helper maps an empty string to the
/// intended default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WebUiConfig {
    /// Whether the web UI is turned on; defaults to `false` when absent.
    #[serde(default)]
    pub enabled: bool,
    /// URL path prefix for the web UI; a missing field falls back to
    /// `default_web_ui_prefix()` during deserialization.
    #[serde(default = "default_web_ui_prefix")]
    pub path_prefix: String,
}
72
/// `channels` section of the effective application config.
///
/// Each channel is optional; `restart_channel_listeners` treats a channel as
/// enabled exactly when its section is `Some`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    /// Telegram settings, if configured.
    pub telegram: Option<TelegramConfigFile>,
    /// Discord settings, if configured.
    pub discord: Option<DiscordConfigFile>,
    /// Slack settings, if configured.
    pub slack: Option<SlackConfigFile>,
    /// Tool policy applied to channels; uses the type's `Default` when absent.
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}
81
/// On-disk shape of the Telegram channel configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    /// Bot token (required; deserialization fails without it).
    pub bot_token: String,
    /// Users allowed to talk to the bot; a missing field falls back to
    /// `default_allow_all()`.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub mention_only: bool,
    /// Style profile; uses the type's `Default` when absent.
    #[serde(default)]
    pub style_profile: tandem_channels::config::TelegramStyleProfile,
}
92
/// On-disk shape of the Discord channel configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    /// Bot token (required; deserialization fails without it).
    pub bot_token: String,
    /// Optional guild identifier; `None` when absent.
    #[serde(default)]
    pub guild_id: Option<String>,
    /// Users allowed to talk to the bot; a missing field falls back to
    /// `default_allow_all()`.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// A missing field falls back to `default_discord_mention_only()`.
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}
103
/// On-disk shape of the Slack channel configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    /// Bot token (required; deserialization fails without it).
    pub bot_token: String,
    /// Channel identifier (required).
    pub channel_id: String,
    /// Users allowed to talk to the bot; a missing field falls back to
    /// `default_allow_all()`.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
}
111
/// The subset of the effective config value that this module deserializes.
///
/// Every section is `#[serde(default)]`, so a config lacking a section parses
/// with that section's `Default` value.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    /// Channel integrations.
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    /// Web UI settings.
    #[serde(default)]
    pub web_ui: WebUiConfig,
    /// Memory consolidation settings (type owned by `tandem_providers`).
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
121
/// Mutable channel-listener state kept behind `AppState::channels_runtime`.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Task set returned by `tandem_channels::start_channel_listeners`;
    /// `None` when no listeners are running.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Latest status per channel name.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
127
/// A time-limited lease held by a client on the engine.
#[derive(Debug, Clone)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    /// Timestamp of the latest renewal; expiry is measured from here.
    pub last_renewed_at_ms: u64,
    /// Lease lifetime, in the same unit as the timestamps.
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Reports whether the lease has lapsed as of `now_ms`.
    ///
    /// The lease is expired only when strictly more than `ttl_ms` has elapsed
    /// since `last_renewed_at_ms`. A `now_ms` earlier than the last renewal
    /// counts as zero elapsed time (saturating subtraction).
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let since_renewal = now_ms.saturating_sub(self.last_renewed_at_ms);
        since_renewal > self.ttl_ms
    }
}
143
/// A run currently registered in [`RunRegistry`].
///
/// Serialized with camelCase-style keys (`runID`, `startedAtMs`, ...); the
/// optional identifiers are omitted from the output when `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    #[serde(rename = "runID")]
    pub run_id: String,
    /// Set from `now_ms()` when the run is acquired.
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// Refreshed by `RunRegistry::touch`; used by `reap_stale`.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
159
160#[derive(Clone, Default)]
161pub struct RunRegistry {
162 active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
163}
164
165impl RunRegistry {
166 pub fn new() -> Self {
167 Self::default()
168 }
169
170 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
171 self.active.read().await.get(session_id).cloned()
172 }
173
174 pub async fn acquire(
175 &self,
176 session_id: &str,
177 run_id: String,
178 client_id: Option<String>,
179 agent_id: Option<String>,
180 agent_profile: Option<String>,
181 ) -> std::result::Result<ActiveRun, ActiveRun> {
182 let mut guard = self.active.write().await;
183 if let Some(existing) = guard.get(session_id).cloned() {
184 return Err(existing);
185 }
186 let now = now_ms();
187 let run = ActiveRun {
188 run_id,
189 started_at_ms: now,
190 last_activity_at_ms: now,
191 client_id,
192 agent_id,
193 agent_profile,
194 };
195 guard.insert(session_id.to_string(), run.clone());
196 Ok(run)
197 }
198
199 pub async fn touch(&self, session_id: &str, run_id: &str) {
200 let mut guard = self.active.write().await;
201 if let Some(run) = guard.get_mut(session_id) {
202 if run.run_id == run_id {
203 run.last_activity_at_ms = now_ms();
204 }
205 }
206 }
207
208 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
209 let mut guard = self.active.write().await;
210 if let Some(run) = guard.get(session_id) {
211 if run.run_id == run_id {
212 return guard.remove(session_id);
213 }
214 }
215 None
216 }
217
218 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
219 self.active.write().await.remove(session_id)
220 }
221
222 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
223 let now = now_ms();
224 let mut guard = self.active.write().await;
225 let stale_ids = guard
226 .iter()
227 .filter_map(|(session_id, run)| {
228 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
229 Some(session_id.clone())
230 } else {
231 None
232 }
233 })
234 .collect::<Vec<_>>();
235 let mut out = Vec::with_capacity(stale_ids.len());
236 for session_id in stale_ids {
237 if let Some(run) = guard.remove(&session_id) {
238 out.push((session_id, run));
239 }
240 }
241 out
242 }
243}
244
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
251
252pub fn build_id() -> String {
253 if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
254 let trimmed = explicit.trim();
255 if !trimmed.is_empty() {
256 return trimmed.to_string();
257 }
258 }
259 if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
260 let trimmed = git_sha.trim();
261 if !trimmed.is_empty() {
262 return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
263 }
264 }
265 env!("CARGO_PKG_VERSION").to_string()
266}
267
268pub fn detect_host_runtime_context() -> HostRuntimeContext {
269 let os = if cfg!(target_os = "windows") {
270 HostOs::Windows
271 } else if cfg!(target_os = "macos") {
272 HostOs::Macos
273 } else {
274 HostOs::Linux
275 };
276 let (shell_family, path_style) = match os {
277 HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
278 HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
279 };
280 HostRuntimeContext {
281 os,
282 arch: std::env::consts::ARCH.to_string(),
283 shell_family,
284 path_style,
285 }
286}
287
/// Path of the running executable, reported only in debug builds.
///
/// Returns `None` in builds without `debug_assertions`, and also when the
/// executable path cannot be determined.
pub fn binary_path_for_health() -> Option<String> {
    if !cfg!(debug_assertions) {
        return None;
    }
    let exe = std::env::current_exe().ok()?;
    Some(exe.to_string_lossy().into_owned())
}
300
/// Engine services that become available once startup has completed.
///
/// An instance is stored into `AppState::runtime` by `AppState::mark_ready`.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    /// String-to-string auth map; key/value semantics are not visible here.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// Accumulated log entries as JSON values.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    /// Preferred over `AppState::host_runtime_context` once the runtime is set.
    pub host_runtime_context: HostRuntimeContext,
}
321
/// A memory record held in `AppState::memory_records`.
///
/// In-memory only from this type's point of view: it derives neither
/// `Serialize` nor `Deserialize`.
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    pub metadata: Option<Value>,
    /// Id of the memory this record derives from, if any.
    /// NOTE(review): presumably set on promotion/copy; confirm at the write sites.
    pub source_memory_id: Option<String>,
    pub created_at_ms: u64,
}
335
/// One entry of the memory audit log (`AppState::memory_audit_log`).
///
/// Serialize-only; `detail` is omitted from the output when `None`, while the
/// other optional fields serialize as `null`.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub created_at_ms: u64,
}
351
/// A value in the shared resource store, persisted as JSON at
/// `AppState::shared_resources_path`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    /// Resource key; `list_shared_resources` filters and sorts on this field.
    pub key: String,
    pub value: Value,
    /// Revision number compared against `if_match_rev` on writes.
    pub rev: u64,
    pub updated_at_ms: u64,
    pub updated_by: String,
    /// Optional time-to-live; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
362
/// When a routine fires.
///
/// Externally tagged with snake_case variant names, e.g.
/// `{"interval_seconds": {"seconds": 60}}` or `{"cron": {"expression": "..."}}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    /// Fixed interval, in seconds.
    IntervalSeconds { seconds: u64 },
    /// Cron expression string.
    Cron { expression: String },
}
369
/// Policy for fire times that were missed.
///
/// Internally tagged on `"type"` with snake_case names, e.g.
/// `{"type": "skip"}` or `{"type": "catch_up", "max_runs": 3}`.
/// NOTE(review): the per-variant behaviour is implemented outside this chunk.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    Skip,
    RunOnce,
    CatchUp { max_runs: u32 },
}
377
/// Lifecycle status of a routine; serialized as `"active"` or `"paused"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
384
/// Persisted definition of a routine.
///
/// `args`, `allowed_tools` and `output_targets` default to empty values when
/// missing from the input; the two fire timestamps are optional and omitted
/// from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    /// Timezone name as a string. NOTE(review): presumably an IANA name parsed
    /// with `chrono_tz`; confirm at the use site.
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
409
/// One entry in a routine's history list (`AppState::routine_history`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub fired_at_ms: u64,
    pub status: String,
    /// Optional free-text detail; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
420
/// Status of a single routine run; serialized in snake_case
/// (e.g. `"pending_approval"`, `"blocked_policy"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}
434
/// An artifact attached to a routine run.
///
/// `label` and `metadata` are optional and omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    pub uri: String,
    pub kind: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
446
/// Persisted record of one routine run (`AppState::routine_runs`).
///
/// Optional timestamps and reason strings are omitted from the output when
/// `None`. Collections and the token/cost counters default to empty/zero when
/// missing from the input, so records written without those fields still parse.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // The next four fields have the same names and types as on `RoutineSpec`.
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
491
/// Tool allow-list associated with a session spawned for a routine run
/// (stored in `AppState::routine_session_policies`).
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
499
/// Outcome of scheduling evaluation for one routine: how many runs to start
/// and the timestamps involved. Serialize-only.
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    pub run_count: u32,
    pub scheduled_at_ms: u64,
    pub next_fire_at_ms: u64,
}
507
/// Lifecycle status of a v2 automation; serialized as `"active"`,
/// `"paused"` or `"draft"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2Status {
    Active,
    Paused,
    Draft,
}
515
/// Kind of schedule a v2 automation uses; serialized as `"cron"`,
/// `"interval"` or `"manual"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2ScheduleType {
    Cron,
    Interval,
    Manual,
}
523
/// Schedule block of a v2 automation.
///
/// `schedule_type` is serialized under the key `"type"`.
/// NOTE(review): `cron_expression` / `interval_seconds` are presumably
/// required for the matching `schedule_type`; the type itself does not
/// enforce that, so confirm where it is validated.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2Schedule {
    #[serde(rename = "type")]
    pub schedule_type: AutomationV2ScheduleType,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cron_expression: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval_seconds: Option<u64>,
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
}
535
/// Tool allow/deny lists for an automation agent; both default to empty
/// when missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentToolPolicy {
    #[serde(default)]
    pub allowlist: Vec<String>,
    #[serde(default)]
    pub denylist: Vec<String>,
}
543
/// MCP access policy for an automation agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentMcpPolicy {
    /// Defaults to empty when missing from the input.
    #[serde(default)]
    pub allowed_servers: Vec<String>,
    /// Optional tool list; omitted from the output when `None`.
    /// NOTE(review): whether `None` means "all tools" is decided elsewhere.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed_tools: Option<Vec<String>>,
}
551
/// Profile of one agent participating in a v2 automation.
///
/// Optional fields are omitted from the output when `None`; `skills`
/// defaults to empty when missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentProfile {
    /// Identifier of this agent; `AutomationFlowNode` has an `agent_id` field
    /// of the same name and type.
    pub agent_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub template_id: Option<String>,
    pub display_name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub avatar_url: Option<String>,
    /// Untyped JSON; its schema is not defined in this chunk.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    #[serde(default)]
    pub skills: Vec<String>,
    pub tool_policy: AutomationAgentToolPolicy,
    pub mcp_policy: AutomationAgentMcpPolicy,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_policy: Option<String>,
}
569
/// One node of an automation flow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowNode {
    pub node_id: String,
    pub agent_id: String,
    pub objective: String,
    /// Dependency identifiers; defaults to empty when missing from the input.
    /// NOTE(review): presumably `node_id`s of other nodes; confirm where the
    /// flow is validated.
    #[serde(default)]
    pub depends_on: Vec<String>,
    /// Untyped JSON; its schema is not defined in this chunk.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retry_policy: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
582
/// The set of nodes making up an automation flow; `nodes` defaults to empty
/// when missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowSpec {
    #[serde(default)]
    pub nodes: Vec<AutomationFlowNode>,
}
588
/// Optional execution limits for an automation; each limit is omitted from
/// the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationExecutionPolicy {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_parallel_agents: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_runtime_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_tool_calls: Option<u32>,
}
598
/// Persisted definition of a v2 automation (`AppState::automations_v2`).
///
/// `agents` and `output_targets` default to empty when missing from the
/// input; `description` and the fire timestamps are omitted from the output
/// when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2Spec {
    pub automation_id: String,
    pub name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub status: AutomationV2Status,
    pub schedule: AutomationV2Schedule,
    #[serde(default)]
    pub agents: Vec<AutomationAgentProfile>,
    pub flow: AutomationFlowSpec,
    pub execution: AutomationExecutionPolicy,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    pub creator_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
621
/// Status of a v2 automation run; serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationRunStatus {
    Queued,
    Running,
    Pausing,
    Paused,
    Completed,
    Failed,
    Cancelled,
}
633
/// Progress checkpoint of an automation run; every field defaults to empty
/// when missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationRunCheckpoint {
    #[serde(default)]
    pub completed_nodes: Vec<String>,
    #[serde(default)]
    pub pending_nodes: Vec<String>,
    /// JSON outputs keyed by string.
    /// NOTE(review): presumably keyed by `node_id`; confirm at the write site.
    #[serde(default)]
    pub node_outputs: std::collections::HashMap<String, Value>,
}
643
/// Persisted record of one v2 automation run (`AppState::automation_v2_runs`).
///
/// Optional timestamps and reason strings are omitted from the output when
/// `None`; id lists and token/cost counters default to empty/zero when
/// missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2RunRecord {
    pub run_id: String,
    pub automation_id: String,
    pub trigger_type: String,
    pub status: AutomationRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default)]
    pub active_instance_ids: Vec<String>,
    /// Required on input: unlike its own fields, the checkpoint itself has no
    /// serde default.
    pub checkpoint: AutomationRunCheckpoint,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pause_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resume_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
676
/// Details of a revision mismatch on a shared resource write: the key plus
/// the revision the caller expected and the revision currently stored.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    pub key: String,
    pub expected_rev: Option<u64>,
    /// `None` when no record exists for the key.
    pub current_rev: Option<u64>,
}
683
/// Errors returned by the shared resource store.
///
/// Internally tagged on `"type"` with snake_case names; for
/// `RevisionConflict` the fields of [`ResourceConflict`] appear alongside the
/// tag, e.g. `{"type": "revision_conflict", "key": ..., ...}`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    /// The key failed `is_valid_resource_key`.
    InvalidKey { key: String },
    RevisionConflict(ResourceConflict),
    PersistFailed { message: String },
}
691
/// Errors returned by the routine store.
///
/// Internally tagged on `"type"` with snake_case names, e.g.
/// `{"type": "invalid_schedule", "detail": "..."}`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    InvalidRoutineId { routine_id: String },
    InvalidSchedule { detail: String },
    PersistFailed { message: String },
}
699
/// Coarse startup state of the server.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    /// Initial state set by `AppState::new_starting`.
    Starting,
    /// Set by `AppState::mark_ready`.
    Ready,
    /// Set by `AppState::mark_failed`.
    Failed,
}
706
/// Mutable startup progress kept behind `AppState::startup`.
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Phase label; starts as `"boot"` and becomes `"ready"` in `mark_ready`.
    pub phase: String,
    /// `now_ms()` captured when the state was created.
    pub started_at_ms: u64,
    /// Caller-supplied identifier of this startup attempt.
    pub attempt_id: String,
    /// Error text recorded by `mark_failed`; cleared by `mark_ready`.
    pub last_error: Option<String>,
}
715
/// Point-in-time copy of [`StartupState`] plus the elapsed time, as produced
/// by `AppState::startup_snapshot`.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    /// `now_ms()` at snapshot time minus `started_at_ms` (saturating).
    pub elapsed_ms: u64,
}
725
/// Shared server state handed to request handlers.
///
/// Cloning is cheap for the `Arc`-wrapped members, which stay shared between
/// clones. The engine services live in `runtime`, which is empty until
/// `mark_ready` stores a [`RuntimeState`] into it.
#[derive(Clone)]
pub struct AppState {
    /// Set exactly once by `mark_ready`; `is_ready` checks whether it is set.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    pub startup: Arc<RwLock<StartupState>>,
    /// Drives `mode_label` (`"in-process"` vs `"sidecar"`).
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub run_registry: RunRegistry,
    /// Initialised from `resolve_run_stale_ms()`.
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// In-memory shared resource table, loaded from and persisted to
    /// `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    /// String-to-string map. NOTE(review): presumably session id -> run id;
    /// confirm at the write site.
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// Initialised from `resolve_token_cost_per_1k_usd()`.
    pub token_cost_per_1k_usd: f64,
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    pub web_ui_enabled: Arc<AtomicBool>,
    /// Uses a `std::sync::RwLock` so the accessors can stay synchronous.
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    /// Uses a `std::sync::RwLock` so the accessors can stay synchronous.
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host context detected at construction; used until `runtime` is set.
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
764
/// A key/value pair describing one status-index write.
/// NOTE(review): no use of this type is visible in this part of the file.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
770
771impl AppState {
772 pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
773 Self {
774 runtime: Arc::new(OnceLock::new()),
775 startup: Arc::new(RwLock::new(StartupState {
776 status: StartupStatus::Starting,
777 phase: "boot".to_string(),
778 started_at_ms: now_ms(),
779 attempt_id,
780 last_error: None,
781 })),
782 in_process_mode: Arc::new(AtomicBool::new(in_process)),
783 api_token: Arc::new(RwLock::new(None)),
784 engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
785 run_registry: RunRegistry::new(),
786 run_stale_ms: resolve_run_stale_ms(),
787 memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
788 memory_audit_log: Arc::new(RwLock::new(Vec::new())),
789 missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
790 shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
791 shared_resources_path: resolve_shared_resources_path(),
792 routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
793 routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
794 routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
795 automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
796 automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
797 routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
798 automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
799 routines_path: resolve_routines_path(),
800 routine_history_path: resolve_routine_history_path(),
801 routine_runs_path: resolve_routine_runs_path(),
802 automations_v2_path: resolve_automations_v2_path(),
803 automation_v2_runs_path: resolve_automation_v2_runs_path(),
804 agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
805 web_ui_enabled: Arc::new(AtomicBool::new(false)),
806 web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
807 server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
808 channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
809 host_runtime_context: detect_host_runtime_context(),
810 token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
811 pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
812 capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
813 preset_registry: Arc::new(PresetRegistry::new(
814 PackManager::default_root(),
815 resolve_shared_paths()
816 .map(|paths| paths.canonical_root)
817 .unwrap_or_else(|_| {
818 dirs::home_dir()
819 .unwrap_or_else(|| PathBuf::from("."))
820 .join(".tandem")
821 }),
822 )),
823 }
824 }
825
826 pub fn is_ready(&self) -> bool {
827 self.runtime.get().is_some()
828 }
829
830 pub fn mode_label(&self) -> &'static str {
831 if self.in_process_mode.load(Ordering::Relaxed) {
832 "in-process"
833 } else {
834 "sidecar"
835 }
836 }
837
838 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
839 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
840 if let Ok(mut guard) = self.web_ui_prefix.write() {
841 *guard = normalize_web_ui_prefix(&prefix);
842 }
843 }
844
845 pub fn web_ui_enabled(&self) -> bool {
846 self.web_ui_enabled.load(Ordering::Relaxed)
847 }
848
849 pub fn web_ui_prefix(&self) -> String {
850 self.web_ui_prefix
851 .read()
852 .map(|v| v.clone())
853 .unwrap_or_else(|_| "/admin".to_string())
854 }
855
856 pub fn set_server_base_url(&self, base_url: String) {
857 if let Ok(mut guard) = self.server_base_url.write() {
858 *guard = base_url;
859 }
860 }
861
862 pub fn server_base_url(&self) -> String {
863 self.server_base_url
864 .read()
865 .map(|v| v.clone())
866 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
867 }
868
869 pub async fn api_token(&self) -> Option<String> {
870 self.api_token.read().await.clone()
871 }
872
873 pub async fn set_api_token(&self, token: Option<String>) {
874 *self.api_token.write().await = token;
875 }
876
877 pub async fn startup_snapshot(&self) -> StartupSnapshot {
878 let state = self.startup.read().await.clone();
879 StartupSnapshot {
880 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
881 status: state.status,
882 phase: state.phase,
883 started_at_ms: state.started_at_ms,
884 attempt_id: state.attempt_id,
885 last_error: state.last_error,
886 }
887 }
888
889 pub fn host_runtime_context(&self) -> HostRuntimeContext {
890 self.runtime
891 .get()
892 .map(|runtime| runtime.host_runtime_context.clone())
893 .unwrap_or_else(|| self.host_runtime_context.clone())
894 }
895
896 pub async fn set_phase(&self, phase: impl Into<String>) {
897 let mut startup = self.startup.write().await;
898 startup.phase = phase.into();
899 }
900
901 pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
902 self.runtime
903 .set(runtime)
904 .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
905 self.engine_loop
906 .set_spawn_agent_hook(std::sync::Arc::new(
907 crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
908 ))
909 .await;
910 self.engine_loop
911 .set_tool_policy_hook(std::sync::Arc::new(
912 crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
913 ))
914 .await;
915 self.engine_loop
916 .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
917 self.clone(),
918 )))
919 .await;
920 let _ = self.load_shared_resources().await;
921 let _ = self.load_routines().await;
922 let _ = self.load_routine_history().await;
923 let _ = self.load_routine_runs().await;
924 let _ = self.load_automations_v2().await;
925 let _ = self.load_automation_v2_runs().await;
926 let workspace_root = self.workspace_index.snapshot().await.root;
927 let _ = self
928 .agent_teams
929 .ensure_loaded_for_workspace(&workspace_root)
930 .await;
931 let mut startup = self.startup.write().await;
932 startup.status = StartupStatus::Ready;
933 startup.phase = "ready".to_string();
934 startup.last_error = None;
935 Ok(())
936 }
937
938 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
939 let mut startup = self.startup.write().await;
940 startup.status = StartupStatus::Failed;
941 startup.phase = phase.into();
942 startup.last_error = Some(error.into());
943 }
944
945 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
946 let runtime = self.channels_runtime.lock().await;
947 runtime.statuses.clone()
948 }
949
950 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
951 let effective = self.config.get_effective_value().await;
952 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
953 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
954
955 let mut runtime = self.channels_runtime.lock().await;
956 if let Some(listeners) = runtime.listeners.as_mut() {
957 listeners.abort_all();
958 }
959 runtime.listeners = None;
960 runtime.statuses.clear();
961
962 let mut status_map = std::collections::HashMap::new();
963 status_map.insert(
964 "telegram".to_string(),
965 ChannelStatus {
966 enabled: parsed.channels.telegram.is_some(),
967 connected: false,
968 last_error: None,
969 active_sessions: 0,
970 meta: serde_json::json!({}),
971 },
972 );
973 status_map.insert(
974 "discord".to_string(),
975 ChannelStatus {
976 enabled: parsed.channels.discord.is_some(),
977 connected: false,
978 last_error: None,
979 active_sessions: 0,
980 meta: serde_json::json!({}),
981 },
982 );
983 status_map.insert(
984 "slack".to_string(),
985 ChannelStatus {
986 enabled: parsed.channels.slack.is_some(),
987 connected: false,
988 last_error: None,
989 active_sessions: 0,
990 meta: serde_json::json!({}),
991 },
992 );
993
994 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
995 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
996 runtime.listeners = Some(listeners);
997 for status in status_map.values_mut() {
998 if status.enabled {
999 status.connected = true;
1000 }
1001 }
1002 }
1003
1004 runtime.statuses = status_map.clone();
1005 drop(runtime);
1006
1007 self.event_bus.publish(EngineEvent::new(
1008 "channel.status.changed",
1009 serde_json::json!({ "channels": status_map }),
1010 ));
1011 Ok(())
1012 }
1013
1014 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
1015 if !self.shared_resources_path.exists() {
1016 return Ok(());
1017 }
1018 let raw = fs::read_to_string(&self.shared_resources_path).await?;
1019 let parsed =
1020 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
1021 .unwrap_or_default();
1022 let mut guard = self.shared_resources.write().await;
1023 *guard = parsed;
1024 Ok(())
1025 }
1026
1027 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1028 if let Some(parent) = self.shared_resources_path.parent() {
1029 fs::create_dir_all(parent).await?;
1030 }
1031 let payload = {
1032 let guard = self.shared_resources.read().await;
1033 serde_json::to_string_pretty(&*guard)?
1034 };
1035 fs::write(&self.shared_resources_path, payload).await?;
1036 Ok(())
1037 }
1038
1039 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
1040 self.shared_resources.read().await.get(key).cloned()
1041 }
1042
1043 pub async fn list_shared_resources(
1044 &self,
1045 prefix: Option<&str>,
1046 limit: usize,
1047 ) -> Vec<SharedResourceRecord> {
1048 let limit = limit.clamp(1, 500);
1049 let mut rows = self
1050 .shared_resources
1051 .read()
1052 .await
1053 .values()
1054 .filter(|record| {
1055 if let Some(prefix) = prefix {
1056 record.key.starts_with(prefix)
1057 } else {
1058 true
1059 }
1060 })
1061 .cloned()
1062 .collect::<Vec<_>>();
1063 rows.sort_by(|a, b| a.key.cmp(&b.key));
1064 rows.truncate(limit);
1065 rows
1066 }
1067
1068 pub async fn put_shared_resource(
1069 &self,
1070 key: String,
1071 value: Value,
1072 if_match_rev: Option<u64>,
1073 updated_by: String,
1074 ttl_ms: Option<u64>,
1075 ) -> Result<SharedResourceRecord, ResourceStoreError> {
1076 if !is_valid_resource_key(&key) {
1077 return Err(ResourceStoreError::InvalidKey { key });
1078 }
1079
1080 let now = now_ms();
1081 let mut guard = self.shared_resources.write().await;
1082 let existing = guard.get(&key).cloned();
1083
1084 if let Some(expected) = if_match_rev {
1085 let current = existing.as_ref().map(|row| row.rev);
1086 if current != Some(expected) {
1087 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1088 key,
1089 expected_rev: Some(expected),
1090 current_rev: current,
1091 }));
1092 }
1093 }
1094
1095 let next_rev = existing
1096 .as_ref()
1097 .map(|row| row.rev.saturating_add(1))
1098 .unwrap_or(1);
1099
1100 let record = SharedResourceRecord {
1101 key: key.clone(),
1102 value,
1103 rev: next_rev,
1104 updated_at_ms: now,
1105 updated_by,
1106 ttl_ms,
1107 };
1108
1109 let previous = guard.insert(key.clone(), record.clone());
1110 drop(guard);
1111
1112 if let Err(error) = self.persist_shared_resources().await {
1113 let mut rollback = self.shared_resources.write().await;
1114 if let Some(previous) = previous {
1115 rollback.insert(key, previous);
1116 } else {
1117 rollback.remove(&key);
1118 }
1119 return Err(ResourceStoreError::PersistFailed {
1120 message: error.to_string(),
1121 });
1122 }
1123
1124 Ok(record)
1125 }
1126
1127 pub async fn delete_shared_resource(
1128 &self,
1129 key: &str,
1130 if_match_rev: Option<u64>,
1131 ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
1132 if !is_valid_resource_key(key) {
1133 return Err(ResourceStoreError::InvalidKey {
1134 key: key.to_string(),
1135 });
1136 }
1137
1138 let mut guard = self.shared_resources.write().await;
1139 let current = guard.get(key).cloned();
1140 if let Some(expected) = if_match_rev {
1141 let current_rev = current.as_ref().map(|row| row.rev);
1142 if current_rev != Some(expected) {
1143 return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1144 key: key.to_string(),
1145 expected_rev: Some(expected),
1146 current_rev,
1147 }));
1148 }
1149 }
1150
1151 let removed = guard.remove(key);
1152 drop(guard);
1153
1154 if let Err(error) = self.persist_shared_resources().await {
1155 if let Some(record) = removed.clone() {
1156 self.shared_resources
1157 .write()
1158 .await
1159 .insert(record.key.clone(), record);
1160 }
1161 return Err(ResourceStoreError::PersistFailed {
1162 message: error.to_string(),
1163 });
1164 }
1165
1166 Ok(removed)
1167 }
1168
1169 pub async fn load_routines(&self) -> anyhow::Result<()> {
1170 if !self.routines_path.exists() {
1171 return Ok(());
1172 }
1173 let raw = fs::read_to_string(&self.routines_path).await?;
1174 let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
1175 .unwrap_or_default();
1176 let mut guard = self.routines.write().await;
1177 *guard = parsed;
1178 Ok(())
1179 }
1180
1181 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1182 if !self.routine_history_path.exists() {
1183 return Ok(());
1184 }
1185 let raw = fs::read_to_string(&self.routine_history_path).await?;
1186 let parsed = serde_json::from_str::<
1187 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1188 >(&raw)
1189 .unwrap_or_default();
1190 let mut guard = self.routine_history.write().await;
1191 *guard = parsed;
1192 Ok(())
1193 }
1194
1195 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1196 if !self.routine_runs_path.exists() {
1197 return Ok(());
1198 }
1199 let raw = fs::read_to_string(&self.routine_runs_path).await?;
1200 let parsed =
1201 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1202 .unwrap_or_default();
1203 let mut guard = self.routine_runs.write().await;
1204 *guard = parsed;
1205 Ok(())
1206 }
1207
1208 pub async fn persist_routines(&self) -> anyhow::Result<()> {
1209 if let Some(parent) = self.routines_path.parent() {
1210 fs::create_dir_all(parent).await?;
1211 }
1212 let payload = {
1213 let guard = self.routines.read().await;
1214 serde_json::to_string_pretty(&*guard)?
1215 };
1216 fs::write(&self.routines_path, payload).await?;
1217 Ok(())
1218 }
1219
1220 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1221 if let Some(parent) = self.routine_history_path.parent() {
1222 fs::create_dir_all(parent).await?;
1223 }
1224 let payload = {
1225 let guard = self.routine_history.read().await;
1226 serde_json::to_string_pretty(&*guard)?
1227 };
1228 fs::write(&self.routine_history_path, payload).await?;
1229 Ok(())
1230 }
1231
1232 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1233 if let Some(parent) = self.routine_runs_path.parent() {
1234 fs::create_dir_all(parent).await?;
1235 }
1236 let payload = {
1237 let guard = self.routine_runs.read().await;
1238 serde_json::to_string_pretty(&*guard)?
1239 };
1240 fs::write(&self.routine_runs_path, payload).await?;
1241 Ok(())
1242 }
1243
1244 pub async fn put_routine(
1245 &self,
1246 mut routine: RoutineSpec,
1247 ) -> Result<RoutineSpec, RoutineStoreError> {
1248 if routine.routine_id.trim().is_empty() {
1249 return Err(RoutineStoreError::InvalidRoutineId {
1250 routine_id: routine.routine_id,
1251 });
1252 }
1253
1254 routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1255 routine.output_targets = normalize_non_empty_list(routine.output_targets);
1256
1257 let now = now_ms();
1258 let next_schedule_fire =
1259 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1260 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1261 detail: "invalid schedule or timezone".to_string(),
1262 })?;
1263 match routine.schedule {
1264 RoutineSchedule::IntervalSeconds { seconds } => {
1265 if seconds == 0 {
1266 return Err(RoutineStoreError::InvalidSchedule {
1267 detail: "interval_seconds must be > 0".to_string(),
1268 });
1269 }
1270 let _ = seconds;
1271 }
1272 RoutineSchedule::Cron { .. } => {}
1273 }
1274 if routine.next_fire_at_ms.is_none() {
1275 routine.next_fire_at_ms = Some(next_schedule_fire);
1276 }
1277
1278 let mut guard = self.routines.write().await;
1279 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1280 drop(guard);
1281
1282 if let Err(error) = self.persist_routines().await {
1283 let mut rollback = self.routines.write().await;
1284 if let Some(previous) = previous {
1285 rollback.insert(previous.routine_id.clone(), previous);
1286 } else {
1287 rollback.remove(&routine.routine_id);
1288 }
1289 return Err(RoutineStoreError::PersistFailed {
1290 message: error.to_string(),
1291 });
1292 }
1293
1294 Ok(routine)
1295 }
1296
1297 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1298 let mut rows = self
1299 .routines
1300 .read()
1301 .await
1302 .values()
1303 .cloned()
1304 .collect::<Vec<_>>();
1305 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1306 rows
1307 }
1308
1309 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1310 self.routines.read().await.get(routine_id).cloned()
1311 }
1312
1313 pub async fn delete_routine(
1314 &self,
1315 routine_id: &str,
1316 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1317 let mut guard = self.routines.write().await;
1318 let removed = guard.remove(routine_id);
1319 drop(guard);
1320
1321 if let Err(error) = self.persist_routines().await {
1322 if let Some(removed) = removed.clone() {
1323 self.routines
1324 .write()
1325 .await
1326 .insert(removed.routine_id.clone(), removed);
1327 }
1328 return Err(RoutineStoreError::PersistFailed {
1329 message: error.to_string(),
1330 });
1331 }
1332 Ok(removed)
1333 }
1334
1335 pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1336 let mut plans = Vec::new();
1337 let mut guard = self.routines.write().await;
1338 for routine in guard.values_mut() {
1339 if routine.status != RoutineStatus::Active {
1340 continue;
1341 }
1342 let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1343 continue;
1344 };
1345 if now_ms < next_fire_at_ms {
1346 continue;
1347 }
1348 let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
1349 now_ms,
1350 next_fire_at_ms,
1351 &routine.schedule,
1352 &routine.timezone,
1353 &routine.misfire_policy,
1354 );
1355 routine.next_fire_at_ms = Some(next_fire_at_ms);
1356 if run_count == 0 {
1357 continue;
1358 }
1359 plans.push(RoutineTriggerPlan {
1360 routine_id: routine.routine_id.clone(),
1361 run_count,
1362 scheduled_at_ms: now_ms,
1363 next_fire_at_ms,
1364 });
1365 }
1366 drop(guard);
1367 let _ = self.persist_routines().await;
1368 plans
1369 }
1370
1371 pub async fn mark_routine_fired(
1372 &self,
1373 routine_id: &str,
1374 fired_at_ms: u64,
1375 ) -> Option<RoutineSpec> {
1376 let mut guard = self.routines.write().await;
1377 let routine = guard.get_mut(routine_id)?;
1378 routine.last_fired_at_ms = Some(fired_at_ms);
1379 let updated = routine.clone();
1380 drop(guard);
1381 let _ = self.persist_routines().await;
1382 Some(updated)
1383 }
1384
1385 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1386 let mut history = self.routine_history.write().await;
1387 history
1388 .entry(event.routine_id.clone())
1389 .or_default()
1390 .push(event);
1391 drop(history);
1392 let _ = self.persist_routine_history().await;
1393 }
1394
1395 pub async fn list_routine_history(
1396 &self,
1397 routine_id: &str,
1398 limit: usize,
1399 ) -> Vec<RoutineHistoryEvent> {
1400 let limit = limit.clamp(1, 500);
1401 let mut rows = self
1402 .routine_history
1403 .read()
1404 .await
1405 .get(routine_id)
1406 .cloned()
1407 .unwrap_or_default();
1408 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1409 rows.truncate(limit);
1410 rows
1411 }
1412
1413 pub async fn create_routine_run(
1414 &self,
1415 routine: &RoutineSpec,
1416 trigger_type: &str,
1417 run_count: u32,
1418 status: RoutineRunStatus,
1419 detail: Option<String>,
1420 ) -> RoutineRunRecord {
1421 let now = now_ms();
1422 let record = RoutineRunRecord {
1423 run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1424 routine_id: routine.routine_id.clone(),
1425 trigger_type: trigger_type.to_string(),
1426 run_count,
1427 status,
1428 created_at_ms: now,
1429 updated_at_ms: now,
1430 fired_at_ms: Some(now),
1431 started_at_ms: None,
1432 finished_at_ms: None,
1433 requires_approval: routine.requires_approval,
1434 approval_reason: None,
1435 denial_reason: None,
1436 paused_reason: None,
1437 detail,
1438 entrypoint: routine.entrypoint.clone(),
1439 args: routine.args.clone(),
1440 allowed_tools: routine.allowed_tools.clone(),
1441 output_targets: routine.output_targets.clone(),
1442 artifacts: Vec::new(),
1443 active_session_ids: Vec::new(),
1444 prompt_tokens: 0,
1445 completion_tokens: 0,
1446 total_tokens: 0,
1447 estimated_cost_usd: 0.0,
1448 };
1449 self.routine_runs
1450 .write()
1451 .await
1452 .insert(record.run_id.clone(), record.clone());
1453 let _ = self.persist_routine_runs().await;
1454 record
1455 }
1456
1457 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1458 self.routine_runs.read().await.get(run_id).cloned()
1459 }
1460
1461 pub async fn list_routine_runs(
1462 &self,
1463 routine_id: Option<&str>,
1464 limit: usize,
1465 ) -> Vec<RoutineRunRecord> {
1466 let mut rows = self
1467 .routine_runs
1468 .read()
1469 .await
1470 .values()
1471 .filter(|row| {
1472 if let Some(id) = routine_id {
1473 row.routine_id == id
1474 } else {
1475 true
1476 }
1477 })
1478 .cloned()
1479 .collect::<Vec<_>>();
1480 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1481 rows.truncate(limit.clamp(1, 500));
1482 rows
1483 }
1484
1485 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1486 let mut guard = self.routine_runs.write().await;
1487 let next_run_id = guard
1488 .values()
1489 .filter(|row| row.status == RoutineRunStatus::Queued)
1490 .min_by(|a, b| {
1491 a.created_at_ms
1492 .cmp(&b.created_at_ms)
1493 .then_with(|| a.run_id.cmp(&b.run_id))
1494 })
1495 .map(|row| row.run_id.clone())?;
1496 let now = now_ms();
1497 let row = guard.get_mut(&next_run_id)?;
1498 row.status = RoutineRunStatus::Running;
1499 row.updated_at_ms = now;
1500 row.started_at_ms = Some(now);
1501 let claimed = row.clone();
1502 drop(guard);
1503 let _ = self.persist_routine_runs().await;
1504 Some(claimed)
1505 }
1506
1507 pub async fn set_routine_session_policy(
1508 &self,
1509 session_id: String,
1510 run_id: String,
1511 routine_id: String,
1512 allowed_tools: Vec<String>,
1513 ) {
1514 let policy = RoutineSessionPolicy {
1515 session_id: session_id.clone(),
1516 run_id,
1517 routine_id,
1518 allowed_tools: normalize_allowed_tools(allowed_tools),
1519 };
1520 self.routine_session_policies
1521 .write()
1522 .await
1523 .insert(session_id, policy);
1524 }
1525
1526 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1527 self.routine_session_policies
1528 .read()
1529 .await
1530 .get(session_id)
1531 .cloned()
1532 }
1533
1534 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1535 self.routine_session_policies
1536 .write()
1537 .await
1538 .remove(session_id);
1539 }
1540
1541 pub async fn update_routine_run_status(
1542 &self,
1543 run_id: &str,
1544 status: RoutineRunStatus,
1545 reason: Option<String>,
1546 ) -> Option<RoutineRunRecord> {
1547 let mut guard = self.routine_runs.write().await;
1548 let row = guard.get_mut(run_id)?;
1549 row.status = status.clone();
1550 row.updated_at_ms = now_ms();
1551 match status {
1552 RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1553 RoutineRunStatus::Running => {
1554 row.started_at_ms.get_or_insert_with(now_ms);
1555 if let Some(detail) = reason {
1556 row.detail = Some(detail);
1557 }
1558 }
1559 RoutineRunStatus::Denied => row.denial_reason = reason,
1560 RoutineRunStatus::Paused => row.paused_reason = reason,
1561 RoutineRunStatus::Completed
1562 | RoutineRunStatus::Failed
1563 | RoutineRunStatus::Cancelled => {
1564 row.finished_at_ms = Some(now_ms());
1565 if let Some(detail) = reason {
1566 row.detail = Some(detail);
1567 }
1568 }
1569 _ => {
1570 if let Some(detail) = reason {
1571 row.detail = Some(detail);
1572 }
1573 }
1574 }
1575 let updated = row.clone();
1576 drop(guard);
1577 let _ = self.persist_routine_runs().await;
1578 Some(updated)
1579 }
1580
1581 pub async fn append_routine_run_artifact(
1582 &self,
1583 run_id: &str,
1584 artifact: RoutineRunArtifact,
1585 ) -> Option<RoutineRunRecord> {
1586 let mut guard = self.routine_runs.write().await;
1587 let row = guard.get_mut(run_id)?;
1588 row.updated_at_ms = now_ms();
1589 row.artifacts.push(artifact);
1590 let updated = row.clone();
1591 drop(guard);
1592 let _ = self.persist_routine_runs().await;
1593 Some(updated)
1594 }
1595
1596 pub async fn add_active_session_id(
1597 &self,
1598 run_id: &str,
1599 session_id: String,
1600 ) -> Option<RoutineRunRecord> {
1601 let mut guard = self.routine_runs.write().await;
1602 let row = guard.get_mut(run_id)?;
1603 if !row.active_session_ids.iter().any(|id| id == &session_id) {
1604 row.active_session_ids.push(session_id);
1605 }
1606 row.updated_at_ms = now_ms();
1607 let updated = row.clone();
1608 drop(guard);
1609 let _ = self.persist_routine_runs().await;
1610 Some(updated)
1611 }
1612
1613 pub async fn clear_active_session_id(
1614 &self,
1615 run_id: &str,
1616 session_id: &str,
1617 ) -> Option<RoutineRunRecord> {
1618 let mut guard = self.routine_runs.write().await;
1619 let row = guard.get_mut(run_id)?;
1620 row.active_session_ids.retain(|id| id != session_id);
1621 row.updated_at_ms = now_ms();
1622 let updated = row.clone();
1623 drop(guard);
1624 let _ = self.persist_routine_runs().await;
1625 Some(updated)
1626 }
1627
1628 pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
1629 if !self.automations_v2_path.exists() {
1630 return Ok(());
1631 }
1632 let raw = fs::read_to_string(&self.automations_v2_path).await?;
1633 let parsed =
1634 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(&raw)
1635 .unwrap_or_default();
1636 *self.automations_v2.write().await = parsed;
1637 Ok(())
1638 }
1639
1640 pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1641 if let Some(parent) = self.automations_v2_path.parent() {
1642 fs::create_dir_all(parent).await?;
1643 }
1644 let payload = {
1645 let guard = self.automations_v2.read().await;
1646 serde_json::to_string_pretty(&*guard)?
1647 };
1648 fs::write(&self.automations_v2_path, payload).await?;
1649 Ok(())
1650 }
1651
1652 pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
1653 if !self.automation_v2_runs_path.exists() {
1654 return Ok(());
1655 }
1656 let raw = fs::read_to_string(&self.automation_v2_runs_path).await?;
1657 let parsed =
1658 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(&raw)
1659 .unwrap_or_default();
1660 *self.automation_v2_runs.write().await = parsed;
1661 Ok(())
1662 }
1663
1664 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1665 if let Some(parent) = self.automation_v2_runs_path.parent() {
1666 fs::create_dir_all(parent).await?;
1667 }
1668 let payload = {
1669 let guard = self.automation_v2_runs.read().await;
1670 serde_json::to_string_pretty(&*guard)?
1671 };
1672 fs::write(&self.automation_v2_runs_path, payload).await?;
1673 Ok(())
1674 }
1675
1676 pub async fn put_automation_v2(
1677 &self,
1678 mut automation: AutomationV2Spec,
1679 ) -> anyhow::Result<AutomationV2Spec> {
1680 if automation.automation_id.trim().is_empty() {
1681 anyhow::bail!("automation_id is required");
1682 }
1683 for agent in &mut automation.agents {
1684 if agent.display_name.trim().is_empty() {
1685 agent.display_name = auto_generated_agent_name(&agent.agent_id);
1686 }
1687 agent.tool_policy.allowlist =
1688 normalize_allowed_tools(agent.tool_policy.allowlist.clone());
1689 agent.tool_policy.denylist =
1690 normalize_allowed_tools(agent.tool_policy.denylist.clone());
1691 agent.mcp_policy.allowed_servers =
1692 normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
1693 agent.mcp_policy.allowed_tools = agent
1694 .mcp_policy
1695 .allowed_tools
1696 .take()
1697 .map(normalize_allowed_tools);
1698 }
1699 let now = now_ms();
1700 if automation.created_at_ms == 0 {
1701 automation.created_at_ms = now;
1702 }
1703 automation.updated_at_ms = now;
1704 if automation.next_fire_at_ms.is_none() {
1705 automation.next_fire_at_ms =
1706 automation_schedule_next_fire_at_ms(&automation.schedule, now);
1707 }
1708 self.automations_v2
1709 .write()
1710 .await
1711 .insert(automation.automation_id.clone(), automation.clone());
1712 self.persist_automations_v2().await?;
1713 Ok(automation)
1714 }
1715
1716 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
1717 self.automations_v2.read().await.get(automation_id).cloned()
1718 }
1719
1720 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
1721 let mut rows = self
1722 .automations_v2
1723 .read()
1724 .await
1725 .values()
1726 .cloned()
1727 .collect::<Vec<_>>();
1728 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
1729 rows
1730 }
1731
1732 pub async fn delete_automation_v2(
1733 &self,
1734 automation_id: &str,
1735 ) -> anyhow::Result<Option<AutomationV2Spec>> {
1736 let removed = self.automations_v2.write().await.remove(automation_id);
1737 self.persist_automations_v2().await?;
1738 Ok(removed)
1739 }
1740
1741 pub async fn create_automation_v2_run(
1742 &self,
1743 automation: &AutomationV2Spec,
1744 trigger_type: &str,
1745 ) -> anyhow::Result<AutomationV2RunRecord> {
1746 let now = now_ms();
1747 let pending_nodes = automation
1748 .flow
1749 .nodes
1750 .iter()
1751 .map(|n| n.node_id.clone())
1752 .collect::<Vec<_>>();
1753 let run = AutomationV2RunRecord {
1754 run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
1755 automation_id: automation.automation_id.clone(),
1756 trigger_type: trigger_type.to_string(),
1757 status: AutomationRunStatus::Queued,
1758 created_at_ms: now,
1759 updated_at_ms: now,
1760 started_at_ms: None,
1761 finished_at_ms: None,
1762 active_session_ids: Vec::new(),
1763 active_instance_ids: Vec::new(),
1764 checkpoint: AutomationRunCheckpoint {
1765 completed_nodes: Vec::new(),
1766 pending_nodes,
1767 node_outputs: std::collections::HashMap::new(),
1768 },
1769 pause_reason: None,
1770 resume_reason: None,
1771 detail: None,
1772 prompt_tokens: 0,
1773 completion_tokens: 0,
1774 total_tokens: 0,
1775 estimated_cost_usd: 0.0,
1776 };
1777 self.automation_v2_runs
1778 .write()
1779 .await
1780 .insert(run.run_id.clone(), run.clone());
1781 self.persist_automation_v2_runs().await?;
1782 Ok(run)
1783 }
1784
1785 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
1786 self.automation_v2_runs.read().await.get(run_id).cloned()
1787 }
1788
1789 pub async fn list_automation_v2_runs(
1790 &self,
1791 automation_id: Option<&str>,
1792 limit: usize,
1793 ) -> Vec<AutomationV2RunRecord> {
1794 let mut rows = self
1795 .automation_v2_runs
1796 .read()
1797 .await
1798 .values()
1799 .filter(|row| {
1800 if let Some(id) = automation_id {
1801 row.automation_id == id
1802 } else {
1803 true
1804 }
1805 })
1806 .cloned()
1807 .collect::<Vec<_>>();
1808 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1809 rows.truncate(limit.clamp(1, 500));
1810 rows
1811 }
1812
1813 pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
1814 let mut guard = self.automation_v2_runs.write().await;
1815 let run_id = guard
1816 .values()
1817 .filter(|row| row.status == AutomationRunStatus::Queued)
1818 .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
1819 .map(|row| row.run_id.clone())?;
1820 let now = now_ms();
1821 let run = guard.get_mut(&run_id)?;
1822 run.status = AutomationRunStatus::Running;
1823 run.updated_at_ms = now;
1824 run.started_at_ms.get_or_insert(now);
1825 let claimed = run.clone();
1826 drop(guard);
1827 let _ = self.persist_automation_v2_runs().await;
1828 Some(claimed)
1829 }
1830
1831 pub async fn update_automation_v2_run(
1832 &self,
1833 run_id: &str,
1834 update: impl FnOnce(&mut AutomationV2RunRecord),
1835 ) -> Option<AutomationV2RunRecord> {
1836 let mut guard = self.automation_v2_runs.write().await;
1837 let run = guard.get_mut(run_id)?;
1838 update(run);
1839 run.updated_at_ms = now_ms();
1840 if matches!(
1841 run.status,
1842 AutomationRunStatus::Completed
1843 | AutomationRunStatus::Failed
1844 | AutomationRunStatus::Cancelled
1845 ) {
1846 run.finished_at_ms.get_or_insert_with(now_ms);
1847 }
1848 let out = run.clone();
1849 drop(guard);
1850 let _ = self.persist_automation_v2_runs().await;
1851 Some(out)
1852 }
1853
1854 pub async fn add_automation_v2_session(
1855 &self,
1856 run_id: &str,
1857 session_id: &str,
1858 ) -> Option<AutomationV2RunRecord> {
1859 let updated = self
1860 .update_automation_v2_run(run_id, |row| {
1861 if !row.active_session_ids.iter().any(|id| id == session_id) {
1862 row.active_session_ids.push(session_id.to_string());
1863 }
1864 })
1865 .await;
1866 self.automation_v2_session_runs
1867 .write()
1868 .await
1869 .insert(session_id.to_string(), run_id.to_string());
1870 updated
1871 }
1872
1873 pub async fn clear_automation_v2_session(
1874 &self,
1875 run_id: &str,
1876 session_id: &str,
1877 ) -> Option<AutomationV2RunRecord> {
1878 self.automation_v2_session_runs
1879 .write()
1880 .await
1881 .remove(session_id);
1882 self.update_automation_v2_run(run_id, |row| {
1883 row.active_session_ids.retain(|id| id != session_id);
1884 })
1885 .await
1886 }
1887
1888 pub async fn apply_provider_usage_to_runs(
1889 &self,
1890 session_id: &str,
1891 prompt_tokens: u64,
1892 completion_tokens: u64,
1893 total_tokens: u64,
1894 ) {
1895 if let Some(policy) = self.routine_session_policy(session_id).await {
1896 let rate = self.token_cost_per_1k_usd.max(0.0);
1897 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
1898 let mut guard = self.routine_runs.write().await;
1899 if let Some(run) = guard.get_mut(&policy.run_id) {
1900 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
1901 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
1902 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
1903 run.estimated_cost_usd += delta_cost;
1904 run.updated_at_ms = now_ms();
1905 }
1906 drop(guard);
1907 let _ = self.persist_routine_runs().await;
1908 }
1909
1910 let maybe_v2_run_id = self
1911 .automation_v2_session_runs
1912 .read()
1913 .await
1914 .get(session_id)
1915 .cloned();
1916 if let Some(run_id) = maybe_v2_run_id {
1917 let rate = self.token_cost_per_1k_usd.max(0.0);
1918 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
1919 let mut guard = self.automation_v2_runs.write().await;
1920 if let Some(run) = guard.get_mut(&run_id) {
1921 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
1922 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
1923 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
1924 run.estimated_cost_usd += delta_cost;
1925 run.updated_at_ms = now_ms();
1926 }
1927 drop(guard);
1928 let _ = self.persist_automation_v2_runs().await;
1929 }
1930 }
1931
1932 pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
1933 let mut fired = Vec::new();
1934 let mut guard = self.automations_v2.write().await;
1935 for automation in guard.values_mut() {
1936 if automation.status != AutomationV2Status::Active {
1937 continue;
1938 }
1939 let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
1940 automation.next_fire_at_ms =
1941 automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
1942 continue;
1943 };
1944 if now_ms < next_fire_at_ms {
1945 continue;
1946 }
1947 let run_count =
1948 automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
1949 let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
1950 automation.next_fire_at_ms = next;
1951 automation.last_fired_at_ms = Some(now_ms);
1952 for _ in 0..run_count {
1953 fired.push(automation.automation_id.clone());
1954 }
1955 }
1956 drop(guard);
1957 let _ = self.persist_automations_v2().await;
1958 fired
1959 }
1960}
1961
1962async fn build_channels_config(
1963 state: &AppState,
1964 channels: &ChannelsConfigFile,
1965) -> Option<ChannelsConfig> {
1966 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1967 return None;
1968 }
1969 Some(ChannelsConfig {
1970 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1971 bot_token: cfg.bot_token,
1972 allowed_users: cfg.allowed_users,
1973 mention_only: cfg.mention_only,
1974 style_profile: cfg.style_profile,
1975 }),
1976 discord: channels.discord.clone().map(|cfg| DiscordConfig {
1977 bot_token: cfg.bot_token,
1978 guild_id: cfg.guild_id,
1979 allowed_users: cfg.allowed_users,
1980 mention_only: cfg.mention_only,
1981 }),
1982 slack: channels.slack.clone().map(|cfg| SlackConfig {
1983 bot_token: cfg.bot_token,
1984 channel_id: cfg.channel_id,
1985 allowed_users: cfg.allowed_users,
1986 }),
1987 server_base_url: state.server_base_url(),
1988 api_token: state.api_token().await.unwrap_or_default(),
1989 tool_policy: channels.tool_policy.clone(),
1990 })
1991}
1992
/// Normalizes a web UI path prefix to the form `/segment[/segment...]`.
///
/// Surrounding whitespace and trailing slashes are removed and a leading
/// slash is added when missing. Input that is empty, or that consists only of
/// slashes (for example `/` or `//`), falls back to `/admin` so the result is
/// never an empty string.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let core = prefix.trim().trim_end_matches('/');
    if core.is_empty() {
        return "/admin".to_string();
    }
    if core.starts_with('/') {
        core.to_string()
    } else {
        format!("/{core}")
    }
}
2005
/// Default path prefix for the web UI: `/admin`.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
2009
/// Default allow-list: a single `*` entry.
fn default_allow_all() -> Vec<String> {
    vec![String::from("*")]
}
2013
/// Default for the Discord `mention_only` setting: enabled.
fn default_discord_mention_only() -> bool {
    true
}
2017
2018fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
2019 normalize_non_empty_list(raw)
2020}
2021
/// Trims every entry, drops entries that are empty after trimming, and removes
/// duplicates while keeping the first occurrence of each value in order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|item| item.trim().to_string())
        // `insert` returns false for values that were already seen.
        .filter(|item| !item.is_empty() && seen.insert(item.clone()))
        .collect()
}
2036
/// Reads the run staleness threshold in milliseconds from
/// `TANDEM_RUN_STALE_MS`.
///
/// Falls back to 120 000 when the variable is unset or not a valid `u64`, and
/// clamps the result to `30_000..=600_000`.
fn resolve_run_stale_ms() -> u64 {
    const DEFAULT_MS: u64 = 120_000;
    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_MS),
        Err(_) => DEFAULT_MS,
    };
    configured.clamp(30_000, 600_000)
}
2044
/// Reads the USD cost per 1000 tokens from `TANDEM_TOKEN_COST_PER_1K_USD`.
///
/// Returns `0.0` when the variable is unset, is not a valid number, or is not
/// finite. `f64` parsing accepts strings such as `inf` and `nan`; those are
/// rejected here so that an infinite rate can never reach the cost totals.
/// Negative values are floored at `0.0`.
fn resolve_token_cost_per_1k_usd() -> f64 {
    std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        .filter(|rate| rate.is_finite())
        .map_or(0.0, |rate| rate.max(0.0))
}
2052
2053fn resolve_shared_resources_path() -> PathBuf {
2054 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
2055 let trimmed = dir.trim();
2056 if !trimmed.is_empty() {
2057 return PathBuf::from(trimmed).join("shared_resources.json");
2058 }
2059 }
2060 default_state_dir().join("shared_resources.json")
2061}
2062
2063fn resolve_routines_path() -> PathBuf {
2064 if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
2065 let trimmed = dir.trim();
2066 if !trimmed.is_empty() {
2067 return PathBuf::from(trimmed).join("routines.json");
2068 }
2069 }
2070 default_state_dir().join("routines.json")
2071}
2072
2073fn resolve_routine_history_path() -> PathBuf {
2074 if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
2075 let trimmed = root.trim();
2076 if !trimmed.is_empty() {
2077 return PathBuf::from(trimmed).join("routine_history.json");
2078 }
2079 }
2080 default_state_dir().join("routine_history.json")
2081}
2082
2083fn resolve_routine_runs_path() -> PathBuf {
2084 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2085 let trimmed = root.trim();
2086 if !trimmed.is_empty() {
2087 return PathBuf::from(trimmed).join("routine_runs.json");
2088 }
2089 }
2090 default_state_dir().join("routine_runs.json")
2091}
2092
2093fn resolve_automations_v2_path() -> PathBuf {
2094 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2095 let trimmed = root.trim();
2096 if !trimmed.is_empty() {
2097 return PathBuf::from(trimmed).join("automations_v2.json");
2098 }
2099 }
2100 default_state_dir().join("automations_v2.json")
2101}
2102
2103fn resolve_automation_v2_runs_path() -> PathBuf {
2104 if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2105 let trimmed = root.trim();
2106 if !trimmed.is_empty() {
2107 return PathBuf::from(trimmed).join("automation_v2_runs.json");
2108 }
2109 }
2110 default_state_dir().join("automation_v2_runs.json")
2111}
2112
2113fn resolve_agent_team_audit_path() -> PathBuf {
2114 if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
2115 let trimmed = base.trim();
2116 if !trimmed.is_empty() {
2117 return PathBuf::from(trimmed)
2118 .join("agent-team")
2119 .join("audit.log.jsonl");
2120 }
2121 }
2122 default_state_dir()
2123 .join("agent-team")
2124 .join("audit.log.jsonl")
2125}
2126
2127fn default_state_dir() -> PathBuf {
2128 if let Ok(paths) = resolve_shared_paths() {
2129 return paths.engine_state_dir;
2130 }
2131 if let Some(data_dir) = dirs::data_dir() {
2132 return data_dir.join("tandem").join("data");
2133 }
2134 dirs::home_dir()
2135 .map(|home| home.join(".tandem").join("data"))
2136 .unwrap_or_else(|| PathBuf::from(".tandem"))
2137}
2138
2139fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
2140 match schedule {
2141 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
2142 RoutineSchedule::Cron { .. } => None,
2143 }
2144}
2145
2146fn parse_timezone(timezone: &str) -> Option<Tz> {
2147 timezone.trim().parse::<Tz>().ok()
2148}
2149
2150fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
2151 let tz = parse_timezone(timezone)?;
2152 let schedule = Schedule::from_str(expression).ok()?;
2153 let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
2154 let local_from = from_dt.with_timezone(&tz);
2155 let next = schedule.after(&local_from).next()?;
2156 Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
2157}
2158
2159fn compute_next_schedule_fire_at_ms(
2160 schedule: &RoutineSchedule,
2161 timezone: &str,
2162 from_ms: u64,
2163) -> Option<u64> {
2164 let _ = parse_timezone(timezone)?;
2165 match schedule {
2166 RoutineSchedule::IntervalSeconds { seconds } => {
2167 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
2168 }
2169 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
2170 }
2171}
2172
2173fn compute_misfire_plan_for_schedule(
2174 now_ms: u64,
2175 next_fire_at_ms: u64,
2176 schedule: &RoutineSchedule,
2177 timezone: &str,
2178 policy: &RoutineMisfirePolicy,
2179) -> (u32, u64) {
2180 match schedule {
2181 RoutineSchedule::IntervalSeconds { .. } => {
2182 let Some(interval_ms) = routine_interval_ms(schedule) else {
2183 return (0, next_fire_at_ms);
2184 };
2185 compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
2186 }
2187 RoutineSchedule::Cron { expression } => {
2188 let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
2189 .unwrap_or_else(|| now_ms.saturating_add(60_000));
2190 match policy {
2191 RoutineMisfirePolicy::Skip => (0, aligned_next),
2192 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
2193 RoutineMisfirePolicy::CatchUp { max_runs } => {
2194 let mut count = 0u32;
2195 let mut cursor = next_fire_at_ms;
2196 while cursor <= now_ms && count < *max_runs {
2197 count = count.saturating_add(1);
2198 let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
2199 break;
2200 };
2201 if next <= cursor {
2202 break;
2203 }
2204 cursor = next;
2205 }
2206 (count, aligned_next)
2207 }
2208 }
2209 }
2210 }
2211}
2212
2213fn compute_misfire_plan(
2214 now_ms: u64,
2215 next_fire_at_ms: u64,
2216 interval_ms: u64,
2217 policy: &RoutineMisfirePolicy,
2218) -> (u32, u64) {
2219 if now_ms < next_fire_at_ms || interval_ms == 0 {
2220 return (0, next_fire_at_ms);
2221 }
2222 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
2223 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
2224 match policy {
2225 RoutineMisfirePolicy::Skip => (0, aligned_next),
2226 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
2227 RoutineMisfirePolicy::CatchUp { max_runs } => {
2228 let count = missed.min(u64::from(*max_runs)) as u32;
2229 (count, aligned_next)
2230 }
2231 }
2232}
2233
2234fn auto_generated_agent_name(agent_id: &str) -> String {
2235 let names = [
2236 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
2237 ];
2238 let digest = Sha256::digest(agent_id.as_bytes());
2239 let idx = usize::from(digest[0]) % names.len();
2240 format!("{}-{:02x}", names[idx], digest[1])
2241}
2242
2243fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
2244 match schedule.schedule_type {
2245 AutomationV2ScheduleType::Manual => None,
2246 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
2247 seconds: schedule.interval_seconds.unwrap_or(60),
2248 }),
2249 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
2250 expression: schedule.cron_expression.clone().unwrap_or_default(),
2251 }),
2252 }
2253}
2254
2255fn automation_schedule_next_fire_at_ms(
2256 schedule: &AutomationV2Schedule,
2257 from_ms: u64,
2258) -> Option<u64> {
2259 let routine_schedule = schedule_from_automation_v2(schedule)?;
2260 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
2261}
2262
2263fn automation_schedule_due_count(
2264 schedule: &AutomationV2Schedule,
2265 now_ms: u64,
2266 next_fire_at_ms: u64,
2267) -> u32 {
2268 let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
2269 return 0;
2270 };
2271 let (count, _) = compute_misfire_plan_for_schedule(
2272 now_ms,
2273 next_fire_at_ms,
2274 &routine_schedule,
2275 &schedule.timezone,
2276 &schedule.misfire_policy,
2277 );
2278 count.max(1)
2279}
2280
/// Outcome of checking a routine against the external-integration execution policy.
///
/// Produced by `evaluate_routine_execution_policy`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run without further checks.
    Allowed,
    /// The routine uses external integrations and is flagged as requiring approval;
    /// `reason` is a human-readable explanation.
    RequiresApproval { reason: String },
    /// The routine uses external integrations but they are not allowed for it;
    /// `reason` is a human-readable explanation.
    Blocked { reason: String },
}
2287
2288pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
2289 let entrypoint = routine.entrypoint.to_ascii_lowercase();
2290 if entrypoint.starts_with("connector.")
2291 || entrypoint.starts_with("integration.")
2292 || entrypoint.contains("external")
2293 {
2294 return true;
2295 }
2296 routine
2297 .args
2298 .get("uses_external_integrations")
2299 .and_then(|v| v.as_bool())
2300 .unwrap_or(false)
2301 || routine
2302 .args
2303 .get("connector_id")
2304 .and_then(|v| v.as_str())
2305 .is_some()
2306}
2307
2308pub fn evaluate_routine_execution_policy(
2309 routine: &RoutineSpec,
2310 trigger_type: &str,
2311) -> RoutineExecutionDecision {
2312 if !routine_uses_external_integrations(routine) {
2313 return RoutineExecutionDecision::Allowed;
2314 }
2315 if !routine.external_integrations_allowed {
2316 return RoutineExecutionDecision::Blocked {
2317 reason: "external integrations are disabled by policy".to_string(),
2318 };
2319 }
2320 if routine.requires_approval {
2321 return RoutineExecutionDecision::RequiresApproval {
2322 reason: format!(
2323 "manual approval required before external side effects ({})",
2324 trigger_type
2325 ),
2326 };
2327 }
2328 RoutineExecutionDecision::Allowed
2329}
2330
/// Validates a shared-resource key.
///
/// After trimming whitespace, a key is valid when it is exactly `swarm.active_tasks`, or
/// when it starts with one of `run/`, `mission/`, `project/`, `team/` and contains no empty
/// path segment (`//`). Blank keys are rejected.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];

    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        candidate => {
            let has_allowed_prefix = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| candidate.starts_with(*prefix));
            has_allowed_prefix && !candidate.contains("//")
        }
    }
}
2348
2349impl Deref for AppState {
2350 type Target = RuntimeState;
2351
2352 fn deref(&self) -> &Self::Target {
2353 self.runtime
2354 .get()
2355 .expect("runtime accessed before startup completion")
2356 }
2357}
2358
/// Prompt-context hook backed by the server's `AppState`.
///
/// Its `PromptContextHook` implementation appends identity, documentation, and memory
/// context to the messages sent to a provider.
#[derive(Clone)]
struct ServerPromptContextHook {
    /// Application state used to reach the run registry, config, and event bus.
    state: AppState,
}
2363
2364impl ServerPromptContextHook {
2365 fn new(state: AppState) -> Self {
2366 Self { state }
2367 }
2368
2369 async fn open_memory_db(&self) -> Option<MemoryDatabase> {
2370 let paths = resolve_shared_paths().ok()?;
2371 MemoryDatabase::new(&paths.memory_db_path).await.ok()
2372 }
2373
2374 async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
2375 let paths = resolve_shared_paths().ok()?;
2376 tandem_memory::MemoryManager::new(&paths.memory_db_path)
2377 .await
2378 .ok()
2379 }
2380
2381 fn hash_query(input: &str) -> String {
2382 let mut hasher = Sha256::new();
2383 hasher.update(input.as_bytes());
2384 format!("{:x}", hasher.finalize())
2385 }
2386
2387 fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
2388 let mut out = vec!["<memory_context>".to_string()];
2389 let mut used = 0usize;
2390 for hit in hits {
2391 let text = hit
2392 .record
2393 .content
2394 .split_whitespace()
2395 .take(60)
2396 .collect::<Vec<_>>()
2397 .join(" ");
2398 let line = format!(
2399 "- [{:.3}] {} (source={}, run={})",
2400 hit.score, text, hit.record.source_type, hit.record.run_id
2401 );
2402 used = used.saturating_add(line.len());
2403 if used > 2200 {
2404 break;
2405 }
2406 out.push(line);
2407 }
2408 out.push("</memory_context>".to_string());
2409 out.join("\n")
2410 }
2411
2412 fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
2413 chunk
2414 .metadata
2415 .as_ref()
2416 .and_then(|meta| meta.get("source_url"))
2417 .and_then(Value::as_str)
2418 .map(str::trim)
2419 .filter(|v| !v.is_empty())
2420 .map(ToString::to_string)
2421 }
2422
2423 fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
2424 if let Some(path) = chunk
2425 .metadata
2426 .as_ref()
2427 .and_then(|meta| meta.get("relative_path"))
2428 .and_then(Value::as_str)
2429 .map(str::trim)
2430 .filter(|v| !v.is_empty())
2431 {
2432 return path.to_string();
2433 }
2434 chunk
2435 .source
2436 .strip_prefix("guide_docs:")
2437 .unwrap_or(chunk.source.as_str())
2438 .to_string()
2439 }
2440
2441 fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
2442 let mut out = vec!["<docs_context>".to_string()];
2443 let mut used = 0usize;
2444 for hit in hits {
2445 let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
2446 let path = Self::extract_docs_relative_path(&hit.chunk);
2447 let text = hit
2448 .chunk
2449 .content
2450 .split_whitespace()
2451 .take(70)
2452 .collect::<Vec<_>>()
2453 .join(" ");
2454 let line = format!(
2455 "- [{:.3}] {} (doc_path={}, source_url={})",
2456 hit.similarity, text, path, url
2457 );
2458 used = used.saturating_add(line.len());
2459 if used > 2800 {
2460 break;
2461 }
2462 out.push(line);
2463 }
2464 out.push("</docs_context>".to_string());
2465 out.join("\n")
2466 }
2467
2468 async fn search_embedded_docs(
2469 &self,
2470 query: &str,
2471 limit: usize,
2472 ) -> Vec<tandem_memory::types::MemorySearchResult> {
2473 let Some(manager) = self.open_memory_manager().await else {
2474 return Vec::new();
2475 };
2476 let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
2477 manager
2478 .search(
2479 query,
2480 Some(MemoryTier::Global),
2481 None,
2482 None,
2483 Some(search_limit),
2484 )
2485 .await
2486 .unwrap_or_default()
2487 .into_iter()
2488 .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
2489 .take(limit)
2490 .collect()
2491 }
2492
2493 fn should_skip_memory_injection(query: &str) -> bool {
2494 let trimmed = query.trim();
2495 if trimmed.is_empty() {
2496 return true;
2497 }
2498 let lower = trimmed.to_ascii_lowercase();
2499 let social = [
2500 "hi",
2501 "hello",
2502 "hey",
2503 "thanks",
2504 "thank you",
2505 "ok",
2506 "okay",
2507 "cool",
2508 "nice",
2509 "yo",
2510 "good morning",
2511 "good afternoon",
2512 "good evening",
2513 ];
2514 lower.len() <= 32 && social.contains(&lower.as_str())
2515 }
2516
2517 fn personality_preset_text(preset: &str) -> &'static str {
2518 match preset {
2519 "concise" => {
2520 "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
2521 }
2522 "friendly" => {
2523 "Default style: friendly and supportive while staying technically rigorous and concrete."
2524 }
2525 "mentor" => {
2526 "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
2527 }
2528 "critical" => {
2529 "Default style: critical and risk-first. Surface failure modes and assumptions early."
2530 }
2531 _ => {
2532 "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
2533 }
2534 }
2535 }
2536
2537 fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
2538 let allow_agent_override = agent_name
2539 .map(|name| !matches!(name, "compaction" | "title" | "summary"))
2540 .unwrap_or(false);
2541 let legacy_bot_name = config
2542 .get("bot_name")
2543 .and_then(Value::as_str)
2544 .map(str::trim)
2545 .filter(|v| !v.is_empty());
2546 let bot_name = config
2547 .get("identity")
2548 .and_then(|identity| identity.get("bot"))
2549 .and_then(|bot| bot.get("canonical_name"))
2550 .and_then(Value::as_str)
2551 .map(str::trim)
2552 .filter(|v| !v.is_empty())
2553 .or(legacy_bot_name)
2554 .unwrap_or("Tandem");
2555
2556 let default_profile = config
2557 .get("identity")
2558 .and_then(|identity| identity.get("personality"))
2559 .and_then(|personality| personality.get("default"));
2560 let default_preset = default_profile
2561 .and_then(|profile| profile.get("preset"))
2562 .and_then(Value::as_str)
2563 .map(str::trim)
2564 .filter(|v| !v.is_empty())
2565 .unwrap_or("balanced");
2566 let default_custom = default_profile
2567 .and_then(|profile| profile.get("custom_instructions"))
2568 .and_then(Value::as_str)
2569 .map(str::trim)
2570 .filter(|v| !v.is_empty())
2571 .map(ToString::to_string);
2572 let legacy_persona = config
2573 .get("persona")
2574 .and_then(Value::as_str)
2575 .map(str::trim)
2576 .filter(|v| !v.is_empty())
2577 .map(ToString::to_string);
2578
2579 let per_agent_profile = if allow_agent_override {
2580 agent_name.and_then(|name| {
2581 config
2582 .get("identity")
2583 .and_then(|identity| identity.get("personality"))
2584 .and_then(|personality| personality.get("per_agent"))
2585 .and_then(|per_agent| per_agent.get(name))
2586 })
2587 } else {
2588 None
2589 };
2590 let preset = per_agent_profile
2591 .and_then(|profile| profile.get("preset"))
2592 .and_then(Value::as_str)
2593 .map(str::trim)
2594 .filter(|v| !v.is_empty())
2595 .unwrap_or(default_preset);
2596 let custom = per_agent_profile
2597 .and_then(|profile| profile.get("custom_instructions"))
2598 .and_then(Value::as_str)
2599 .map(str::trim)
2600 .filter(|v| !v.is_empty())
2601 .map(ToString::to_string)
2602 .or(default_custom)
2603 .or(legacy_persona);
2604
2605 let mut lines = vec![
2606 format!("You are {bot_name}, an AI assistant."),
2607 Self::personality_preset_text(preset).to_string(),
2608 ];
2609 if let Some(custom) = custom {
2610 lines.push(format!("Additional personality instructions: {custom}"));
2611 }
2612 Some(lines.join("\n"))
2613 }
2614}
2615
impl PromptContextHook for ServerPromptContextHook {
    /// Appends server-side context to the messages about to be sent to a provider.
    ///
    /// In order, the hook:
    /// 1. returns the messages untouched when the state is not ready or no run is registered
    ///    for the session;
    /// 2. appends an identity/personality system message;
    /// 3. takes the most recent `user` message as the query and stops if it is blank or
    ///    trivial small talk;
    /// 4. appends a `<docs_context>` system message and returns when embedded docs match;
    /// 5. otherwise searches global memory, publishes `memory.search.performed`, and appends
    ///    a `<memory_context>` system message when there are hits.
    ///
    /// The future never resolves to `Err` in the code shown here; every failure path
    /// degrades to returning the messages collected so far.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone so the returned future owns its state and can be `'static`.
        let this = self.clone();
        Box::pin(async move {
            if !this.state.is_ready() {
                return Ok(messages);
            }
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            // The identity block is appended after the existing messages, not prepended.
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            // Memory is searched under the run's client id, or "default" when it has none.
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The latest user message drives both the docs and the memory search.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded docs take precedence: when any match, memory search is skipped.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            // Time the search so its latency can be reported with the event below.
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Only a hash of the query is published, never the query text itself.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
2735
2736fn extract_event_session_id(properties: &Value) -> Option<String> {
2737 properties
2738 .get("sessionID")
2739 .or_else(|| properties.get("sessionId"))
2740 .or_else(|| properties.get("id"))
2741 .and_then(|v| v.as_str())
2742 .map(|s| s.to_string())
2743}
2744
2745fn extract_event_run_id(properties: &Value) -> Option<String> {
2746 properties
2747 .get("runID")
2748 .or_else(|| properties.get("run_id"))
2749 .and_then(|v| v.as_str())
2750 .map(|s| s.to_string())
2751}
2752
2753fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
2754 let session_id = extract_event_session_id(&event.properties)?;
2755 let run_id = extract_event_run_id(&event.properties);
2756 let key = format!("run/{session_id}/status");
2757
2758 let mut base = serde_json::Map::new();
2759 base.insert("sessionID".to_string(), Value::String(session_id));
2760 if let Some(run_id) = run_id {
2761 base.insert("runID".to_string(), Value::String(run_id));
2762 }
2763
2764 match event.event_type.as_str() {
2765 "session.run.started" => {
2766 base.insert("state".to_string(), Value::String("running".to_string()));
2767 base.insert("phase".to_string(), Value::String("run".to_string()));
2768 base.insert(
2769 "eventType".to_string(),
2770 Value::String("session.run.started".to_string()),
2771 );
2772 Some(StatusIndexUpdate {
2773 key,
2774 value: Value::Object(base),
2775 })
2776 }
2777 "session.run.finished" => {
2778 base.insert("state".to_string(), Value::String("finished".to_string()));
2779 base.insert("phase".to_string(), Value::String("run".to_string()));
2780 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
2781 base.insert("result".to_string(), Value::String(status.to_string()));
2782 }
2783 base.insert(
2784 "eventType".to_string(),
2785 Value::String("session.run.finished".to_string()),
2786 );
2787 Some(StatusIndexUpdate {
2788 key,
2789 value: Value::Object(base),
2790 })
2791 }
2792 "message.part.updated" => {
2793 let part_type = event
2794 .properties
2795 .get("part")
2796 .and_then(|v| v.get("type"))
2797 .and_then(|v| v.as_str())?;
2798 let (phase, tool_active) = match part_type {
2799 "tool-invocation" => ("tool", true),
2800 "tool-result" => ("run", false),
2801 _ => return None,
2802 };
2803 base.insert("state".to_string(), Value::String("running".to_string()));
2804 base.insert("phase".to_string(), Value::String(phase.to_string()));
2805 base.insert("toolActive".to_string(), Value::Bool(tool_active));
2806 if let Some(tool_name) = event
2807 .properties
2808 .get("part")
2809 .and_then(|v| v.get("tool"))
2810 .and_then(|v| v.as_str())
2811 {
2812 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
2813 }
2814 base.insert(
2815 "eventType".to_string(),
2816 Value::String("message.part.updated".to_string()),
2817 );
2818 Some(StatusIndexUpdate {
2819 key,
2820 value: Value::Object(base),
2821 })
2822 }
2823 _ => None,
2824 }
2825}
2826
2827pub async fn run_status_indexer(state: AppState) {
2828 let mut rx = state.event_bus.subscribe();
2829 loop {
2830 match rx.recv().await {
2831 Ok(event) => {
2832 if let Some(update) = derive_status_index_update(&event) {
2833 if let Err(error) = state
2834 .put_shared_resource(
2835 update.key,
2836 update.value,
2837 None,
2838 "system.status_indexer".to_string(),
2839 None,
2840 )
2841 .await
2842 {
2843 tracing::warn!("status indexer failed to persist update: {error:?}");
2844 }
2845 }
2846 }
2847 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2848 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2849 }
2850 }
2851}
2852
2853pub async fn run_agent_team_supervisor(state: AppState) {
2854 let mut rx = state.event_bus.subscribe();
2855 loop {
2856 match rx.recv().await {
2857 Ok(event) => {
2858 state.agent_teams.handle_engine_event(&state, &event).await;
2859 }
2860 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2861 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2862 }
2863 }
2864}
2865
2866pub async fn run_usage_aggregator(state: AppState) {
2867 let mut rx = state.event_bus.subscribe();
2868 loop {
2869 match rx.recv().await {
2870 Ok(event) => {
2871 if event.event_type != "provider.usage" {
2872 continue;
2873 }
2874 let session_id = event
2875 .properties
2876 .get("sessionID")
2877 .and_then(|v| v.as_str())
2878 .unwrap_or("");
2879 if session_id.is_empty() {
2880 continue;
2881 }
2882 let prompt_tokens = event
2883 .properties
2884 .get("promptTokens")
2885 .and_then(|v| v.as_u64())
2886 .unwrap_or(0);
2887 let completion_tokens = event
2888 .properties
2889 .get("completionTokens")
2890 .and_then(|v| v.as_u64())
2891 .unwrap_or(0);
2892 let total_tokens = event
2893 .properties
2894 .get("totalTokens")
2895 .and_then(|v| v.as_u64())
2896 .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
2897 state
2898 .apply_provider_usage_to_runs(
2899 session_id,
2900 prompt_tokens,
2901 completion_tokens,
2902 total_tokens,
2903 )
2904 .await;
2905 }
2906 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2907 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2908 }
2909 }
2910}
2911
/// Background task that turns due routines into routine runs.
///
/// Once per second it asks the state for misfire plans and, for each plan whose routine
/// still exists, evaluates the execution policy for a `scheduled` trigger:
/// * allowed → the routine is marked as fired and a `Queued` run is created;
/// * approval required → a `PendingApproval` run is created;
/// * blocked → a `BlockedPolicy` run is created.
///
/// Every outcome also appends a routine history entry and publishes an outcome-specific
/// event followed by `routine.run.created`. The loop never terminates on its own.
pub async fn run_routine_scheduler(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        // One timestamp per tick, shared by every plan handled in this tick.
        let now = now_ms();
        let plans = state.evaluate_routine_misfires(now).await;
        for plan in plans {
            // Skip plans whose routine can no longer be found.
            let Some(routine) = state.get_routine(&plan.routine_id).await else {
                continue;
            };
            match evaluate_routine_execution_policy(&routine, "scheduled") {
                RoutineExecutionDecision::Allowed => {
                    // NOTE(review): only this branch calls `mark_routine_fired`, and its
                    // result is deliberately ignored — confirm both are intended.
                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
                    let run = state
                        .create_routine_run(
                            &routine,
                            "scheduled",
                            plan.run_count,
                            RoutineRunStatus::Queued,
                            None,
                        )
                        .await;
                    state
                        .append_routine_history(RoutineHistoryEvent {
                            routine_id: plan.routine_id.clone(),
                            trigger_type: "scheduled".to_string(),
                            run_count: plan.run_count,
                            fired_at_ms: now,
                            status: "queued".to_string(),
                            detail: None,
                        })
                        .await;
                    state.event_bus.publish(EngineEvent::new(
                        "routine.fired",
                        serde_json::json!({
                            "routineID": plan.routine_id,
                            "runID": run.run_id,
                            "runCount": plan.run_count,
                            "scheduledAtMs": plan.scheduled_at_ms,
                            "nextFireAtMs": plan.next_fire_at_ms,
                        }),
                    ));
                    state.event_bus.publish(EngineEvent::new(
                        "routine.run.created",
                        serde_json::json!({
                            "run": run,
                        }),
                    ));
                }
                RoutineExecutionDecision::RequiresApproval { reason } => {
                    // The run is recorded but parked until it is approved.
                    let run = state
                        .create_routine_run(
                            &routine,
                            "scheduled",
                            plan.run_count,
                            RoutineRunStatus::PendingApproval,
                            Some(reason.clone()),
                        )
                        .await;
                    state
                        .append_routine_history(RoutineHistoryEvent {
                            routine_id: plan.routine_id.clone(),
                            trigger_type: "scheduled".to_string(),
                            run_count: plan.run_count,
                            fired_at_ms: now,
                            status: "pending_approval".to_string(),
                            detail: Some(reason.clone()),
                        })
                        .await;
                    state.event_bus.publish(EngineEvent::new(
                        "routine.approval_required",
                        serde_json::json!({
                            "routineID": plan.routine_id,
                            "runID": run.run_id,
                            "runCount": plan.run_count,
                            "triggerType": "scheduled",
                            "reason": reason,
                        }),
                    ));
                    state.event_bus.publish(EngineEvent::new(
                        "routine.run.created",
                        serde_json::json!({
                            "run": run,
                        }),
                    ));
                }
                RoutineExecutionDecision::Blocked { reason } => {
                    // Blocked runs are still recorded, with the policy reason attached.
                    let run = state
                        .create_routine_run(
                            &routine,
                            "scheduled",
                            plan.run_count,
                            RoutineRunStatus::BlockedPolicy,
                            Some(reason.clone()),
                        )
                        .await;
                    state
                        .append_routine_history(RoutineHistoryEvent {
                            routine_id: plan.routine_id.clone(),
                            trigger_type: "scheduled".to_string(),
                            run_count: plan.run_count,
                            fired_at_ms: now,
                            status: "blocked_policy".to_string(),
                            detail: Some(reason.clone()),
                        })
                        .await;
                    state.event_bus.publish(EngineEvent::new(
                        "routine.blocked",
                        serde_json::json!({
                            "routineID": plan.routine_id,
                            "runID": run.run_id,
                            "runCount": plan.run_count,
                            "triggerType": "scheduled",
                            "reason": reason,
                        }),
                    ));
                    state.event_bus.publish(EngineEvent::new(
                        "routine.run.created",
                        serde_json::json!({
                            "run": run,
                        }),
                    ));
                }
            }
        }
    }
}
3038
/// Background task that executes queued routine runs, one at a time.
///
/// Once per second it tries to claim the next queued run. For a claimed run it:
/// 1. publishes `routine.run.started`;
/// 2. creates and saves a session rooted at the current workspace (a save failure marks
///    the run `Failed` and publishes `routine.run.failed`);
/// 3. registers the session policy, active session id, and allowed tools for the session;
/// 4. resolves the model (publishing `routine.run.model_selected` when one is chosen) and
///    runs the routine prompt through the engine loop;
/// 5. clears the per-session registrations made in step 3;
/// 6. records the outcome: `Completed` on success, nothing further when the run was paused
///    in the meantime, and `Failed` with a truncated error message otherwise.
///
/// The loop never terminates on its own.
pub async fn run_routine_executor(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        let Some(run) = state.claim_next_queued_routine_run().await else {
            continue;
        };

        state.event_bus.publish(EngineEvent::new(
            "routine.run.started",
            serde_json::json!({
                "runID": run.run_id,
                "routineID": run.routine_id,
                "triggerType": run.trigger_type,
                "startedAtMs": now_ms(),
            }),
        ));

        let workspace_root = state.workspace_index.snapshot().await.root;
        let mut session = Session::new(
            Some(format!("Routine {}", run.routine_id)),
            Some(workspace_root.clone()),
        );
        // Keep the id: `session` is moved into `save_session` below.
        let session_id = session.id.clone();
        session.workspace_root = Some(workspace_root);

        if let Err(error) = state.storage.save_session(session).await {
            let detail = format!("failed to create routine session: {error}");
            // Best effort: a failure to record the status does not stop the executor.
            let _ = state
                .update_routine_run_status(
                    &run.run_id,
                    RoutineRunStatus::Failed,
                    Some(detail.clone()),
                )
                .await;
            state.event_bus.publish(EngineEvent::new(
                "routine.run.failed",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "reason": detail,
                }),
            ));
            continue;
        }

        // Per-session registrations; each is undone after the prompt has run.
        state
            .set_routine_session_policy(
                session_id.clone(),
                run.run_id.clone(),
                run.routine_id.clone(),
                run.allowed_tools.clone(),
            )
            .await;
        state
            .add_active_session_id(&run.run_id, session_id.clone())
            .await;
        state
            .engine_loop
            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
            .await;

        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
        if let Some(spec) = selected_model.as_ref() {
            state.event_bus.publish(EngineEvent::new(
                "routine.run.model_selected",
                serde_json::json!({
                    "runID": run.run_id,
                    "routineID": run.routine_id,
                    "providerID": spec.provider_id,
                    "modelID": spec.model_id,
                    "source": model_source,
                }),
            ));
        }

        let request = SendMessageRequest {
            parts: vec![MessagePartInput::Text {
                text: build_routine_prompt(&state, &run).await,
            }],
            model: selected_model,
            agent: None,
            tool_mode: None,
            tool_allowlist: None,
            context_mode: None,
        };

        let run_result = state
            .engine_loop
            .run_prompt_async_with_context(
                session_id.clone(),
                request,
                Some(format!("routine:{}", run.run_id)),
            )
            .await;

        // Undo the per-session registrations regardless of how the prompt ended.
        state.clear_routine_session_policy(&session_id).await;
        state
            .clear_active_session_id(&run.run_id, &session_id)
            .await;
        state
            .engine_loop
            .clear_session_allowed_tools(&session_id)
            .await;

        match run_result {
            Ok(()) => {
                append_configured_output_artifacts(&state, &run).await;
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Completed,
                        Some("routine run completed".to_string()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.completed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
            Err(error) => {
                // A run that is now `Paused` is reported as paused and keeps that status
                // instead of being marked failed.
                if let Some(latest) = state.get_routine_run(&run.run_id).await {
                    if latest.status == RoutineRunStatus::Paused {
                        state.event_bus.publish(EngineEvent::new(
                            "routine.run.paused",
                            serde_json::json!({
                                "runID": run.run_id,
                                "routineID": run.routine_id,
                                "sessionID": session_id,
                                "finishedAtMs": now_ms(),
                            }),
                        ));
                        continue;
                    }
                }
                // Limit the stored error text via `truncate_text(.., 500)`.
                let detail = truncate_text(&error.to_string(), 500);
                let _ = state
                    .update_routine_run_status(
                        &run.run_id,
                        RoutineRunStatus::Failed,
                        Some(detail.clone()),
                    )
                    .await;
                state.event_bus.publish(EngineEvent::new(
                    "routine.run.failed",
                    serde_json::json!({
                        "runID": run.run_id,
                        "routineID": run.routine_id,
                        "sessionID": session_id,
                        "reason": detail,
                        "finishedAtMs": now_ms(),
                    }),
                ));
            }
        }
    }
}
3200
3201pub async fn run_automation_v2_scheduler(state: AppState) {
3202 loop {
3203 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
3204 let now = now_ms();
3205 let due = state.evaluate_automation_v2_misfires(now).await;
3206 for automation_id in due {
3207 let Some(automation) = state.get_automation_v2(&automation_id).await else {
3208 continue;
3209 };
3210 if let Ok(run) = state
3211 .create_automation_v2_run(&automation, "scheduled")
3212 .await
3213 {
3214 state.event_bus.publish(EngineEvent::new(
3215 "automation.v2.run.created",
3216 serde_json::json!({
3217 "automationID": automation_id,
3218 "run": run,
3219 "triggerType": "scheduled",
3220 }),
3221 ));
3222 }
3223 }
3224 }
3225}
3226
3227async fn execute_automation_v2_node(
3228 state: &AppState,
3229 run_id: &str,
3230 automation: &AutomationV2Spec,
3231 node: &AutomationFlowNode,
3232 agent: &AutomationAgentProfile,
3233) -> anyhow::Result<Value> {
3234 let workspace_root = state.workspace_index.snapshot().await.root;
3235 let mut session = Session::new(
3236 Some(format!(
3237 "Automation {} / {}",
3238 automation.automation_id, node.node_id
3239 )),
3240 Some(workspace_root.clone()),
3241 );
3242 let session_id = session.id.clone();
3243 session.workspace_root = Some(workspace_root);
3244 state.storage.save_session(session).await?;
3245
3246 state.add_automation_v2_session(run_id, &session_id).await;
3247
3248 let mut allowlist = agent.tool_policy.allowlist.clone();
3249 if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
3250 allowlist.extend(mcp_tools.clone());
3251 }
3252 state
3253 .engine_loop
3254 .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
3255 .await;
3256
3257 let model = agent
3258 .model_policy
3259 .as_ref()
3260 .and_then(|policy| policy.get("default_model"))
3261 .and_then(parse_model_spec);
3262 let prompt = format!(
3263 "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}",
3264 automation.automation_id, run_id, node.node_id, agent.display_name, node.objective
3265 );
3266 let req = SendMessageRequest {
3267 parts: vec![MessagePartInput::Text { text: prompt }],
3268 model,
3269 agent: None,
3270 tool_mode: None,
3271 tool_allowlist: None,
3272 context_mode: None,
3273 };
3274 let result = state
3275 .engine_loop
3276 .run_prompt_async_with_context(
3277 session_id.clone(),
3278 req,
3279 Some(format!("automation-v2:{run_id}")),
3280 )
3281 .await;
3282
3283 state
3284 .engine_loop
3285 .clear_session_allowed_tools(&session_id)
3286 .await;
3287 state.clear_automation_v2_session(run_id, &session_id).await;
3288
3289 result.map(|_| {
3290 serde_json::json!({
3291 "sessionID": session_id,
3292 "status": "completed",
3293 })
3294 })
3295}
3296
/// Background worker that drives queued automation-v2 runs, one run at a time.
///
/// Every 500 ms it tries to claim the next queued run. For a claimed run it
/// loops: pick the pending nodes whose `depends_on` entries are all completed
/// (at most `max_parallel_agents`, defaulting to 1 and clamped to 1..=16),
/// execute that batch concurrently, then fold each outcome back into the run
/// checkpoint. The inner loop ends when the run record disappears, is observed
/// in a paused/pausing/cancelled/failed/completed status, has no pending
/// nodes left, has no runnable node, or a node in the batch failed.
/// This function never returns.
pub async fn run_automation_v2_executor(state: AppState) {
    loop {
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
            continue;
        };
        // The automation definition may be gone by the time its run is claimed.
        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
            let _ = state
                .update_automation_v2_run(&run.run_id, |row| {
                    row.status = AutomationRunStatus::Failed;
                    row.detail = Some("automation not found".to_string());
                })
                .await;
            continue;
        };
        let max_parallel = automation
            .execution
            .max_parallel_agents
            .unwrap_or(1)
            .clamp(1, 16) as usize;

        loop {
            // Re-read the run each round so external status changes are seen.
            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
                break;
            };
            if matches!(
                latest.status,
                AutomationRunStatus::Paused
                    | AutomationRunStatus::Pausing
                    | AutomationRunStatus::Cancelled
                    | AutomationRunStatus::Failed
                    | AutomationRunStatus::Completed
            ) {
                break;
            }
            // Nothing left to run: the whole flow is done.
            if latest.checkpoint.pending_nodes.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Completed;
                        row.detail = Some("automation run completed".to_string());
                    })
                    .await;
                break;
            }

            let completed = latest
                .checkpoint
                .completed_nodes
                .iter()
                .cloned()
                .collect::<std::collections::HashSet<_>>();
            let pending = latest.checkpoint.pending_nodes.clone();
            // A pending id with no matching flow node is dropped here (the `?`),
            // so it can never become runnable.
            let runnable = pending
                .iter()
                .filter_map(|node_id| {
                    let node = automation
                        .flow
                        .nodes
                        .iter()
                        .find(|n| n.node_id == *node_id)?;
                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
                        Some(node.clone())
                    } else {
                        None
                    }
                })
                .take(max_parallel)
                .collect::<Vec<_>>();

            // Pending nodes exist but none can start: fail instead of spinning.
            if runnable.is_empty() {
                let _ = state
                    .update_automation_v2_run(&run.run_id, |row| {
                        row.status = AutomationRunStatus::Failed;
                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
                    })
                    .await;
                break;
            }

            // One boxed future per runnable node; a node whose agent is missing
            // resolves immediately to an error without touching the engine.
            let tasks = runnable
                .iter()
                .map(|node| {
                    let Some(agent) = automation
                        .agents
                        .iter()
                        .find(|a| a.agent_id == node.agent_id)
                        .cloned()
                    else {
                        return futures::future::ready((
                            node.node_id.clone(),
                            Err(anyhow::anyhow!("agent not found")),
                        ))
                        .boxed();
                    };
                    let state = state.clone();
                    let run_id = run.run_id.clone();
                    let automation = automation.clone();
                    let node = node.clone();
                    async move {
                        let result =
                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
                                .await;
                        (node.node_id, result)
                    }
                    .boxed()
                })
                .collect::<Vec<_>>();
            let outcomes = join_all(tasks).await;

            let mut any_failed = false;
            for (node_id, result) in outcomes {
                match result {
                    Ok(output) => {
                        // Move the node from pending to completed (without
                        // duplicating it) and store its output.
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
                                if !row
                                    .checkpoint
                                    .completed_nodes
                                    .iter()
                                    .any(|id| id == &node_id)
                                {
                                    row.checkpoint.completed_nodes.push(node_id.clone());
                                }
                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
                            })
                            .await;
                    }
                    Err(error) => {
                        any_failed = true;
                        // An error while the run is already `Paused` is not
                        // recorded as a failure.
                        // NOTE(review): this `break` also skips the remaining
                        // outcomes of the batch, including successful ones, and
                        // only `Paused` (not `Pausing`) is checked — confirm
                        // both are intended.
                        let is_paused = state
                            .get_automation_v2_run(&run.run_id)
                            .await
                            .map(|row| row.status == AutomationRunStatus::Paused)
                            .unwrap_or(false);
                        if is_paused {
                            break;
                        }
                        let detail = truncate_text(&error.to_string(), 500);
                        let _ = state
                            .update_automation_v2_run(&run.run_id, |row| {
                                row.status = AutomationRunStatus::Failed;
                                row.detail = Some(detail.clone());
                            })
                            .await;
                    }
                }
            }
            // Stop scheduling further batches for this run after any failure.
            if any_failed {
                break;
            }
        }
    }
}
3451
3452async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
3453 let normalized_entrypoint = run.entrypoint.trim();
3454 let known_tool = state
3455 .tools
3456 .list()
3457 .await
3458 .into_iter()
3459 .any(|schema| schema.name == normalized_entrypoint);
3460 if known_tool {
3461 let args = if run.args.is_object() {
3462 run.args.clone()
3463 } else {
3464 serde_json::json!({})
3465 };
3466 return format!("/tool {} {}", normalized_entrypoint, args);
3467 }
3468
3469 if let Some(objective) = routine_objective_from_args(run) {
3470 return build_routine_mission_prompt(run, &objective);
3471 }
3472
3473 format!(
3474 "Execute routine '{}' using entrypoint '{}' with args: {}",
3475 run.routine_id, run.entrypoint, run.args
3476 )
3477}
3478
3479fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
3480 run.args
3481 .get("prompt")
3482 .and_then(|v| v.as_str())
3483 .map(str::trim)
3484 .filter(|v| !v.is_empty())
3485 .map(ToString::to_string)
3486}
3487
3488fn routine_mode_from_args(args: &Value) -> &str {
3489 args.get("mode")
3490 .and_then(|v| v.as_str())
3491 .map(str::trim)
3492 .filter(|v| !v.is_empty())
3493 .unwrap_or("standalone")
3494}
3495
3496fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
3497 args.get("success_criteria")
3498 .and_then(|v| v.as_array())
3499 .map(|rows| {
3500 rows.iter()
3501 .filter_map(|row| row.as_str())
3502 .map(str::trim)
3503 .filter(|row| !row.is_empty())
3504 .map(ToString::to_string)
3505 .collect::<Vec<_>>()
3506 })
3507 .unwrap_or_default()
3508}
3509
3510fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
3511 let mode = routine_mode_from_args(&run.args);
3512 let success_criteria = routine_success_criteria_from_args(&run.args);
3513 let orchestrator_only_tool_calls = run
3514 .args
3515 .get("orchestrator_only_tool_calls")
3516 .and_then(|v| v.as_bool())
3517 .unwrap_or(false);
3518
3519 let mut lines = vec![
3520 format!("Automation ID: {}", run.routine_id),
3521 format!("Run ID: {}", run.run_id),
3522 format!("Mode: {}", mode),
3523 format!("Mission Objective: {}", objective),
3524 ];
3525
3526 if !success_criteria.is_empty() {
3527 lines.push("Success Criteria:".to_string());
3528 for criterion in success_criteria {
3529 lines.push(format!("- {}", criterion));
3530 }
3531 }
3532
3533 if run.allowed_tools.is_empty() {
3534 lines.push("Allowed Tools: all available by current policy".to_string());
3535 } else {
3536 lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
3537 }
3538
3539 if run.output_targets.is_empty() {
3540 lines.push("Output Targets: none configured".to_string());
3541 } else {
3542 lines.push("Output Targets:".to_string());
3543 for target in &run.output_targets {
3544 lines.push(format!("- {}", target));
3545 }
3546 }
3547
3548 if mode.eq_ignore_ascii_case("orchestrated") {
3549 lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
3550 lines
3551 .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
3552 if orchestrator_only_tool_calls {
3553 lines.push(
3554 "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
3555 .to_string(),
3556 );
3557 }
3558 } else {
3559 lines.push("Execution Pattern: Standalone mission run".to_string());
3560 }
3561
3562 lines.push(
3563 "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
3564 .to_string(),
3565 );
3566
3567 lines.join("\n")
3568}
3569
/// Returns `input` unchanged when it is at most `max_len` bytes long; otherwise
/// returns a prefix of at most `max_len` bytes followed by `...<truncated>`.
///
/// `max_len` is a byte budget. When it falls inside a multi-byte UTF-8
/// character the cut backs off to the previous character boundary, so this
/// never panics on non-ASCII text (slicing a `str` at a non-boundary panics).
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }
    // `max_len < input.len()` here, and index 0 is always a boundary, so the
    // loop terminates without underflow.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
3578
3579async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
3580 if run.output_targets.is_empty() {
3581 return;
3582 }
3583 for target in &run.output_targets {
3584 let artifact = RoutineRunArtifact {
3585 artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
3586 uri: target.clone(),
3587 kind: "output_target".to_string(),
3588 label: Some("configured output target".to_string()),
3589 created_at_ms: now_ms(),
3590 metadata: Some(serde_json::json!({
3591 "source": "routine.output_targets",
3592 "runID": run.run_id,
3593 "routineID": run.routine_id,
3594 })),
3595 };
3596 let _ = state
3597 .append_routine_run_artifact(&run.run_id, artifact.clone())
3598 .await;
3599 state.event_bus.publish(EngineEvent::new(
3600 "routine.run.artifact_added",
3601 serde_json::json!({
3602 "runID": run.run_id,
3603 "routineID": run.routine_id,
3604 "artifact": artifact,
3605 }),
3606 ));
3607 }
3608}
3609
3610fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
3611 let obj = value.as_object()?;
3612 let provider_id = obj.get("provider_id")?.as_str()?.trim();
3613 let model_id = obj.get("model_id")?.as_str()?.trim();
3614 if provider_id.is_empty() || model_id.is_empty() {
3615 return None;
3616 }
3617 Some(ModelSpec {
3618 provider_id: provider_id.to_string(),
3619 model_id: model_id.to_string(),
3620 })
3621}
3622
3623fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
3624 args.get("model_policy")
3625 .and_then(|v| v.get("role_models"))
3626 .and_then(|v| v.get(role))
3627 .and_then(parse_model_spec)
3628}
3629
3630fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
3631 args.get("model_policy")
3632 .and_then(|v| v.get("default_model"))
3633 .and_then(parse_model_spec)
3634}
3635
3636fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
3637 let provider_id = config
3638 .get("default_provider")
3639 .and_then(|v| v.as_str())
3640 .map(str::trim)
3641 .filter(|v| !v.is_empty())?;
3642 let model_id = config
3643 .get("providers")
3644 .and_then(|v| v.get(provider_id))
3645 .and_then(|v| v.get("default_model"))
3646 .and_then(|v| v.as_str())
3647 .map(str::trim)
3648 .filter(|v| !v.is_empty())?;
3649 Some(ModelSpec {
3650 provider_id: provider_id.to_string(),
3651 model_id: model_id.to_string(),
3652 })
3653}
3654
3655fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
3656 providers.iter().any(|provider| {
3657 provider.id == spec.provider_id
3658 && provider
3659 .models
3660 .iter()
3661 .any(|model| model.id == spec.model_id)
3662 })
3663}
3664
3665async fn resolve_routine_model_spec_for_run(
3666 state: &AppState,
3667 run: &RoutineRunRecord,
3668) -> (Option<ModelSpec>, String) {
3669 let providers = state.providers.list().await;
3670 let mode = routine_mode_from_args(&run.args);
3671 let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
3672
3673 if mode.eq_ignore_ascii_case("orchestrated") {
3674 if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
3675 requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
3676 }
3677 }
3678 if let Some(default_model) = default_model_spec_from_args(&run.args) {
3679 requested.push((default_model, "args.model_policy.default_model"));
3680 }
3681 let effective_config = state.config.get_effective_value().await;
3682 if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
3683 requested.push((config_default, "config.default_provider"));
3684 }
3685
3686 for (candidate, source) in requested {
3687 if provider_catalog_has_model(&providers, &candidate) {
3688 return (Some(candidate), source.to_string());
3689 }
3690 }
3691
3692 let fallback = providers
3693 .into_iter()
3694 .find(|provider| !provider.models.is_empty())
3695 .and_then(|provider| {
3696 let model = provider.models.first()?;
3697 Some(ModelSpec {
3698 provider_id: provider.id,
3699 model_id: model.id.clone(),
3700 })
3701 });
3702
3703 (fallback, "provider_catalog_fallback".to_string())
3704}
3705
3706#[cfg(test)]
3707mod tests {
3708 use super::*;
3709
3710 fn test_state_with_path(path: PathBuf) -> AppState {
3711 let mut state = AppState::new_starting("test-attempt".to_string(), true);
3712 state.shared_resources_path = path;
3713 state.routines_path = tmp_routines_file("shared-state");
3714 state.routine_history_path = tmp_routines_file("routine-history");
3715 state.routine_runs_path = tmp_routines_file("routine-runs");
3716 state
3717 }
3718
3719 fn tmp_resource_file(name: &str) -> PathBuf {
3720 std::env::temp_dir().join(format!(
3721 "tandem-server-{name}-{}.json",
3722 uuid::Uuid::new_v4()
3723 ))
3724 }
3725
3726 fn tmp_routines_file(name: &str) -> PathBuf {
3727 std::env::temp_dir().join(format!(
3728 "tandem-server-routines-{name}-{}.json",
3729 uuid::Uuid::new_v4()
3730 ))
3731 }
3732
3733 #[test]
3734 fn default_model_spec_from_effective_config_reads_default_route() {
3735 let cfg = serde_json::json!({
3736 "default_provider": "openrouter",
3737 "providers": {
3738 "openrouter": {
3739 "default_model": "google/gemini-3-flash-preview"
3740 }
3741 }
3742 });
3743 let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
3744 assert_eq!(spec.provider_id, "openrouter");
3745 assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
3746 }
3747
3748 #[test]
3749 fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
3750 let missing_provider = serde_json::json!({
3751 "providers": {
3752 "openrouter": {
3753 "default_model": "google/gemini-3-flash-preview"
3754 }
3755 }
3756 });
3757 assert!(default_model_spec_from_effective_config(&missing_provider).is_none());
3758
3759 let missing_model = serde_json::json!({
3760 "default_provider": "openrouter",
3761 "providers": {
3762 "openrouter": {}
3763 }
3764 });
3765 assert!(default_model_spec_from_effective_config(&missing_model).is_none());
3766 }
3767
    /// Two successive puts to the same key move the revision from 1 to 2,
    /// record the second writer and TTL, and the persisted file contains the
    /// new revision.
    #[tokio::test]
    async fn shared_resource_put_increments_revision() {
        let path = tmp_resource_file("shared-resource-put");
        let state = test_state_with_path(path.clone());

        // Initial write: no expected revision and no TTL.
        let first = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"todo"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("first put");
        assert_eq!(first.rev, 1);

        // Second write expects revision 1 and sets a 60 s TTL.
        let second = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"doing"}),
                Some(1),
                "agent-2".to_string(),
                Some(60_000),
            )
            .await
            .expect("second put");
        assert_eq!(second.rev, 2);
        assert_eq!(second.updated_by, "agent-2");
        assert_eq!(second.ttl_ms, Some(60_000));

        // The store must have been written to disk with the bumped revision.
        let raw = tokio::fs::read_to_string(path.clone())
            .await
            .expect("persisted");
        assert!(raw.contains("\"rev\": 2"));
        let _ = tokio::fs::remove_file(path).await;
    }
3805
    /// A put that expects a revision other than the stored one fails with
    /// `RevisionConflict` carrying both the expected and the current revision.
    #[tokio::test]
    async fn shared_resource_put_detects_revision_conflict() {
        let path = tmp_resource_file("shared-resource-conflict");
        let state = test_state_with_path(path.clone());

        // Seed the key so that its current revision is 1.
        let _ = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("seed put");

        // Expecting revision 99 cannot match the stored revision.
        let conflict = state
            .put_shared_resource(
                "mission/demo/card-1".to_string(),
                serde_json::json!({"title":"Card 1 edited"}),
                Some(99),
                "agent-2".to_string(),
                None,
            )
            .await
            .expect_err("expected conflict");

        match conflict {
            ResourceStoreError::RevisionConflict(conflict) => {
                assert_eq!(conflict.expected_rev, Some(99));
                assert_eq!(conflict.current_rev, Some(1));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        let _ = tokio::fs::remove_file(path).await;
    }
3843
    /// A key under the `global/` prefix is rejected with `InvalidKey`, and the
    /// rejected put leaves no resource file on disk.
    #[tokio::test]
    async fn shared_resource_rejects_invalid_namespace_key() {
        let path = tmp_resource_file("shared-resource-invalid-key");
        let state = test_state_with_path(path.clone());

        let error = state
            .put_shared_resource(
                "global/demo/key".to_string(),
                serde_json::json!({"x":1}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect_err("invalid key should fail");

        match error {
            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
            other => panic!("unexpected error: {other:?}"),
        }

        // Nothing may have been persisted for the rejected write.
        assert!(!path.exists());
    }
3867
3868 #[test]
3869 fn derive_status_index_update_for_run_started() {
3870 let event = EngineEvent::new(
3871 "session.run.started",
3872 serde_json::json!({
3873 "sessionID": "s-1",
3874 "runID": "r-1"
3875 }),
3876 );
3877 let update = derive_status_index_update(&event).expect("update");
3878 assert_eq!(update.key, "run/s-1/status");
3879 assert_eq!(
3880 update.value.get("state").and_then(|v| v.as_str()),
3881 Some("running")
3882 );
3883 assert_eq!(
3884 update.value.get("phase").and_then(|v| v.as_str()),
3885 Some("run")
3886 );
3887 }
3888
3889 #[test]
3890 fn derive_status_index_update_for_tool_invocation() {
3891 let event = EngineEvent::new(
3892 "message.part.updated",
3893 serde_json::json!({
3894 "sessionID": "s-2",
3895 "runID": "r-2",
3896 "part": { "type": "tool-invocation", "tool": "todo_write" }
3897 }),
3898 );
3899 let update = derive_status_index_update(&event).expect("update");
3900 assert_eq!(update.key, "run/s-2/status");
3901 assert_eq!(
3902 update.value.get("phase").and_then(|v| v.as_str()),
3903 Some("tool")
3904 );
3905 assert_eq!(
3906 update.value.get("toolActive").and_then(|v| v.as_bool()),
3907 Some(true)
3908 );
3909 assert_eq!(
3910 update.value.get("tool").and_then(|v| v.as_str()),
3911 Some("todo_write")
3912 );
3913 }
3914
3915 #[test]
3916 fn misfire_skip_drops_runs_and_advances_next_fire() {
3917 let (count, next_fire) =
3918 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
3919 assert_eq!(count, 0);
3920 assert_eq!(next_fire, 11_000);
3921 }
3922
3923 #[test]
3924 fn misfire_run_once_emits_single_trigger() {
3925 let (count, next_fire) =
3926 compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
3927 assert_eq!(count, 1);
3928 assert_eq!(next_fire, 11_000);
3929 }
3930
3931 #[test]
3932 fn misfire_catch_up_caps_trigger_count() {
3933 let (count, next_fire) = compute_misfire_plan(
3934 25_000,
3935 5_000,
3936 1_000,
3937 &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
3938 );
3939 assert_eq!(count, 3);
3940 assert_eq!(next_fire, 26_000);
3941 }
3942
    /// A routine stored through one `AppState` can be loaded by a second
    /// `AppState` that points at the same routines file.
    #[tokio::test]
    async fn routine_put_persists_and_loads() {
        let routines_path = tmp_routines_file("persist-load");
        let mut state = AppState::new_starting("routines-put".to_string(), true);
        state.routines_path = routines_path.clone();

        let routine = RoutineSpec {
            routine_id: "routine-1".to_string(),
            name: "Digest".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({"topic":"status"}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "user-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state.put_routine(routine).await.expect("store routine");

        // A fresh state sharing only the file path must see the routine.
        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
        reloaded.routines_path = routines_path.clone();
        reloaded.load_routines().await.expect("load routines");
        let list = reloaded.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
    }
3979
    /// Evaluating misfires at t=10_500 for three 1-second routines that were
    /// due at t=5_000: `Skip` yields no plan but advances its next fire time,
    /// `RunOnce` plans one run, and `CatchUp { max_runs: 3 }` plans three.
    #[tokio::test]
    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
        let routines_path = tmp_routines_file("misfire-eval");
        let mut state = AppState::new_starting("routines-eval".to_string(), true);
        state.routines_path = routines_path.clone();

        // Fixture factory: identical routines that differ only by id and policy.
        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
            routine_id: id.to_string(),
            name: id.to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
            timezone: "UTC".to_string(),
            misfire_policy: policy,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: false,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state
            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
            .await
            .expect("put skip");
        state
            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
            .await
            .expect("put once");
        state
            .put_routine(base(
                "routine-catch",
                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
            ))
            .await
            .expect("put catch");

        let plans = state.evaluate_routine_misfires(10_500).await;
        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");

        assert!(plan_skip.is_none());
        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));

        // Even without a plan, the skipped routine must be rescheduled past now.
        let stored = state.list_routines().await;
        let skip_next = stored
            .iter()
            .find(|r| r.routine_id == "routine-skip")
            .and_then(|r| r.next_fire_at_ms)
            .expect("skip next");
        assert!(skip_next > 10_500);

        let _ = tokio::fs::remove_file(routines_path).await;
    }
4040
    /// A routine with the `connector.email.reply` entrypoint and
    /// `external_integrations_allowed: false` is blocked by the policy.
    #[test]
    fn routine_policy_blocks_external_side_effects_by_default() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-1".to_string(),
            name: "Connector routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "connector.email.reply".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
    }
4065
    /// The same connector entrypoint with external integrations allowed and
    /// `requires_approval: true` yields a `RequiresApproval` decision.
    #[test]
    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-2".to_string(),
            name: "Connector routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "connector.email.reply".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: true,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert!(matches!(
            decision,
            RoutineExecutionDecision::RequiresApproval { .. }
        ));
    }
4093
    /// A routine with the `mission.default` entrypoint is allowed even though
    /// `requires_approval` is set and external integrations are disabled.
    #[test]
    fn routine_policy_allows_non_external_entrypoints() {
        let routine = RoutineSpec {
            routine_id: "routine-policy-3".to_string(),
            name: "Internal mission routine".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: None,
            last_fired_at_ms: None,
        };

        let decision = evaluate_routine_execution_policy(&routine, "manual");
        assert_eq!(decision, RoutineExecutionDecision::Allowed);
    }
4118
    /// Of two queued runs, claiming picks the one with the smaller
    /// `created_at_ms`, returns it as `Running`, and stamps `started_at_ms`.
    ///
    /// NOTE(review): the temp file assigned to `routine_runs_path` is never
    /// removed by this test — confirm whether cleanup is wanted.
    #[tokio::test]
    async fn claim_next_queued_routine_run_marks_oldest_running() {
        let mut state = AppState::new_starting("routine-claim".to_string(), true);
        state.routine_runs_path = tmp_routines_file("routine-claim-runs");

        // Fixture factory: queued runs that differ only by id and creation time.
        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
            run_id: run_id.to_string(),
            routine_id: "routine-claim".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms,
            updated_at_ms: created_at_ms,
            fired_at_ms: Some(created_at_ms),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: false,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            artifacts: vec![],
            active_session_ids: vec![],
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };

        // Insert the later run first so insertion order cannot explain the result;
        // the write guard is scoped so it is released before persisting.
        {
            let mut guard = state.routine_runs.write().await;
            guard.insert("run-late".to_string(), mk("run-late", 2_000));
            guard.insert("run-early".to_string(), mk("run-early", 1_000));
        }
        state.persist_routine_runs().await.expect("persist");

        let claimed = state
            .claim_next_queued_routine_run()
            .await
            .expect("claimed run");
        assert_eq!(claimed.run_id, "run-early");
        assert_eq!(claimed.status, RoutineRunStatus::Running);
        assert!(claimed.started_at_ms.is_some());
    }
4167
    /// Storing a session policy and reading it back returns the tool list
    /// trimmed, with the duplicate and the empty entry removed, in first-seen
    /// order.
    #[tokio::test]
    async fn routine_session_policy_roundtrip_normalizes_tools() {
        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
        state
            .set_routine_session_policy(
                "session-routine-1".to_string(),
                "run-1".to_string(),
                "routine-1".to_string(),
                // Deliberately messy input: a duplicate, padding, and an empty entry.
                vec![
                    "read".to_string(),
                    " mcp.arcade.search ".to_string(),
                    "read".to_string(),
                    "".to_string(),
                ],
            )
            .await;

        let policy = state
            .routine_session_policy("session-routine-1")
            .await
            .expect("policy");
        assert_eq!(
            policy.allowed_tools,
            vec!["read".to_string(), "mcp.arcade.search".to_string()]
        );
    }
4194
    /// In orchestrated mode the mission prompt contains the mode line, the
    /// Plan/Do/Verify/Notify pattern, the orchestrator-only tool policy, the
    /// joined allowed tools, and the configured output target.
    #[test]
    fn routine_mission_prompt_includes_orchestrated_contract() {
        let run = RoutineRunRecord {
            run_id: "run-orchestrated-1".to_string(),
            routine_id: "automation-orchestrated".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms: 1_000,
            updated_at_ms: 1_000,
            fired_at_ms: Some(1_000),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: true,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({
                "prompt": "Coordinate a multi-step release readiness check.",
                "mode": "orchestrated",
                "success_criteria": ["All blockers listed", "Output artifact written"],
                "orchestrator_only_tool_calls": true
            }),
            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
            output_targets: vec!["file://reports/release-readiness.md".to_string()],
            artifacts: vec![],
            active_session_ids: vec![],
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };

        let objective = routine_objective_from_args(&run).expect("objective");
        let prompt = build_routine_mission_prompt(&run, &objective);

        assert!(prompt.contains("Mode: orchestrated"));
        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
        assert!(prompt.contains("only the orchestrator may execute tools"));
        assert!(prompt.contains("Allowed Tools: read, webfetch"));
        assert!(prompt.contains("file://reports/release-readiness.md"));
    }
4239
    /// Without a `mode` arg, tools, or output targets, the mission prompt
    /// falls back to standalone mode and the "all available" / "none
    /// configured" default lines.
    #[test]
    fn routine_mission_prompt_includes_standalone_defaults() {
        let run = RoutineRunRecord {
            run_id: "run-standalone-1".to_string(),
            routine_id: "automation-standalone".to_string(),
            trigger_type: "manual".to_string(),
            run_count: 1,
            status: RoutineRunStatus::Queued,
            created_at_ms: 2_000,
            updated_at_ms: 2_000,
            fired_at_ms: Some(2_000),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: false,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail: None,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({
                "prompt": "Summarize top engineering updates.",
                "success_criteria": ["Three bullet summary"]
            }),
            allowed_tools: vec![],
            output_targets: vec![],
            artifacts: vec![],
            active_session_ids: vec![],
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };

        let objective = routine_objective_from_args(&run).expect("objective");
        let prompt = build_routine_mission_prompt(&run, &objective);

        assert!(prompt.contains("Mode: standalone"));
        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
        assert!(prompt.contains("Allowed Tools: all available by current policy"));
        assert!(prompt.contains("Output Targets: none configured"));
    }
4281
4282 #[test]
4283 fn shared_resource_key_validator_accepts_swarm_active_tasks() {
4284 assert!(is_valid_resource_key("swarm.active_tasks"));
4285 assert!(is_valid_resource_key("project/demo"));
4286 assert!(!is_valid_resource_key("swarm//active_tasks"));
4287 assert!(!is_valid_resource_key("misc/demo"));
4288 }
4289}