Skip to main content

tandem_server/
lib.rs

1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22    EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
23    SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30    resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31    PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38
39mod agent_teams;
40mod http;
41pub mod webui;
42
43pub use agent_teams::AgentTeamRuntime;
44pub use http::serve;
45
46#[derive(Debug, Clone, Serialize, Deserialize, Default)]
47pub struct ChannelStatus {
48    pub enabled: bool,
49    pub connected: bool,
50    pub last_error: Option<String>,
51    pub active_sessions: u64,
52    pub meta: Value,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize, Default)]
56pub struct WebUiConfig {
57    #[serde(default)]
58    pub enabled: bool,
59    #[serde(default = "default_web_ui_prefix")]
60    pub path_prefix: String,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize, Default)]
64pub struct ChannelsConfigFile {
65    pub telegram: Option<TelegramConfigFile>,
66    pub discord: Option<DiscordConfigFile>,
67    pub slack: Option<SlackConfigFile>,
68    #[serde(default)]
69    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct TelegramConfigFile {
74    pub bot_token: String,
75    #[serde(default = "default_allow_all")]
76    pub allowed_users: Vec<String>,
77    #[serde(default)]
78    pub mention_only: bool,
79    #[serde(default)]
80    pub style_profile: tandem_channels::config::TelegramStyleProfile,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct DiscordConfigFile {
85    pub bot_token: String,
86    #[serde(default)]
87    pub guild_id: Option<String>,
88    #[serde(default = "default_allow_all")]
89    pub allowed_users: Vec<String>,
90    #[serde(default = "default_discord_mention_only")]
91    pub mention_only: bool,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct SlackConfigFile {
96    pub bot_token: String,
97    pub channel_id: String,
98    #[serde(default = "default_allow_all")]
99    pub allowed_users: Vec<String>,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize, Default)]
103struct EffectiveAppConfig {
104    #[serde(default)]
105    pub channels: ChannelsConfigFile,
106    #[serde(default)]
107    pub web_ui: WebUiConfig,
108    #[serde(default)]
109    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
110}
111
112#[derive(Default)]
113pub struct ChannelRuntime {
114    pub listeners: Option<tokio::task::JoinSet<()>>,
115    pub statuses: std::collections::HashMap<String, ChannelStatus>,
116}
117
/// A lease record with its identity, timestamps and time-to-live.
///
/// All timestamps and the TTL are in milliseconds.
#[derive(Clone, Debug)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Reports whether strictly more than `ttl_ms` has passed since the last
    /// renewal, measured at `now_ms`.
    ///
    /// The subtraction saturates, so a `now_ms` earlier than the last renewal
    /// counts as zero elapsed time rather than underflowing.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let since_renewal = now_ms.saturating_sub(self.last_renewed_at_ms);
        since_renewal > self.ttl_ms
    }
}
133
134#[derive(Debug, Clone, Serialize)]
135pub struct ActiveRun {
136    #[serde(rename = "runID")]
137    pub run_id: String,
138    #[serde(rename = "startedAtMs")]
139    pub started_at_ms: u64,
140    #[serde(rename = "lastActivityAtMs")]
141    pub last_activity_at_ms: u64,
142    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
143    pub client_id: Option<String>,
144    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
145    pub agent_id: Option<String>,
146    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
147    pub agent_profile: Option<String>,
148}
149
150#[derive(Clone, Default)]
151pub struct RunRegistry {
152    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
153}
154
155impl RunRegistry {
156    pub fn new() -> Self {
157        Self::default()
158    }
159
160    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
161        self.active.read().await.get(session_id).cloned()
162    }
163
164    pub async fn acquire(
165        &self,
166        session_id: &str,
167        run_id: String,
168        client_id: Option<String>,
169        agent_id: Option<String>,
170        agent_profile: Option<String>,
171    ) -> std::result::Result<ActiveRun, ActiveRun> {
172        let mut guard = self.active.write().await;
173        if let Some(existing) = guard.get(session_id).cloned() {
174            return Err(existing);
175        }
176        let now = now_ms();
177        let run = ActiveRun {
178            run_id,
179            started_at_ms: now,
180            last_activity_at_ms: now,
181            client_id,
182            agent_id,
183            agent_profile,
184        };
185        guard.insert(session_id.to_string(), run.clone());
186        Ok(run)
187    }
188
189    pub async fn touch(&self, session_id: &str, run_id: &str) {
190        let mut guard = self.active.write().await;
191        if let Some(run) = guard.get_mut(session_id) {
192            if run.run_id == run_id {
193                run.last_activity_at_ms = now_ms();
194            }
195        }
196    }
197
198    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
199        let mut guard = self.active.write().await;
200        if let Some(run) = guard.get(session_id) {
201            if run.run_id == run_id {
202                return guard.remove(session_id);
203            }
204        }
205        None
206    }
207
208    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
209        self.active.write().await.remove(session_id)
210    }
211
212    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
213        let now = now_ms();
214        let mut guard = self.active.write().await;
215        let stale_ids = guard
216            .iter()
217            .filter_map(|(session_id, run)| {
218                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
219                    Some(session_id.clone())
220                } else {
221                    None
222                }
223            })
224            .collect::<Vec<_>>();
225        let mut out = Vec::with_capacity(stale_ids.len());
226        for session_id in stale_ids {
227            if let Some(run) = guard.remove(&session_id) {
228                out.push((session_id, run));
229            }
230        }
231        out
232    }
233}
234
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. The
/// `u128` millisecond count is converted with a checked conversion that
/// saturates at `u64::MAX` instead of silently truncating.
pub fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
241
242pub fn build_id() -> String {
243    if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
244        let trimmed = explicit.trim();
245        if !trimmed.is_empty() {
246            return trimmed.to_string();
247        }
248    }
249    if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
250        let trimmed = git_sha.trim();
251        if !trimmed.is_empty() {
252            return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
253        }
254    }
255    env!("CARGO_PKG_VERSION").to_string()
256}
257
258pub fn detect_host_runtime_context() -> HostRuntimeContext {
259    let os = if cfg!(target_os = "windows") {
260        HostOs::Windows
261    } else if cfg!(target_os = "macos") {
262        HostOs::Macos
263    } else {
264        HostOs::Linux
265    };
266    let (shell_family, path_style) = match os {
267        HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
268        HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
269    };
270    HostRuntimeContext {
271        os,
272        arch: std::env::consts::ARCH.to_string(),
273        shell_family,
274        path_style,
275    }
276}
277
/// Returns the path of the running executable in builds with
/// `debug_assertions` enabled, and `None` otherwise.
///
/// Also yields `None` when the executable path cannot be determined. Bytes
/// that are not valid UTF-8 are replaced lossily.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        let exe = std::env::current_exe().ok()?;
        Some(exe.to_string_lossy().into_owned())
    } else {
        None
    }
}
290
291#[derive(Clone)]
292pub struct RuntimeState {
293    pub storage: Arc<Storage>,
294    pub config: ConfigStore,
295    pub event_bus: EventBus,
296    pub providers: ProviderRegistry,
297    pub plugins: PluginRegistry,
298    pub agents: AgentRegistry,
299    pub tools: ToolRegistry,
300    pub permissions: PermissionManager,
301    pub mcp: McpRegistry,
302    pub pty: PtyManager,
303    pub lsp: LspManager,
304    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
305    pub logs: Arc<RwLock<Vec<Value>>>,
306    pub workspace_index: WorkspaceIndex,
307    pub cancellations: CancellationRegistry,
308    pub engine_loop: EngineLoop,
309    pub host_runtime_context: HostRuntimeContext,
310}
311
312#[derive(Debug, Clone)]
313pub struct GovernedMemoryRecord {
314    pub id: String,
315    pub run_id: String,
316    pub partition: MemoryPartition,
317    pub kind: MemoryContentKind,
318    pub content: String,
319    pub artifact_refs: Vec<String>,
320    pub classification: MemoryClassification,
321    pub metadata: Option<Value>,
322    pub source_memory_id: Option<String>,
323    pub created_at_ms: u64,
324}
325
326#[derive(Debug, Clone, Serialize)]
327pub struct MemoryAuditEvent {
328    pub audit_id: String,
329    pub action: String,
330    pub run_id: String,
331    pub memory_id: Option<String>,
332    pub source_memory_id: Option<String>,
333    pub to_tier: Option<GovernedMemoryTier>,
334    pub partition_key: String,
335    pub actor: String,
336    pub status: String,
337    #[serde(skip_serializing_if = "Option::is_none")]
338    pub detail: Option<String>,
339    pub created_at_ms: u64,
340}
341
342#[derive(Debug, Clone, Serialize, Deserialize)]
343pub struct SharedResourceRecord {
344    pub key: String,
345    pub value: Value,
346    pub rev: u64,
347    pub updated_at_ms: u64,
348    pub updated_by: String,
349    #[serde(skip_serializing_if = "Option::is_none")]
350    pub ttl_ms: Option<u64>,
351}
352
353#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
354#[serde(rename_all = "snake_case")]
355pub enum RoutineSchedule {
356    IntervalSeconds { seconds: u64 },
357    Cron { expression: String },
358}
359
360#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
361#[serde(rename_all = "snake_case", tag = "type")]
362pub enum RoutineMisfirePolicy {
363    Skip,
364    RunOnce,
365    CatchUp { max_runs: u32 },
366}
367
368#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
369#[serde(rename_all = "snake_case")]
370pub enum RoutineStatus {
371    Active,
372    Paused,
373}
374
375#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct RoutineSpec {
377    pub routine_id: String,
378    pub name: String,
379    pub status: RoutineStatus,
380    pub schedule: RoutineSchedule,
381    pub timezone: String,
382    pub misfire_policy: RoutineMisfirePolicy,
383    pub entrypoint: String,
384    #[serde(default)]
385    pub args: Value,
386    #[serde(default)]
387    pub allowed_tools: Vec<String>,
388    #[serde(default)]
389    pub output_targets: Vec<String>,
390    pub creator_type: String,
391    pub creator_id: String,
392    pub requires_approval: bool,
393    pub external_integrations_allowed: bool,
394    #[serde(default, skip_serializing_if = "Option::is_none")]
395    pub next_fire_at_ms: Option<u64>,
396    #[serde(default, skip_serializing_if = "Option::is_none")]
397    pub last_fired_at_ms: Option<u64>,
398}
399
400#[derive(Debug, Clone, Serialize, Deserialize)]
401pub struct RoutineHistoryEvent {
402    pub routine_id: String,
403    pub trigger_type: String,
404    pub run_count: u32,
405    pub fired_at_ms: u64,
406    pub status: String,
407    #[serde(default, skip_serializing_if = "Option::is_none")]
408    pub detail: Option<String>,
409}
410
411#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
412#[serde(rename_all = "snake_case")]
413pub enum RoutineRunStatus {
414    Queued,
415    PendingApproval,
416    Running,
417    Paused,
418    BlockedPolicy,
419    Denied,
420    Completed,
421    Failed,
422    Cancelled,
423}
424
425#[derive(Debug, Clone, Serialize, Deserialize)]
426pub struct RoutineRunArtifact {
427    pub artifact_id: String,
428    pub uri: String,
429    pub kind: String,
430    #[serde(default, skip_serializing_if = "Option::is_none")]
431    pub label: Option<String>,
432    pub created_at_ms: u64,
433    #[serde(default, skip_serializing_if = "Option::is_none")]
434    pub metadata: Option<Value>,
435}
436
437#[derive(Debug, Clone, Serialize, Deserialize)]
438pub struct RoutineRunRecord {
439    pub run_id: String,
440    pub routine_id: String,
441    pub trigger_type: String,
442    pub run_count: u32,
443    pub status: RoutineRunStatus,
444    pub created_at_ms: u64,
445    pub updated_at_ms: u64,
446    #[serde(default, skip_serializing_if = "Option::is_none")]
447    pub fired_at_ms: Option<u64>,
448    #[serde(default, skip_serializing_if = "Option::is_none")]
449    pub started_at_ms: Option<u64>,
450    #[serde(default, skip_serializing_if = "Option::is_none")]
451    pub finished_at_ms: Option<u64>,
452    pub requires_approval: bool,
453    #[serde(default, skip_serializing_if = "Option::is_none")]
454    pub approval_reason: Option<String>,
455    #[serde(default, skip_serializing_if = "Option::is_none")]
456    pub denial_reason: Option<String>,
457    #[serde(default, skip_serializing_if = "Option::is_none")]
458    pub paused_reason: Option<String>,
459    #[serde(default, skip_serializing_if = "Option::is_none")]
460    pub detail: Option<String>,
461    pub entrypoint: String,
462    #[serde(default)]
463    pub args: Value,
464    #[serde(default)]
465    pub allowed_tools: Vec<String>,
466    #[serde(default)]
467    pub output_targets: Vec<String>,
468    #[serde(default)]
469    pub artifacts: Vec<RoutineRunArtifact>,
470    #[serde(default)]
471    pub active_session_ids: Vec<String>,
472    #[serde(default)]
473    pub prompt_tokens: u64,
474    #[serde(default)]
475    pub completion_tokens: u64,
476    #[serde(default)]
477    pub total_tokens: u64,
478    #[serde(default)]
479    pub estimated_cost_usd: f64,
480}
481
/// Policy entry tying a session to a routine run and a list of tool names;
/// `AppState::routine_session_policies` keeps these in a map with string keys.
#[derive(Clone, Debug)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
489
490#[derive(Debug, Clone, Serialize)]
491pub struct RoutineTriggerPlan {
492    pub routine_id: String,
493    pub run_count: u32,
494    pub scheduled_at_ms: u64,
495    pub next_fire_at_ms: u64,
496}
497
498#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
499#[serde(rename_all = "snake_case")]
500pub enum AutomationV2Status {
501    Active,
502    Paused,
503    Draft,
504}
505
506#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
507#[serde(rename_all = "snake_case")]
508pub enum AutomationV2ScheduleType {
509    Cron,
510    Interval,
511    Manual,
512}
513
514#[derive(Debug, Clone, Serialize, Deserialize)]
515pub struct AutomationV2Schedule {
516    #[serde(rename = "type")]
517    pub schedule_type: AutomationV2ScheduleType,
518    #[serde(default, skip_serializing_if = "Option::is_none")]
519    pub cron_expression: Option<String>,
520    #[serde(default, skip_serializing_if = "Option::is_none")]
521    pub interval_seconds: Option<u64>,
522    pub timezone: String,
523    pub misfire_policy: RoutineMisfirePolicy,
524}
525
526#[derive(Debug, Clone, Serialize, Deserialize)]
527pub struct AutomationAgentToolPolicy {
528    #[serde(default)]
529    pub allowlist: Vec<String>,
530    #[serde(default)]
531    pub denylist: Vec<String>,
532}
533
534#[derive(Debug, Clone, Serialize, Deserialize)]
535pub struct AutomationAgentMcpPolicy {
536    #[serde(default)]
537    pub allowed_servers: Vec<String>,
538    #[serde(default, skip_serializing_if = "Option::is_none")]
539    pub allowed_tools: Option<Vec<String>>,
540}
541
542#[derive(Debug, Clone, Serialize, Deserialize)]
543pub struct AutomationAgentProfile {
544    pub agent_id: String,
545    #[serde(default, skip_serializing_if = "Option::is_none")]
546    pub template_id: Option<String>,
547    pub display_name: String,
548    #[serde(default, skip_serializing_if = "Option::is_none")]
549    pub avatar_url: Option<String>,
550    #[serde(default, skip_serializing_if = "Option::is_none")]
551    pub model_policy: Option<Value>,
552    #[serde(default)]
553    pub skills: Vec<String>,
554    pub tool_policy: AutomationAgentToolPolicy,
555    pub mcp_policy: AutomationAgentMcpPolicy,
556    #[serde(default, skip_serializing_if = "Option::is_none")]
557    pub approval_policy: Option<String>,
558}
559
560#[derive(Debug, Clone, Serialize, Deserialize)]
561pub struct AutomationFlowNode {
562    pub node_id: String,
563    pub agent_id: String,
564    pub objective: String,
565    #[serde(default)]
566    pub depends_on: Vec<String>,
567    #[serde(default, skip_serializing_if = "Option::is_none")]
568    pub retry_policy: Option<Value>,
569    #[serde(default, skip_serializing_if = "Option::is_none")]
570    pub timeout_ms: Option<u64>,
571}
572
573#[derive(Debug, Clone, Serialize, Deserialize)]
574pub struct AutomationFlowSpec {
575    #[serde(default)]
576    pub nodes: Vec<AutomationFlowNode>,
577}
578
579#[derive(Debug, Clone, Serialize, Deserialize)]
580pub struct AutomationExecutionPolicy {
581    #[serde(default, skip_serializing_if = "Option::is_none")]
582    pub max_parallel_agents: Option<u32>,
583    #[serde(default, skip_serializing_if = "Option::is_none")]
584    pub max_total_runtime_ms: Option<u64>,
585    #[serde(default, skip_serializing_if = "Option::is_none")]
586    pub max_total_tool_calls: Option<u32>,
587}
588
589#[derive(Debug, Clone, Serialize, Deserialize)]
590pub struct AutomationV2Spec {
591    pub automation_id: String,
592    pub name: String,
593    #[serde(default, skip_serializing_if = "Option::is_none")]
594    pub description: Option<String>,
595    pub status: AutomationV2Status,
596    pub schedule: AutomationV2Schedule,
597    #[serde(default)]
598    pub agents: Vec<AutomationAgentProfile>,
599    pub flow: AutomationFlowSpec,
600    pub execution: AutomationExecutionPolicy,
601    #[serde(default)]
602    pub output_targets: Vec<String>,
603    pub created_at_ms: u64,
604    pub updated_at_ms: u64,
605    pub creator_id: String,
606    #[serde(default, skip_serializing_if = "Option::is_none")]
607    pub next_fire_at_ms: Option<u64>,
608    #[serde(default, skip_serializing_if = "Option::is_none")]
609    pub last_fired_at_ms: Option<u64>,
610}
611
612#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
613#[serde(rename_all = "snake_case")]
614pub enum AutomationRunStatus {
615    Queued,
616    Running,
617    Pausing,
618    Paused,
619    Completed,
620    Failed,
621    Cancelled,
622}
623
624#[derive(Debug, Clone, Serialize, Deserialize)]
625pub struct AutomationRunCheckpoint {
626    #[serde(default)]
627    pub completed_nodes: Vec<String>,
628    #[serde(default)]
629    pub pending_nodes: Vec<String>,
630    #[serde(default)]
631    pub node_outputs: std::collections::HashMap<String, Value>,
632}
633
634#[derive(Debug, Clone, Serialize, Deserialize)]
635pub struct AutomationV2RunRecord {
636    pub run_id: String,
637    pub automation_id: String,
638    pub trigger_type: String,
639    pub status: AutomationRunStatus,
640    pub created_at_ms: u64,
641    pub updated_at_ms: u64,
642    #[serde(default, skip_serializing_if = "Option::is_none")]
643    pub started_at_ms: Option<u64>,
644    #[serde(default, skip_serializing_if = "Option::is_none")]
645    pub finished_at_ms: Option<u64>,
646    #[serde(default)]
647    pub active_session_ids: Vec<String>,
648    #[serde(default)]
649    pub active_instance_ids: Vec<String>,
650    pub checkpoint: AutomationRunCheckpoint,
651    #[serde(default, skip_serializing_if = "Option::is_none")]
652    pub pause_reason: Option<String>,
653    #[serde(default, skip_serializing_if = "Option::is_none")]
654    pub resume_reason: Option<String>,
655    #[serde(default, skip_serializing_if = "Option::is_none")]
656    pub detail: Option<String>,
657    #[serde(default)]
658    pub prompt_tokens: u64,
659    #[serde(default)]
660    pub completion_tokens: u64,
661    #[serde(default)]
662    pub total_tokens: u64,
663    #[serde(default)]
664    pub estimated_cost_usd: f64,
665}
666
667#[derive(Debug, Clone, Serialize)]
668pub struct ResourceConflict {
669    pub key: String,
670    pub expected_rev: Option<u64>,
671    pub current_rev: Option<u64>,
672}
673
674#[derive(Debug, Clone, Serialize)]
675#[serde(tag = "type", rename_all = "snake_case")]
676pub enum ResourceStoreError {
677    InvalidKey { key: String },
678    RevisionConflict(ResourceConflict),
679    PersistFailed { message: String },
680}
681
682#[derive(Debug, Clone, Serialize)]
683#[serde(tag = "type", rename_all = "snake_case")]
684pub enum RoutineStoreError {
685    InvalidRoutineId { routine_id: String },
686    InvalidSchedule { detail: String },
687    PersistFailed { message: String },
688}
689
/// Coarse startup state of the server.
///
/// `AppState::new_starting` begins in `Starting`; `AppState::mark_ready` and
/// `AppState::mark_failed` move it to `Ready` and `Failed` respectively.
#[derive(Clone, Debug)]
pub enum StartupStatus {
    Starting,
    Ready,
    Failed,
}
696
697#[derive(Debug, Clone)]
698pub struct StartupState {
699    pub status: StartupStatus,
700    pub phase: String,
701    pub started_at_ms: u64,
702    pub attempt_id: String,
703    pub last_error: Option<String>,
704}
705
706#[derive(Debug, Clone)]
707pub struct StartupSnapshot {
708    pub status: StartupStatus,
709    pub phase: String,
710    pub started_at_ms: u64,
711    pub attempt_id: String,
712    pub last_error: Option<String>,
713    pub elapsed_ms: u64,
714}
715
716#[derive(Clone)]
717pub struct AppState {
718    pub runtime: Arc<OnceLock<RuntimeState>>,
719    pub startup: Arc<RwLock<StartupState>>,
720    pub in_process_mode: Arc<AtomicBool>,
721    pub api_token: Arc<RwLock<Option<String>>>,
722    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
723    pub run_registry: RunRegistry,
724    pub run_stale_ms: u64,
725    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
726    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
727    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
728    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
729    pub shared_resources_path: PathBuf,
730    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
731    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
732    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
733    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
734    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
735    pub routine_session_policies:
736        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
737    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
738    pub token_cost_per_1k_usd: f64,
739    pub routines_path: PathBuf,
740    pub routine_history_path: PathBuf,
741    pub routine_runs_path: PathBuf,
742    pub automations_v2_path: PathBuf,
743    pub automation_v2_runs_path: PathBuf,
744    pub agent_teams: AgentTeamRuntime,
745    pub web_ui_enabled: Arc<AtomicBool>,
746    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
747    pub server_base_url: Arc<std::sync::RwLock<String>>,
748    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
749    pub host_runtime_context: HostRuntimeContext,
750}
751
752#[derive(Debug, Clone)]
753struct StatusIndexUpdate {
754    key: String,
755    value: Value,
756}
757
758impl AppState {
759    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
760        Self {
761            runtime: Arc::new(OnceLock::new()),
762            startup: Arc::new(RwLock::new(StartupState {
763                status: StartupStatus::Starting,
764                phase: "boot".to_string(),
765                started_at_ms: now_ms(),
766                attempt_id,
767                last_error: None,
768            })),
769            in_process_mode: Arc::new(AtomicBool::new(in_process)),
770            api_token: Arc::new(RwLock::new(None)),
771            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
772            run_registry: RunRegistry::new(),
773            run_stale_ms: resolve_run_stale_ms(),
774            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
775            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
776            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
777            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
778            shared_resources_path: resolve_shared_resources_path(),
779            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
780            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
781            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
782            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
783            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
784            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
785            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
786            routines_path: resolve_routines_path(),
787            routine_history_path: resolve_routine_history_path(),
788            routine_runs_path: resolve_routine_runs_path(),
789            automations_v2_path: resolve_automations_v2_path(),
790            automation_v2_runs_path: resolve_automation_v2_runs_path(),
791            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
792            web_ui_enabled: Arc::new(AtomicBool::new(false)),
793            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
794            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
795            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
796            host_runtime_context: detect_host_runtime_context(),
797            token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
798        }
799    }
800
801    pub fn is_ready(&self) -> bool {
802        self.runtime.get().is_some()
803    }
804
805    pub fn mode_label(&self) -> &'static str {
806        if self.in_process_mode.load(Ordering::Relaxed) {
807            "in-process"
808        } else {
809            "sidecar"
810        }
811    }
812
813    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
814        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
815        if let Ok(mut guard) = self.web_ui_prefix.write() {
816            *guard = normalize_web_ui_prefix(&prefix);
817        }
818    }
819
820    pub fn web_ui_enabled(&self) -> bool {
821        self.web_ui_enabled.load(Ordering::Relaxed)
822    }
823
824    pub fn web_ui_prefix(&self) -> String {
825        self.web_ui_prefix
826            .read()
827            .map(|v| v.clone())
828            .unwrap_or_else(|_| "/admin".to_string())
829    }
830
831    pub fn set_server_base_url(&self, base_url: String) {
832        if let Ok(mut guard) = self.server_base_url.write() {
833            *guard = base_url;
834        }
835    }
836
837    pub fn server_base_url(&self) -> String {
838        self.server_base_url
839            .read()
840            .map(|v| v.clone())
841            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
842    }
843
844    pub async fn api_token(&self) -> Option<String> {
845        self.api_token.read().await.clone()
846    }
847
848    pub async fn set_api_token(&self, token: Option<String>) {
849        *self.api_token.write().await = token;
850    }
851
852    pub async fn startup_snapshot(&self) -> StartupSnapshot {
853        let state = self.startup.read().await.clone();
854        StartupSnapshot {
855            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
856            status: state.status,
857            phase: state.phase,
858            started_at_ms: state.started_at_ms,
859            attempt_id: state.attempt_id,
860            last_error: state.last_error,
861        }
862    }
863
864    pub fn host_runtime_context(&self) -> HostRuntimeContext {
865        self.runtime
866            .get()
867            .map(|runtime| runtime.host_runtime_context.clone())
868            .unwrap_or_else(|| self.host_runtime_context.clone())
869    }
870
871    pub async fn set_phase(&self, phase: impl Into<String>) {
872        let mut startup = self.startup.write().await;
873        startup.phase = phase.into();
874    }
875
876    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
877        self.runtime
878            .set(runtime)
879            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
880        self.engine_loop
881            .set_spawn_agent_hook(std::sync::Arc::new(
882                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
883            ))
884            .await;
885        self.engine_loop
886            .set_tool_policy_hook(std::sync::Arc::new(
887                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
888            ))
889            .await;
890        self.engine_loop
891            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
892                self.clone(),
893            )))
894            .await;
895        let _ = self.load_shared_resources().await;
896        let _ = self.load_routines().await;
897        let _ = self.load_routine_history().await;
898        let _ = self.load_routine_runs().await;
899        let _ = self.load_automations_v2().await;
900        let _ = self.load_automation_v2_runs().await;
901        let workspace_root = self.workspace_index.snapshot().await.root;
902        let _ = self
903            .agent_teams
904            .ensure_loaded_for_workspace(&workspace_root)
905            .await;
906        let mut startup = self.startup.write().await;
907        startup.status = StartupStatus::Ready;
908        startup.phase = "ready".to_string();
909        startup.last_error = None;
910        Ok(())
911    }
912
913    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
914        let mut startup = self.startup.write().await;
915        startup.status = StartupStatus::Failed;
916        startup.phase = phase.into();
917        startup.last_error = Some(error.into());
918    }
919
920    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
921        let runtime = self.channels_runtime.lock().await;
922        runtime.statuses.clone()
923    }
924
925    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
926        let effective = self.config.get_effective_value().await;
927        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
928        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
929
930        let mut runtime = self.channels_runtime.lock().await;
931        if let Some(listeners) = runtime.listeners.as_mut() {
932            listeners.abort_all();
933        }
934        runtime.listeners = None;
935        runtime.statuses.clear();
936
937        let mut status_map = std::collections::HashMap::new();
938        status_map.insert(
939            "telegram".to_string(),
940            ChannelStatus {
941                enabled: parsed.channels.telegram.is_some(),
942                connected: false,
943                last_error: None,
944                active_sessions: 0,
945                meta: serde_json::json!({}),
946            },
947        );
948        status_map.insert(
949            "discord".to_string(),
950            ChannelStatus {
951                enabled: parsed.channels.discord.is_some(),
952                connected: false,
953                last_error: None,
954                active_sessions: 0,
955                meta: serde_json::json!({}),
956            },
957        );
958        status_map.insert(
959            "slack".to_string(),
960            ChannelStatus {
961                enabled: parsed.channels.slack.is_some(),
962                connected: false,
963                last_error: None,
964                active_sessions: 0,
965                meta: serde_json::json!({}),
966            },
967        );
968
969        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
970            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
971            runtime.listeners = Some(listeners);
972            for status in status_map.values_mut() {
973                if status.enabled {
974                    status.connected = true;
975                }
976            }
977        }
978
979        runtime.statuses = status_map.clone();
980        drop(runtime);
981
982        self.event_bus.publish(EngineEvent::new(
983            "channel.status.changed",
984            serde_json::json!({ "channels": status_map }),
985        ));
986        Ok(())
987    }
988
989    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
990        if !self.shared_resources_path.exists() {
991            return Ok(());
992        }
993        let raw = fs::read_to_string(&self.shared_resources_path).await?;
994        let parsed =
995            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
996                .unwrap_or_default();
997        let mut guard = self.shared_resources.write().await;
998        *guard = parsed;
999        Ok(())
1000    }
1001
1002    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1003        if let Some(parent) = self.shared_resources_path.parent() {
1004            fs::create_dir_all(parent).await?;
1005        }
1006        let payload = {
1007            let guard = self.shared_resources.read().await;
1008            serde_json::to_string_pretty(&*guard)?
1009        };
1010        fs::write(&self.shared_resources_path, payload).await?;
1011        Ok(())
1012    }
1013
1014    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
1015        self.shared_resources.read().await.get(key).cloned()
1016    }
1017
1018    pub async fn list_shared_resources(
1019        &self,
1020        prefix: Option<&str>,
1021        limit: usize,
1022    ) -> Vec<SharedResourceRecord> {
1023        let limit = limit.clamp(1, 500);
1024        let mut rows = self
1025            .shared_resources
1026            .read()
1027            .await
1028            .values()
1029            .filter(|record| {
1030                if let Some(prefix) = prefix {
1031                    record.key.starts_with(prefix)
1032                } else {
1033                    true
1034                }
1035            })
1036            .cloned()
1037            .collect::<Vec<_>>();
1038        rows.sort_by(|a, b| a.key.cmp(&b.key));
1039        rows.truncate(limit);
1040        rows
1041    }
1042
1043    pub async fn put_shared_resource(
1044        &self,
1045        key: String,
1046        value: Value,
1047        if_match_rev: Option<u64>,
1048        updated_by: String,
1049        ttl_ms: Option<u64>,
1050    ) -> Result<SharedResourceRecord, ResourceStoreError> {
1051        if !is_valid_resource_key(&key) {
1052            return Err(ResourceStoreError::InvalidKey { key });
1053        }
1054
1055        let now = now_ms();
1056        let mut guard = self.shared_resources.write().await;
1057        let existing = guard.get(&key).cloned();
1058
1059        if let Some(expected) = if_match_rev {
1060            let current = existing.as_ref().map(|row| row.rev);
1061            if current != Some(expected) {
1062                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1063                    key,
1064                    expected_rev: Some(expected),
1065                    current_rev: current,
1066                }));
1067            }
1068        }
1069
1070        let next_rev = existing
1071            .as_ref()
1072            .map(|row| row.rev.saturating_add(1))
1073            .unwrap_or(1);
1074
1075        let record = SharedResourceRecord {
1076            key: key.clone(),
1077            value,
1078            rev: next_rev,
1079            updated_at_ms: now,
1080            updated_by,
1081            ttl_ms,
1082        };
1083
1084        let previous = guard.insert(key.clone(), record.clone());
1085        drop(guard);
1086
1087        if let Err(error) = self.persist_shared_resources().await {
1088            let mut rollback = self.shared_resources.write().await;
1089            if let Some(previous) = previous {
1090                rollback.insert(key, previous);
1091            } else {
1092                rollback.remove(&key);
1093            }
1094            return Err(ResourceStoreError::PersistFailed {
1095                message: error.to_string(),
1096            });
1097        }
1098
1099        Ok(record)
1100    }
1101
1102    pub async fn delete_shared_resource(
1103        &self,
1104        key: &str,
1105        if_match_rev: Option<u64>,
1106    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
1107        if !is_valid_resource_key(key) {
1108            return Err(ResourceStoreError::InvalidKey {
1109                key: key.to_string(),
1110            });
1111        }
1112
1113        let mut guard = self.shared_resources.write().await;
1114        let current = guard.get(key).cloned();
1115        if let Some(expected) = if_match_rev {
1116            let current_rev = current.as_ref().map(|row| row.rev);
1117            if current_rev != Some(expected) {
1118                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1119                    key: key.to_string(),
1120                    expected_rev: Some(expected),
1121                    current_rev,
1122                }));
1123            }
1124        }
1125
1126        let removed = guard.remove(key);
1127        drop(guard);
1128
1129        if let Err(error) = self.persist_shared_resources().await {
1130            if let Some(record) = removed.clone() {
1131                self.shared_resources
1132                    .write()
1133                    .await
1134                    .insert(record.key.clone(), record);
1135            }
1136            return Err(ResourceStoreError::PersistFailed {
1137                message: error.to_string(),
1138            });
1139        }
1140
1141        Ok(removed)
1142    }
1143
1144    pub async fn load_routines(&self) -> anyhow::Result<()> {
1145        if !self.routines_path.exists() {
1146            return Ok(());
1147        }
1148        let raw = fs::read_to_string(&self.routines_path).await?;
1149        let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
1150            .unwrap_or_default();
1151        let mut guard = self.routines.write().await;
1152        *guard = parsed;
1153        Ok(())
1154    }
1155
1156    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1157        if !self.routine_history_path.exists() {
1158            return Ok(());
1159        }
1160        let raw = fs::read_to_string(&self.routine_history_path).await?;
1161        let parsed = serde_json::from_str::<
1162            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1163        >(&raw)
1164        .unwrap_or_default();
1165        let mut guard = self.routine_history.write().await;
1166        *guard = parsed;
1167        Ok(())
1168    }
1169
1170    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1171        if !self.routine_runs_path.exists() {
1172            return Ok(());
1173        }
1174        let raw = fs::read_to_string(&self.routine_runs_path).await?;
1175        let parsed =
1176            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1177                .unwrap_or_default();
1178        let mut guard = self.routine_runs.write().await;
1179        *guard = parsed;
1180        Ok(())
1181    }
1182
1183    pub async fn persist_routines(&self) -> anyhow::Result<()> {
1184        if let Some(parent) = self.routines_path.parent() {
1185            fs::create_dir_all(parent).await?;
1186        }
1187        let payload = {
1188            let guard = self.routines.read().await;
1189            serde_json::to_string_pretty(&*guard)?
1190        };
1191        fs::write(&self.routines_path, payload).await?;
1192        Ok(())
1193    }
1194
1195    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1196        if let Some(parent) = self.routine_history_path.parent() {
1197            fs::create_dir_all(parent).await?;
1198        }
1199        let payload = {
1200            let guard = self.routine_history.read().await;
1201            serde_json::to_string_pretty(&*guard)?
1202        };
1203        fs::write(&self.routine_history_path, payload).await?;
1204        Ok(())
1205    }
1206
1207    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1208        if let Some(parent) = self.routine_runs_path.parent() {
1209            fs::create_dir_all(parent).await?;
1210        }
1211        let payload = {
1212            let guard = self.routine_runs.read().await;
1213            serde_json::to_string_pretty(&*guard)?
1214        };
1215        fs::write(&self.routine_runs_path, payload).await?;
1216        Ok(())
1217    }
1218
1219    pub async fn put_routine(
1220        &self,
1221        mut routine: RoutineSpec,
1222    ) -> Result<RoutineSpec, RoutineStoreError> {
1223        if routine.routine_id.trim().is_empty() {
1224            return Err(RoutineStoreError::InvalidRoutineId {
1225                routine_id: routine.routine_id,
1226            });
1227        }
1228
1229        routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1230        routine.output_targets = normalize_non_empty_list(routine.output_targets);
1231
1232        let now = now_ms();
1233        let next_schedule_fire =
1234            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1235                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1236                    detail: "invalid schedule or timezone".to_string(),
1237                })?;
1238        match routine.schedule {
1239            RoutineSchedule::IntervalSeconds { seconds } => {
1240                if seconds == 0 {
1241                    return Err(RoutineStoreError::InvalidSchedule {
1242                        detail: "interval_seconds must be > 0".to_string(),
1243                    });
1244                }
1245                let _ = seconds;
1246            }
1247            RoutineSchedule::Cron { .. } => {}
1248        }
1249        if routine.next_fire_at_ms.is_none() {
1250            routine.next_fire_at_ms = Some(next_schedule_fire);
1251        }
1252
1253        let mut guard = self.routines.write().await;
1254        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1255        drop(guard);
1256
1257        if let Err(error) = self.persist_routines().await {
1258            let mut rollback = self.routines.write().await;
1259            if let Some(previous) = previous {
1260                rollback.insert(previous.routine_id.clone(), previous);
1261            } else {
1262                rollback.remove(&routine.routine_id);
1263            }
1264            return Err(RoutineStoreError::PersistFailed {
1265                message: error.to_string(),
1266            });
1267        }
1268
1269        Ok(routine)
1270    }
1271
1272    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1273        let mut rows = self
1274            .routines
1275            .read()
1276            .await
1277            .values()
1278            .cloned()
1279            .collect::<Vec<_>>();
1280        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1281        rows
1282    }
1283
1284    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1285        self.routines.read().await.get(routine_id).cloned()
1286    }
1287
1288    pub async fn delete_routine(
1289        &self,
1290        routine_id: &str,
1291    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1292        let mut guard = self.routines.write().await;
1293        let removed = guard.remove(routine_id);
1294        drop(guard);
1295
1296        if let Err(error) = self.persist_routines().await {
1297            if let Some(removed) = removed.clone() {
1298                self.routines
1299                    .write()
1300                    .await
1301                    .insert(removed.routine_id.clone(), removed);
1302            }
1303            return Err(RoutineStoreError::PersistFailed {
1304                message: error.to_string(),
1305            });
1306        }
1307        Ok(removed)
1308    }
1309
1310    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1311        let mut plans = Vec::new();
1312        let mut guard = self.routines.write().await;
1313        for routine in guard.values_mut() {
1314            if routine.status != RoutineStatus::Active {
1315                continue;
1316            }
1317            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1318                continue;
1319            };
1320            if now_ms < next_fire_at_ms {
1321                continue;
1322            }
1323            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
1324                now_ms,
1325                next_fire_at_ms,
1326                &routine.schedule,
1327                &routine.timezone,
1328                &routine.misfire_policy,
1329            );
1330            routine.next_fire_at_ms = Some(next_fire_at_ms);
1331            if run_count == 0 {
1332                continue;
1333            }
1334            plans.push(RoutineTriggerPlan {
1335                routine_id: routine.routine_id.clone(),
1336                run_count,
1337                scheduled_at_ms: now_ms,
1338                next_fire_at_ms,
1339            });
1340        }
1341        drop(guard);
1342        let _ = self.persist_routines().await;
1343        plans
1344    }
1345
1346    pub async fn mark_routine_fired(
1347        &self,
1348        routine_id: &str,
1349        fired_at_ms: u64,
1350    ) -> Option<RoutineSpec> {
1351        let mut guard = self.routines.write().await;
1352        let routine = guard.get_mut(routine_id)?;
1353        routine.last_fired_at_ms = Some(fired_at_ms);
1354        let updated = routine.clone();
1355        drop(guard);
1356        let _ = self.persist_routines().await;
1357        Some(updated)
1358    }
1359
1360    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1361        let mut history = self.routine_history.write().await;
1362        history
1363            .entry(event.routine_id.clone())
1364            .or_default()
1365            .push(event);
1366        drop(history);
1367        let _ = self.persist_routine_history().await;
1368    }
1369
1370    pub async fn list_routine_history(
1371        &self,
1372        routine_id: &str,
1373        limit: usize,
1374    ) -> Vec<RoutineHistoryEvent> {
1375        let limit = limit.clamp(1, 500);
1376        let mut rows = self
1377            .routine_history
1378            .read()
1379            .await
1380            .get(routine_id)
1381            .cloned()
1382            .unwrap_or_default();
1383        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1384        rows.truncate(limit);
1385        rows
1386    }
1387
1388    pub async fn create_routine_run(
1389        &self,
1390        routine: &RoutineSpec,
1391        trigger_type: &str,
1392        run_count: u32,
1393        status: RoutineRunStatus,
1394        detail: Option<String>,
1395    ) -> RoutineRunRecord {
1396        let now = now_ms();
1397        let record = RoutineRunRecord {
1398            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1399            routine_id: routine.routine_id.clone(),
1400            trigger_type: trigger_type.to_string(),
1401            run_count,
1402            status,
1403            created_at_ms: now,
1404            updated_at_ms: now,
1405            fired_at_ms: Some(now),
1406            started_at_ms: None,
1407            finished_at_ms: None,
1408            requires_approval: routine.requires_approval,
1409            approval_reason: None,
1410            denial_reason: None,
1411            paused_reason: None,
1412            detail,
1413            entrypoint: routine.entrypoint.clone(),
1414            args: routine.args.clone(),
1415            allowed_tools: routine.allowed_tools.clone(),
1416            output_targets: routine.output_targets.clone(),
1417            artifacts: Vec::new(),
1418            active_session_ids: Vec::new(),
1419            prompt_tokens: 0,
1420            completion_tokens: 0,
1421            total_tokens: 0,
1422            estimated_cost_usd: 0.0,
1423        };
1424        self.routine_runs
1425            .write()
1426            .await
1427            .insert(record.run_id.clone(), record.clone());
1428        let _ = self.persist_routine_runs().await;
1429        record
1430    }
1431
1432    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1433        self.routine_runs.read().await.get(run_id).cloned()
1434    }
1435
1436    pub async fn list_routine_runs(
1437        &self,
1438        routine_id: Option<&str>,
1439        limit: usize,
1440    ) -> Vec<RoutineRunRecord> {
1441        let mut rows = self
1442            .routine_runs
1443            .read()
1444            .await
1445            .values()
1446            .filter(|row| {
1447                if let Some(id) = routine_id {
1448                    row.routine_id == id
1449                } else {
1450                    true
1451                }
1452            })
1453            .cloned()
1454            .collect::<Vec<_>>();
1455        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1456        rows.truncate(limit.clamp(1, 500));
1457        rows
1458    }
1459
1460    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1461        let mut guard = self.routine_runs.write().await;
1462        let next_run_id = guard
1463            .values()
1464            .filter(|row| row.status == RoutineRunStatus::Queued)
1465            .min_by(|a, b| {
1466                a.created_at_ms
1467                    .cmp(&b.created_at_ms)
1468                    .then_with(|| a.run_id.cmp(&b.run_id))
1469            })
1470            .map(|row| row.run_id.clone())?;
1471        let now = now_ms();
1472        let row = guard.get_mut(&next_run_id)?;
1473        row.status = RoutineRunStatus::Running;
1474        row.updated_at_ms = now;
1475        row.started_at_ms = Some(now);
1476        let claimed = row.clone();
1477        drop(guard);
1478        let _ = self.persist_routine_runs().await;
1479        Some(claimed)
1480    }
1481
1482    pub async fn set_routine_session_policy(
1483        &self,
1484        session_id: String,
1485        run_id: String,
1486        routine_id: String,
1487        allowed_tools: Vec<String>,
1488    ) {
1489        let policy = RoutineSessionPolicy {
1490            session_id: session_id.clone(),
1491            run_id,
1492            routine_id,
1493            allowed_tools: normalize_allowed_tools(allowed_tools),
1494        };
1495        self.routine_session_policies
1496            .write()
1497            .await
1498            .insert(session_id, policy);
1499    }
1500
1501    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1502        self.routine_session_policies
1503            .read()
1504            .await
1505            .get(session_id)
1506            .cloned()
1507    }
1508
1509    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1510        self.routine_session_policies
1511            .write()
1512            .await
1513            .remove(session_id);
1514    }
1515
1516    pub async fn update_routine_run_status(
1517        &self,
1518        run_id: &str,
1519        status: RoutineRunStatus,
1520        reason: Option<String>,
1521    ) -> Option<RoutineRunRecord> {
1522        let mut guard = self.routine_runs.write().await;
1523        let row = guard.get_mut(run_id)?;
1524        row.status = status.clone();
1525        row.updated_at_ms = now_ms();
1526        match status {
1527            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1528            RoutineRunStatus::Running => {
1529                row.started_at_ms.get_or_insert_with(now_ms);
1530                if let Some(detail) = reason {
1531                    row.detail = Some(detail);
1532                }
1533            }
1534            RoutineRunStatus::Denied => row.denial_reason = reason,
1535            RoutineRunStatus::Paused => row.paused_reason = reason,
1536            RoutineRunStatus::Completed
1537            | RoutineRunStatus::Failed
1538            | RoutineRunStatus::Cancelled => {
1539                row.finished_at_ms = Some(now_ms());
1540                if let Some(detail) = reason {
1541                    row.detail = Some(detail);
1542                }
1543            }
1544            _ => {
1545                if let Some(detail) = reason {
1546                    row.detail = Some(detail);
1547                }
1548            }
1549        }
1550        let updated = row.clone();
1551        drop(guard);
1552        let _ = self.persist_routine_runs().await;
1553        Some(updated)
1554    }
1555
1556    pub async fn append_routine_run_artifact(
1557        &self,
1558        run_id: &str,
1559        artifact: RoutineRunArtifact,
1560    ) -> Option<RoutineRunRecord> {
1561        let mut guard = self.routine_runs.write().await;
1562        let row = guard.get_mut(run_id)?;
1563        row.updated_at_ms = now_ms();
1564        row.artifacts.push(artifact);
1565        let updated = row.clone();
1566        drop(guard);
1567        let _ = self.persist_routine_runs().await;
1568        Some(updated)
1569    }
1570
1571    pub async fn add_active_session_id(
1572        &self,
1573        run_id: &str,
1574        session_id: String,
1575    ) -> Option<RoutineRunRecord> {
1576        let mut guard = self.routine_runs.write().await;
1577        let row = guard.get_mut(run_id)?;
1578        if !row.active_session_ids.iter().any(|id| id == &session_id) {
1579            row.active_session_ids.push(session_id);
1580        }
1581        row.updated_at_ms = now_ms();
1582        let updated = row.clone();
1583        drop(guard);
1584        let _ = self.persist_routine_runs().await;
1585        Some(updated)
1586    }
1587
1588    pub async fn clear_active_session_id(
1589        &self,
1590        run_id: &str,
1591        session_id: &str,
1592    ) -> Option<RoutineRunRecord> {
1593        let mut guard = self.routine_runs.write().await;
1594        let row = guard.get_mut(run_id)?;
1595        row.active_session_ids.retain(|id| id != session_id);
1596        row.updated_at_ms = now_ms();
1597        let updated = row.clone();
1598        drop(guard);
1599        let _ = self.persist_routine_runs().await;
1600        Some(updated)
1601    }
1602
1603    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
1604        if !self.automations_v2_path.exists() {
1605            return Ok(());
1606        }
1607        let raw = fs::read_to_string(&self.automations_v2_path).await?;
1608        let parsed =
1609            serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(&raw)
1610                .unwrap_or_default();
1611        *self.automations_v2.write().await = parsed;
1612        Ok(())
1613    }
1614
1615    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1616        if let Some(parent) = self.automations_v2_path.parent() {
1617            fs::create_dir_all(parent).await?;
1618        }
1619        let payload = {
1620            let guard = self.automations_v2.read().await;
1621            serde_json::to_string_pretty(&*guard)?
1622        };
1623        fs::write(&self.automations_v2_path, payload).await?;
1624        Ok(())
1625    }
1626
1627    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
1628        if !self.automation_v2_runs_path.exists() {
1629            return Ok(());
1630        }
1631        let raw = fs::read_to_string(&self.automation_v2_runs_path).await?;
1632        let parsed =
1633            serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(&raw)
1634                .unwrap_or_default();
1635        *self.automation_v2_runs.write().await = parsed;
1636        Ok(())
1637    }
1638
1639    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1640        if let Some(parent) = self.automation_v2_runs_path.parent() {
1641            fs::create_dir_all(parent).await?;
1642        }
1643        let payload = {
1644            let guard = self.automation_v2_runs.read().await;
1645            serde_json::to_string_pretty(&*guard)?
1646        };
1647        fs::write(&self.automation_v2_runs_path, payload).await?;
1648        Ok(())
1649    }
1650
1651    pub async fn put_automation_v2(
1652        &self,
1653        mut automation: AutomationV2Spec,
1654    ) -> anyhow::Result<AutomationV2Spec> {
1655        if automation.automation_id.trim().is_empty() {
1656            anyhow::bail!("automation_id is required");
1657        }
1658        for agent in &mut automation.agents {
1659            if agent.display_name.trim().is_empty() {
1660                agent.display_name = auto_generated_agent_name(&agent.agent_id);
1661            }
1662            agent.tool_policy.allowlist =
1663                normalize_allowed_tools(agent.tool_policy.allowlist.clone());
1664            agent.tool_policy.denylist =
1665                normalize_allowed_tools(agent.tool_policy.denylist.clone());
1666            agent.mcp_policy.allowed_servers =
1667                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
1668            agent.mcp_policy.allowed_tools = agent
1669                .mcp_policy
1670                .allowed_tools
1671                .take()
1672                .map(normalize_allowed_tools);
1673        }
1674        let now = now_ms();
1675        if automation.created_at_ms == 0 {
1676            automation.created_at_ms = now;
1677        }
1678        automation.updated_at_ms = now;
1679        if automation.next_fire_at_ms.is_none() {
1680            automation.next_fire_at_ms =
1681                automation_schedule_next_fire_at_ms(&automation.schedule, now);
1682        }
1683        self.automations_v2
1684            .write()
1685            .await
1686            .insert(automation.automation_id.clone(), automation.clone());
1687        self.persist_automations_v2().await?;
1688        Ok(automation)
1689    }
1690
1691    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
1692        self.automations_v2.read().await.get(automation_id).cloned()
1693    }
1694
1695    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
1696        let mut rows = self
1697            .automations_v2
1698            .read()
1699            .await
1700            .values()
1701            .cloned()
1702            .collect::<Vec<_>>();
1703        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
1704        rows
1705    }
1706
1707    pub async fn delete_automation_v2(
1708        &self,
1709        automation_id: &str,
1710    ) -> anyhow::Result<Option<AutomationV2Spec>> {
1711        let removed = self.automations_v2.write().await.remove(automation_id);
1712        self.persist_automations_v2().await?;
1713        Ok(removed)
1714    }
1715
1716    pub async fn create_automation_v2_run(
1717        &self,
1718        automation: &AutomationV2Spec,
1719        trigger_type: &str,
1720    ) -> anyhow::Result<AutomationV2RunRecord> {
1721        let now = now_ms();
1722        let pending_nodes = automation
1723            .flow
1724            .nodes
1725            .iter()
1726            .map(|n| n.node_id.clone())
1727            .collect::<Vec<_>>();
1728        let run = AutomationV2RunRecord {
1729            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
1730            automation_id: automation.automation_id.clone(),
1731            trigger_type: trigger_type.to_string(),
1732            status: AutomationRunStatus::Queued,
1733            created_at_ms: now,
1734            updated_at_ms: now,
1735            started_at_ms: None,
1736            finished_at_ms: None,
1737            active_session_ids: Vec::new(),
1738            active_instance_ids: Vec::new(),
1739            checkpoint: AutomationRunCheckpoint {
1740                completed_nodes: Vec::new(),
1741                pending_nodes,
1742                node_outputs: std::collections::HashMap::new(),
1743            },
1744            pause_reason: None,
1745            resume_reason: None,
1746            detail: None,
1747            prompt_tokens: 0,
1748            completion_tokens: 0,
1749            total_tokens: 0,
1750            estimated_cost_usd: 0.0,
1751        };
1752        self.automation_v2_runs
1753            .write()
1754            .await
1755            .insert(run.run_id.clone(), run.clone());
1756        self.persist_automation_v2_runs().await?;
1757        Ok(run)
1758    }
1759
1760    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
1761        self.automation_v2_runs.read().await.get(run_id).cloned()
1762    }
1763
1764    pub async fn list_automation_v2_runs(
1765        &self,
1766        automation_id: Option<&str>,
1767        limit: usize,
1768    ) -> Vec<AutomationV2RunRecord> {
1769        let mut rows = self
1770            .automation_v2_runs
1771            .read()
1772            .await
1773            .values()
1774            .filter(|row| {
1775                if let Some(id) = automation_id {
1776                    row.automation_id == id
1777                } else {
1778                    true
1779                }
1780            })
1781            .cloned()
1782            .collect::<Vec<_>>();
1783        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1784        rows.truncate(limit.clamp(1, 500));
1785        rows
1786    }
1787
1788    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
1789        let mut guard = self.automation_v2_runs.write().await;
1790        let run_id = guard
1791            .values()
1792            .filter(|row| row.status == AutomationRunStatus::Queued)
1793            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
1794            .map(|row| row.run_id.clone())?;
1795        let now = now_ms();
1796        let run = guard.get_mut(&run_id)?;
1797        run.status = AutomationRunStatus::Running;
1798        run.updated_at_ms = now;
1799        run.started_at_ms.get_or_insert(now);
1800        let claimed = run.clone();
1801        drop(guard);
1802        let _ = self.persist_automation_v2_runs().await;
1803        Some(claimed)
1804    }
1805
1806    pub async fn update_automation_v2_run(
1807        &self,
1808        run_id: &str,
1809        update: impl FnOnce(&mut AutomationV2RunRecord),
1810    ) -> Option<AutomationV2RunRecord> {
1811        let mut guard = self.automation_v2_runs.write().await;
1812        let run = guard.get_mut(run_id)?;
1813        update(run);
1814        run.updated_at_ms = now_ms();
1815        if matches!(
1816            run.status,
1817            AutomationRunStatus::Completed
1818                | AutomationRunStatus::Failed
1819                | AutomationRunStatus::Cancelled
1820        ) {
1821            run.finished_at_ms.get_or_insert_with(now_ms);
1822        }
1823        let out = run.clone();
1824        drop(guard);
1825        let _ = self.persist_automation_v2_runs().await;
1826        Some(out)
1827    }
1828
1829    pub async fn add_automation_v2_session(
1830        &self,
1831        run_id: &str,
1832        session_id: &str,
1833    ) -> Option<AutomationV2RunRecord> {
1834        let updated = self
1835            .update_automation_v2_run(run_id, |row| {
1836                if !row.active_session_ids.iter().any(|id| id == session_id) {
1837                    row.active_session_ids.push(session_id.to_string());
1838                }
1839            })
1840            .await;
1841        self.automation_v2_session_runs
1842            .write()
1843            .await
1844            .insert(session_id.to_string(), run_id.to_string());
1845        updated
1846    }
1847
1848    pub async fn clear_automation_v2_session(
1849        &self,
1850        run_id: &str,
1851        session_id: &str,
1852    ) -> Option<AutomationV2RunRecord> {
1853        self.automation_v2_session_runs
1854            .write()
1855            .await
1856            .remove(session_id);
1857        self.update_automation_v2_run(run_id, |row| {
1858            row.active_session_ids.retain(|id| id != session_id);
1859        })
1860        .await
1861    }
1862
1863    pub async fn apply_provider_usage_to_runs(
1864        &self,
1865        session_id: &str,
1866        prompt_tokens: u64,
1867        completion_tokens: u64,
1868        total_tokens: u64,
1869    ) {
1870        if let Some(policy) = self.routine_session_policy(session_id).await {
1871            let rate = self.token_cost_per_1k_usd.max(0.0);
1872            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
1873            let mut guard = self.routine_runs.write().await;
1874            if let Some(run) = guard.get_mut(&policy.run_id) {
1875                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
1876                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
1877                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
1878                run.estimated_cost_usd += delta_cost;
1879                run.updated_at_ms = now_ms();
1880            }
1881            drop(guard);
1882            let _ = self.persist_routine_runs().await;
1883        }
1884
1885        let maybe_v2_run_id = self
1886            .automation_v2_session_runs
1887            .read()
1888            .await
1889            .get(session_id)
1890            .cloned();
1891        if let Some(run_id) = maybe_v2_run_id {
1892            let rate = self.token_cost_per_1k_usd.max(0.0);
1893            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
1894            let mut guard = self.automation_v2_runs.write().await;
1895            if let Some(run) = guard.get_mut(&run_id) {
1896                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
1897                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
1898                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
1899                run.estimated_cost_usd += delta_cost;
1900                run.updated_at_ms = now_ms();
1901            }
1902            drop(guard);
1903            let _ = self.persist_automation_v2_runs().await;
1904        }
1905    }
1906
1907    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
1908        let mut fired = Vec::new();
1909        let mut guard = self.automations_v2.write().await;
1910        for automation in guard.values_mut() {
1911            if automation.status != AutomationV2Status::Active {
1912                continue;
1913            }
1914            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
1915                automation.next_fire_at_ms =
1916                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
1917                continue;
1918            };
1919            if now_ms < next_fire_at_ms {
1920                continue;
1921            }
1922            let run_count =
1923                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
1924            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
1925            automation.next_fire_at_ms = next;
1926            automation.last_fired_at_ms = Some(now_ms);
1927            for _ in 0..run_count {
1928                fired.push(automation.automation_id.clone());
1929            }
1930        }
1931        drop(guard);
1932        let _ = self.persist_automations_v2().await;
1933        fired
1934    }
1935}
1936
1937async fn build_channels_config(
1938    state: &AppState,
1939    channels: &ChannelsConfigFile,
1940) -> Option<ChannelsConfig> {
1941    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1942        return None;
1943    }
1944    Some(ChannelsConfig {
1945        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1946            bot_token: cfg.bot_token,
1947            allowed_users: cfg.allowed_users,
1948            mention_only: cfg.mention_only,
1949            style_profile: cfg.style_profile,
1950        }),
1951        discord: channels.discord.clone().map(|cfg| DiscordConfig {
1952            bot_token: cfg.bot_token,
1953            guild_id: cfg.guild_id,
1954            allowed_users: cfg.allowed_users,
1955            mention_only: cfg.mention_only,
1956        }),
1957        slack: channels.slack.clone().map(|cfg| SlackConfig {
1958            bot_token: cfg.bot_token,
1959            channel_id: cfg.channel_id,
1960            allowed_users: cfg.allowed_users,
1961        }),
1962        server_base_url: state.server_base_url(),
1963        api_token: state.api_token().await.unwrap_or_default(),
1964        tool_policy: channels.tool_policy.clone(),
1965    })
1966}
1967
/// Normalizes a configured web UI route prefix.
///
/// Surrounding whitespace and trailing slashes are removed and a leading slash is added when
/// missing. Inputs that are empty after that cleanup (`""`, `"/"`, `"//"`, ...) fall back to
/// `"/admin"`, so the result is never an empty string.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    // Stripping trailing slashes first makes every slash-only input collapse to "",
    // which is then caught by the fallback below.
    let stripped = prefix.trim().trim_end_matches('/');
    if stripped.is_empty() {
        return "/admin".to_string();
    }
    if stripped.starts_with('/') {
        stripped.to_string()
    } else {
        format!("/{stripped}")
    }
}
1980
/// Serde default for the web UI route prefix: `"/admin"`.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
1984
/// Returns the wildcard list `["*"]`.
fn default_allow_all() -> Vec<String> {
    vec![String::from("*")]
}
1988
/// Default for the Discord `mention_only` setting: enabled.
fn default_discord_mention_only() -> bool {
    true
}
1992
1993fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
1994    normalize_non_empty_list(raw)
1995}
1996
/// Trims every entry, drops entries that are empty after trimming, and removes duplicates
/// while keeping the first occurrence of each value in its original position.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for values that were seen before.
        .filter(|entry| already_seen.insert(entry.clone()))
        .collect()
}
2011
/// Reads the run staleness threshold (milliseconds) from `TANDEM_RUN_STALE_MS`.
///
/// Falls back to 120 000 ms when the variable is unset or not a valid `u64`; the result is
/// always clamped to 30 000..=600 000 ms.
fn resolve_run_stale_ms() -> u64 {
    const DEFAULT_STALE_MS: u64 = 120_000;
    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_STALE_MS),
        Err(_) => DEFAULT_STALE_MS,
    };
    configured.clamp(30_000, 600_000)
}
2019
/// Reads the USD cost per 1000 tokens from `TANDEM_TOKEN_COST_PER_1K_USD`.
///
/// Returns `0.0` when the variable is unset, unparsable, negative, or not a finite number.
/// `f64` parsing accepts spellings such as `inf` and `NaN`, and overflowing literals such as
/// `1e400` parse to infinity; those are rejected here because a non-finite rate would turn
/// every accumulated cost estimate into a value that JSON cannot represent.
fn resolve_token_cost_per_1k_usd() -> f64 {
    std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok())
        .filter(|rate| rate.is_finite())
        .unwrap_or(0.0)
        .max(0.0)
}
2027
2028fn resolve_shared_resources_path() -> PathBuf {
2029    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
2030        let trimmed = dir.trim();
2031        if !trimmed.is_empty() {
2032            return PathBuf::from(trimmed).join("shared_resources.json");
2033        }
2034    }
2035    default_state_dir().join("shared_resources.json")
2036}
2037
2038fn resolve_routines_path() -> PathBuf {
2039    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
2040        let trimmed = dir.trim();
2041        if !trimmed.is_empty() {
2042            return PathBuf::from(trimmed).join("routines.json");
2043        }
2044    }
2045    default_state_dir().join("routines.json")
2046}
2047
2048fn resolve_routine_history_path() -> PathBuf {
2049    if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
2050        let trimmed = root.trim();
2051        if !trimmed.is_empty() {
2052            return PathBuf::from(trimmed).join("routine_history.json");
2053        }
2054    }
2055    default_state_dir().join("routine_history.json")
2056}
2057
2058fn resolve_routine_runs_path() -> PathBuf {
2059    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2060        let trimmed = root.trim();
2061        if !trimmed.is_empty() {
2062            return PathBuf::from(trimmed).join("routine_runs.json");
2063        }
2064    }
2065    default_state_dir().join("routine_runs.json")
2066}
2067
2068fn resolve_automations_v2_path() -> PathBuf {
2069    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2070        let trimmed = root.trim();
2071        if !trimmed.is_empty() {
2072            return PathBuf::from(trimmed).join("automations_v2.json");
2073        }
2074    }
2075    default_state_dir().join("automations_v2.json")
2076}
2077
2078fn resolve_automation_v2_runs_path() -> PathBuf {
2079    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2080        let trimmed = root.trim();
2081        if !trimmed.is_empty() {
2082            return PathBuf::from(trimmed).join("automation_v2_runs.json");
2083        }
2084    }
2085    default_state_dir().join("automation_v2_runs.json")
2086}
2087
2088fn resolve_agent_team_audit_path() -> PathBuf {
2089    if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
2090        let trimmed = base.trim();
2091        if !trimmed.is_empty() {
2092            return PathBuf::from(trimmed)
2093                .join("agent-team")
2094                .join("audit.log.jsonl");
2095        }
2096    }
2097    default_state_dir()
2098        .join("agent-team")
2099        .join("audit.log.jsonl")
2100}
2101
2102fn default_state_dir() -> PathBuf {
2103    if let Ok(paths) = resolve_shared_paths() {
2104        return paths.engine_state_dir;
2105    }
2106    if let Some(data_dir) = dirs::data_dir() {
2107        return data_dir.join("tandem").join("data");
2108    }
2109    dirs::home_dir()
2110        .map(|home| home.join(".tandem").join("data"))
2111        .unwrap_or_else(|| PathBuf::from(".tandem"))
2112}
2113
2114fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
2115    match schedule {
2116        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
2117        RoutineSchedule::Cron { .. } => None,
2118    }
2119}
2120
2121fn parse_timezone(timezone: &str) -> Option<Tz> {
2122    timezone.trim().parse::<Tz>().ok()
2123}
2124
2125fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
2126    let tz = parse_timezone(timezone)?;
2127    let schedule = Schedule::from_str(expression).ok()?;
2128    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
2129    let local_from = from_dt.with_timezone(&tz);
2130    let next = schedule.after(&local_from).next()?;
2131    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
2132}
2133
2134fn compute_next_schedule_fire_at_ms(
2135    schedule: &RoutineSchedule,
2136    timezone: &str,
2137    from_ms: u64,
2138) -> Option<u64> {
2139    let _ = parse_timezone(timezone)?;
2140    match schedule {
2141        RoutineSchedule::IntervalSeconds { seconds } => {
2142            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
2143        }
2144        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
2145    }
2146}
2147
2148fn compute_misfire_plan_for_schedule(
2149    now_ms: u64,
2150    next_fire_at_ms: u64,
2151    schedule: &RoutineSchedule,
2152    timezone: &str,
2153    policy: &RoutineMisfirePolicy,
2154) -> (u32, u64) {
2155    match schedule {
2156        RoutineSchedule::IntervalSeconds { .. } => {
2157            let Some(interval_ms) = routine_interval_ms(schedule) else {
2158                return (0, next_fire_at_ms);
2159            };
2160            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
2161        }
2162        RoutineSchedule::Cron { expression } => {
2163            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
2164                .unwrap_or_else(|| now_ms.saturating_add(60_000));
2165            match policy {
2166                RoutineMisfirePolicy::Skip => (0, aligned_next),
2167                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
2168                RoutineMisfirePolicy::CatchUp { max_runs } => {
2169                    let mut count = 0u32;
2170                    let mut cursor = next_fire_at_ms;
2171                    while cursor <= now_ms && count < *max_runs {
2172                        count = count.saturating_add(1);
2173                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
2174                            break;
2175                        };
2176                        if next <= cursor {
2177                            break;
2178                        }
2179                        cursor = next;
2180                    }
2181                    (count, aligned_next)
2182                }
2183            }
2184        }
2185    }
2186}
2187
2188fn compute_misfire_plan(
2189    now_ms: u64,
2190    next_fire_at_ms: u64,
2191    interval_ms: u64,
2192    policy: &RoutineMisfirePolicy,
2193) -> (u32, u64) {
2194    if now_ms < next_fire_at_ms || interval_ms == 0 {
2195        return (0, next_fire_at_ms);
2196    }
2197    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
2198    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
2199    match policy {
2200        RoutineMisfirePolicy::Skip => (0, aligned_next),
2201        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
2202        RoutineMisfirePolicy::CatchUp { max_runs } => {
2203            let count = missed.min(u64::from(*max_runs)) as u32;
2204            (count, aligned_next)
2205        }
2206    }
2207}
2208
2209fn auto_generated_agent_name(agent_id: &str) -> String {
2210    let names = [
2211        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
2212    ];
2213    let digest = Sha256::digest(agent_id.as_bytes());
2214    let idx = usize::from(digest[0]) % names.len();
2215    format!("{}-{:02x}", names[idx], digest[1])
2216}
2217
2218fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
2219    match schedule.schedule_type {
2220        AutomationV2ScheduleType::Manual => None,
2221        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
2222            seconds: schedule.interval_seconds.unwrap_or(60),
2223        }),
2224        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
2225            expression: schedule.cron_expression.clone().unwrap_or_default(),
2226        }),
2227    }
2228}
2229
2230fn automation_schedule_next_fire_at_ms(
2231    schedule: &AutomationV2Schedule,
2232    from_ms: u64,
2233) -> Option<u64> {
2234    let routine_schedule = schedule_from_automation_v2(schedule)?;
2235    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
2236}
2237
2238fn automation_schedule_due_count(
2239    schedule: &AutomationV2Schedule,
2240    now_ms: u64,
2241    next_fire_at_ms: u64,
2242) -> u32 {
2243    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
2244        return 0;
2245    };
2246    let (count, _) = compute_misfire_plan_for_schedule(
2247        now_ms,
2248        next_fire_at_ms,
2249        &routine_schedule,
2250        &schedule.timezone,
2251        &schedule.misfire_policy,
2252    );
2253    count.max(1)
2254}
2255
/// Outcome of evaluating whether a routine may execute.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may only run after approval; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
2262
2263pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
2264    let entrypoint = routine.entrypoint.to_ascii_lowercase();
2265    if entrypoint.starts_with("connector.")
2266        || entrypoint.starts_with("integration.")
2267        || entrypoint.contains("external")
2268    {
2269        return true;
2270    }
2271    routine
2272        .args
2273        .get("uses_external_integrations")
2274        .and_then(|v| v.as_bool())
2275        .unwrap_or(false)
2276        || routine
2277            .args
2278            .get("connector_id")
2279            .and_then(|v| v.as_str())
2280            .is_some()
2281}
2282
2283pub fn evaluate_routine_execution_policy(
2284    routine: &RoutineSpec,
2285    trigger_type: &str,
2286) -> RoutineExecutionDecision {
2287    if !routine_uses_external_integrations(routine) {
2288        return RoutineExecutionDecision::Allowed;
2289    }
2290    if !routine.external_integrations_allowed {
2291        return RoutineExecutionDecision::Blocked {
2292            reason: "external integrations are disabled by policy".to_string(),
2293        };
2294    }
2295    if routine.requires_approval {
2296        return RoutineExecutionDecision::RequiresApproval {
2297            reason: format!(
2298                "manual approval required before external side effects ({})",
2299                trigger_type
2300            ),
2301        };
2302    }
2303    RoutineExecutionDecision::Allowed
2304}
2305
/// Validates a shared-resource key (surrounding whitespace is ignored).
///
/// A key is valid when it is exactly `swarm.active_tasks`, or when it starts with one of
/// `run/`, `mission/`, `project/`, `team/` and contains no empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        candidate => {
            let has_allowed_prefix = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| candidate.starts_with(prefix));
            has_allowed_prefix && !candidate.contains("//")
        }
    }
}
2323
2324impl Deref for AppState {
2325    type Target = RuntimeState;
2326
2327    fn deref(&self) -> &Self::Target {
2328        self.runtime
2329            .get()
2330            .expect("runtime accessed before startup completion")
2331    }
2332}
2333
/// Server-side prompt-context hook; implements `PromptContextHook` for this crate.
#[derive(Clone)]
struct ServerPromptContextHook {
    /// Application state handle used by the hook's methods.
    state: AppState,
}
2338
2339impl ServerPromptContextHook {
2340    fn new(state: AppState) -> Self {
2341        Self { state }
2342    }
2343
2344    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
2345        let paths = resolve_shared_paths().ok()?;
2346        MemoryDatabase::new(&paths.memory_db_path).await.ok()
2347    }
2348
2349    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
2350        let paths = resolve_shared_paths().ok()?;
2351        tandem_memory::MemoryManager::new(&paths.memory_db_path)
2352            .await
2353            .ok()
2354    }
2355
2356    fn hash_query(input: &str) -> String {
2357        let mut hasher = Sha256::new();
2358        hasher.update(input.as_bytes());
2359        format!("{:x}", hasher.finalize())
2360    }
2361
2362    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
2363        let mut out = vec!["<memory_context>".to_string()];
2364        let mut used = 0usize;
2365        for hit in hits {
2366            let text = hit
2367                .record
2368                .content
2369                .split_whitespace()
2370                .take(60)
2371                .collect::<Vec<_>>()
2372                .join(" ");
2373            let line = format!(
2374                "- [{:.3}] {} (source={}, run={})",
2375                hit.score, text, hit.record.source_type, hit.record.run_id
2376            );
2377            used = used.saturating_add(line.len());
2378            if used > 2200 {
2379                break;
2380            }
2381            out.push(line);
2382        }
2383        out.push("</memory_context>".to_string());
2384        out.join("\n")
2385    }
2386
2387    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
2388        chunk
2389            .metadata
2390            .as_ref()
2391            .and_then(|meta| meta.get("source_url"))
2392            .and_then(Value::as_str)
2393            .map(str::trim)
2394            .filter(|v| !v.is_empty())
2395            .map(ToString::to_string)
2396    }
2397
2398    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
2399        if let Some(path) = chunk
2400            .metadata
2401            .as_ref()
2402            .and_then(|meta| meta.get("relative_path"))
2403            .and_then(Value::as_str)
2404            .map(str::trim)
2405            .filter(|v| !v.is_empty())
2406        {
2407            return path.to_string();
2408        }
2409        chunk
2410            .source
2411            .strip_prefix("guide_docs:")
2412            .unwrap_or(chunk.source.as_str())
2413            .to_string()
2414    }
2415
2416    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
2417        let mut out = vec!["<docs_context>".to_string()];
2418        let mut used = 0usize;
2419        for hit in hits {
2420            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
2421            let path = Self::extract_docs_relative_path(&hit.chunk);
2422            let text = hit
2423                .chunk
2424                .content
2425                .split_whitespace()
2426                .take(70)
2427                .collect::<Vec<_>>()
2428                .join(" ");
2429            let line = format!(
2430                "- [{:.3}] {} (doc_path={}, source_url={})",
2431                hit.similarity, text, path, url
2432            );
2433            used = used.saturating_add(line.len());
2434            if used > 2800 {
2435                break;
2436            }
2437            out.push(line);
2438        }
2439        out.push("</docs_context>".to_string());
2440        out.join("\n")
2441    }
2442
2443    async fn search_embedded_docs(
2444        &self,
2445        query: &str,
2446        limit: usize,
2447    ) -> Vec<tandem_memory::types::MemorySearchResult> {
2448        let Some(manager) = self.open_memory_manager().await else {
2449            return Vec::new();
2450        };
2451        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
2452        manager
2453            .search(
2454                query,
2455                Some(MemoryTier::Global),
2456                None,
2457                None,
2458                Some(search_limit),
2459            )
2460            .await
2461            .unwrap_or_default()
2462            .into_iter()
2463            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
2464            .take(limit)
2465            .collect()
2466    }
2467
2468    fn should_skip_memory_injection(query: &str) -> bool {
2469        let trimmed = query.trim();
2470        if trimmed.is_empty() {
2471            return true;
2472        }
2473        let lower = trimmed.to_ascii_lowercase();
2474        let social = [
2475            "hi",
2476            "hello",
2477            "hey",
2478            "thanks",
2479            "thank you",
2480            "ok",
2481            "okay",
2482            "cool",
2483            "nice",
2484            "yo",
2485            "good morning",
2486            "good afternoon",
2487            "good evening",
2488        ];
2489        lower.len() <= 32 && social.contains(&lower.as_str())
2490    }
2491
2492    fn personality_preset_text(preset: &str) -> &'static str {
2493        match preset {
2494            "concise" => {
2495                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
2496            }
2497            "friendly" => {
2498                "Default style: friendly and supportive while staying technically rigorous and concrete."
2499            }
2500            "mentor" => {
2501                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
2502            }
2503            "critical" => {
2504                "Default style: critical and risk-first. Surface failure modes and assumptions early."
2505            }
2506            _ => {
2507                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
2508            }
2509        }
2510    }
2511
2512    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
2513        let allow_agent_override = agent_name
2514            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
2515            .unwrap_or(false);
2516        let legacy_bot_name = config
2517            .get("bot_name")
2518            .and_then(Value::as_str)
2519            .map(str::trim)
2520            .filter(|v| !v.is_empty());
2521        let bot_name = config
2522            .get("identity")
2523            .and_then(|identity| identity.get("bot"))
2524            .and_then(|bot| bot.get("canonical_name"))
2525            .and_then(Value::as_str)
2526            .map(str::trim)
2527            .filter(|v| !v.is_empty())
2528            .or(legacy_bot_name)
2529            .unwrap_or("Tandem");
2530
2531        let default_profile = config
2532            .get("identity")
2533            .and_then(|identity| identity.get("personality"))
2534            .and_then(|personality| personality.get("default"));
2535        let default_preset = default_profile
2536            .and_then(|profile| profile.get("preset"))
2537            .and_then(Value::as_str)
2538            .map(str::trim)
2539            .filter(|v| !v.is_empty())
2540            .unwrap_or("balanced");
2541        let default_custom = default_profile
2542            .and_then(|profile| profile.get("custom_instructions"))
2543            .and_then(Value::as_str)
2544            .map(str::trim)
2545            .filter(|v| !v.is_empty())
2546            .map(ToString::to_string);
2547        let legacy_persona = config
2548            .get("persona")
2549            .and_then(Value::as_str)
2550            .map(str::trim)
2551            .filter(|v| !v.is_empty())
2552            .map(ToString::to_string);
2553
2554        let per_agent_profile = if allow_agent_override {
2555            agent_name.and_then(|name| {
2556                config
2557                    .get("identity")
2558                    .and_then(|identity| identity.get("personality"))
2559                    .and_then(|personality| personality.get("per_agent"))
2560                    .and_then(|per_agent| per_agent.get(name))
2561            })
2562        } else {
2563            None
2564        };
2565        let preset = per_agent_profile
2566            .and_then(|profile| profile.get("preset"))
2567            .and_then(Value::as_str)
2568            .map(str::trim)
2569            .filter(|v| !v.is_empty())
2570            .unwrap_or(default_preset);
2571        let custom = per_agent_profile
2572            .and_then(|profile| profile.get("custom_instructions"))
2573            .and_then(Value::as_str)
2574            .map(str::trim)
2575            .filter(|v| !v.is_empty())
2576            .map(ToString::to_string)
2577            .or(default_custom)
2578            .or(legacy_persona);
2579
2580        let mut lines = vec![
2581            format!("You are {bot_name}, an AI assistant."),
2582            Self::personality_preset_text(preset).to_string(),
2583        ];
2584        if let Some(custom) = custom {
2585            lines.push(format!("Additional personality instructions: {custom}"));
2586        }
2587        Some(lines.join("\n"))
2588    }
2589}
2590
2591impl PromptContextHook for ServerPromptContextHook {
2592    fn augment_provider_messages(
2593        &self,
2594        ctx: PromptContextHookContext,
2595        mut messages: Vec<ChatMessage>,
2596    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
2597        let this = self.clone();
2598        Box::pin(async move {
2599            // Startup can invoke prompt plumbing before RuntimeState is installed.
2600            // Never panic from context hooks; fail-open and continue without augmentation.
2601            if !this.state.is_ready() {
2602                return Ok(messages);
2603            }
2604            let run = this.state.run_registry.get(&ctx.session_id).await;
2605            let Some(run) = run else {
2606                return Ok(messages);
2607            };
2608            let config = this.state.config.get_effective_value().await;
2609            if let Some(identity_block) =
2610                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
2611            {
2612                messages.push(ChatMessage {
2613                    role: "system".to_string(),
2614                    content: identity_block,
2615                    attachments: Vec::new(),
2616                });
2617            }
2618            let run_id = run.run_id;
2619            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
2620            let query = messages
2621                .iter()
2622                .rev()
2623                .find(|m| m.role == "user")
2624                .map(|m| m.content.clone())
2625                .unwrap_or_default();
2626            if query.trim().is_empty() {
2627                return Ok(messages);
2628            }
2629            if Self::should_skip_memory_injection(&query) {
2630                return Ok(messages);
2631            }
2632
2633            let docs_hits = this.search_embedded_docs(&query, 6).await;
2634            if !docs_hits.is_empty() {
2635                let docs_block = Self::build_docs_memory_block(&docs_hits);
2636                messages.push(ChatMessage {
2637                    role: "system".to_string(),
2638                    content: docs_block.clone(),
2639                    attachments: Vec::new(),
2640                });
2641                this.state.event_bus.publish(EngineEvent::new(
2642                    "memory.docs.context.injected",
2643                    json!({
2644                        "runID": run_id,
2645                        "sessionID": ctx.session_id,
2646                        "messageID": ctx.message_id,
2647                        "iteration": ctx.iteration,
2648                        "count": docs_hits.len(),
2649                        "tokenSizeApprox": docs_block.split_whitespace().count(),
2650                        "sourcePrefix": "guide_docs:"
2651                    }),
2652                ));
2653                return Ok(messages);
2654            }
2655
2656            let Some(db) = this.open_memory_db().await else {
2657                return Ok(messages);
2658            };
2659            let started = now_ms();
2660            let hits = db
2661                .search_global_memory(&user_id, &query, 8, None, None, None)
2662                .await
2663                .unwrap_or_default();
2664            let latency_ms = now_ms().saturating_sub(started);
2665            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
2666            this.state.event_bus.publish(EngineEvent::new(
2667                "memory.search.performed",
2668                json!({
2669                    "runID": run_id,
2670                    "sessionID": ctx.session_id,
2671                    "messageID": ctx.message_id,
2672                    "providerID": ctx.provider_id,
2673                    "modelID": ctx.model_id,
2674                    "iteration": ctx.iteration,
2675                    "queryHash": Self::hash_query(&query),
2676                    "resultCount": hits.len(),
2677                    "scoreMin": scores.iter().copied().reduce(f64::min),
2678                    "scoreMax": scores.iter().copied().reduce(f64::max),
2679                    "scores": scores,
2680                    "latencyMs": latency_ms,
2681                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
2682                }),
2683            ));
2684
2685            if hits.is_empty() {
2686                return Ok(messages);
2687            }
2688
2689            let memory_block = Self::build_memory_block(&hits);
2690            messages.push(ChatMessage {
2691                role: "system".to_string(),
2692                content: memory_block.clone(),
2693                attachments: Vec::new(),
2694            });
2695            this.state.event_bus.publish(EngineEvent::new(
2696                "memory.context.injected",
2697                json!({
2698                    "runID": run_id,
2699                    "sessionID": ctx.session_id,
2700                    "messageID": ctx.message_id,
2701                    "iteration": ctx.iteration,
2702                    "count": hits.len(),
2703                    "tokenSizeApprox": memory_block.split_whitespace().count(),
2704                }),
2705            ));
2706            Ok(messages)
2707        })
2708    }
2709}
2710
2711fn extract_event_session_id(properties: &Value) -> Option<String> {
2712    properties
2713        .get("sessionID")
2714        .or_else(|| properties.get("sessionId"))
2715        .or_else(|| properties.get("id"))
2716        .and_then(|v| v.as_str())
2717        .map(|s| s.to_string())
2718}
2719
2720fn extract_event_run_id(properties: &Value) -> Option<String> {
2721    properties
2722        .get("runID")
2723        .or_else(|| properties.get("run_id"))
2724        .and_then(|v| v.as_str())
2725        .map(|s| s.to_string())
2726}
2727
2728fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
2729    let session_id = extract_event_session_id(&event.properties)?;
2730    let run_id = extract_event_run_id(&event.properties);
2731    let key = format!("run/{session_id}/status");
2732
2733    let mut base = serde_json::Map::new();
2734    base.insert("sessionID".to_string(), Value::String(session_id));
2735    if let Some(run_id) = run_id {
2736        base.insert("runID".to_string(), Value::String(run_id));
2737    }
2738
2739    match event.event_type.as_str() {
2740        "session.run.started" => {
2741            base.insert("state".to_string(), Value::String("running".to_string()));
2742            base.insert("phase".to_string(), Value::String("run".to_string()));
2743            base.insert(
2744                "eventType".to_string(),
2745                Value::String("session.run.started".to_string()),
2746            );
2747            Some(StatusIndexUpdate {
2748                key,
2749                value: Value::Object(base),
2750            })
2751        }
2752        "session.run.finished" => {
2753            base.insert("state".to_string(), Value::String("finished".to_string()));
2754            base.insert("phase".to_string(), Value::String("run".to_string()));
2755            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
2756                base.insert("result".to_string(), Value::String(status.to_string()));
2757            }
2758            base.insert(
2759                "eventType".to_string(),
2760                Value::String("session.run.finished".to_string()),
2761            );
2762            Some(StatusIndexUpdate {
2763                key,
2764                value: Value::Object(base),
2765            })
2766        }
2767        "message.part.updated" => {
2768            let part_type = event
2769                .properties
2770                .get("part")
2771                .and_then(|v| v.get("type"))
2772                .and_then(|v| v.as_str())?;
2773            let (phase, tool_active) = match part_type {
2774                "tool-invocation" => ("tool", true),
2775                "tool-result" => ("run", false),
2776                _ => return None,
2777            };
2778            base.insert("state".to_string(), Value::String("running".to_string()));
2779            base.insert("phase".to_string(), Value::String(phase.to_string()));
2780            base.insert("toolActive".to_string(), Value::Bool(tool_active));
2781            if let Some(tool_name) = event
2782                .properties
2783                .get("part")
2784                .and_then(|v| v.get("tool"))
2785                .and_then(|v| v.as_str())
2786            {
2787                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
2788            }
2789            base.insert(
2790                "eventType".to_string(),
2791                Value::String("message.part.updated".to_string()),
2792            );
2793            Some(StatusIndexUpdate {
2794                key,
2795                value: Value::Object(base),
2796            })
2797        }
2798        _ => None,
2799    }
2800}
2801
2802pub async fn run_status_indexer(state: AppState) {
2803    let mut rx = state.event_bus.subscribe();
2804    loop {
2805        match rx.recv().await {
2806            Ok(event) => {
2807                if let Some(update) = derive_status_index_update(&event) {
2808                    if let Err(error) = state
2809                        .put_shared_resource(
2810                            update.key,
2811                            update.value,
2812                            None,
2813                            "system.status_indexer".to_string(),
2814                            None,
2815                        )
2816                        .await
2817                    {
2818                        tracing::warn!("status indexer failed to persist update: {error:?}");
2819                    }
2820                }
2821            }
2822            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2823            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2824        }
2825    }
2826}
2827
2828pub async fn run_agent_team_supervisor(state: AppState) {
2829    let mut rx = state.event_bus.subscribe();
2830    loop {
2831        match rx.recv().await {
2832            Ok(event) => {
2833                state.agent_teams.handle_engine_event(&state, &event).await;
2834            }
2835            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2836            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2837        }
2838    }
2839}
2840
2841pub async fn run_usage_aggregator(state: AppState) {
2842    let mut rx = state.event_bus.subscribe();
2843    loop {
2844        match rx.recv().await {
2845            Ok(event) => {
2846                if event.event_type != "provider.usage" {
2847                    continue;
2848                }
2849                let session_id = event
2850                    .properties
2851                    .get("sessionID")
2852                    .and_then(|v| v.as_str())
2853                    .unwrap_or("");
2854                if session_id.is_empty() {
2855                    continue;
2856                }
2857                let prompt_tokens = event
2858                    .properties
2859                    .get("promptTokens")
2860                    .and_then(|v| v.as_u64())
2861                    .unwrap_or(0);
2862                let completion_tokens = event
2863                    .properties
2864                    .get("completionTokens")
2865                    .and_then(|v| v.as_u64())
2866                    .unwrap_or(0);
2867                let total_tokens = event
2868                    .properties
2869                    .get("totalTokens")
2870                    .and_then(|v| v.as_u64())
2871                    .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
2872                state
2873                    .apply_provider_usage_to_runs(
2874                        session_id,
2875                        prompt_tokens,
2876                        completion_tokens,
2877                        total_tokens,
2878                    )
2879                    .await;
2880            }
2881            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2882            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2883        }
2884    }
2885}
2886
2887pub async fn run_routine_scheduler(state: AppState) {
2888    loop {
2889        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
2890        let now = now_ms();
2891        let plans = state.evaluate_routine_misfires(now).await;
2892        for plan in plans {
2893            let Some(routine) = state.get_routine(&plan.routine_id).await else {
2894                continue;
2895            };
2896            match evaluate_routine_execution_policy(&routine, "scheduled") {
2897                RoutineExecutionDecision::Allowed => {
2898                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
2899                    let run = state
2900                        .create_routine_run(
2901                            &routine,
2902                            "scheduled",
2903                            plan.run_count,
2904                            RoutineRunStatus::Queued,
2905                            None,
2906                        )
2907                        .await;
2908                    state
2909                        .append_routine_history(RoutineHistoryEvent {
2910                            routine_id: plan.routine_id.clone(),
2911                            trigger_type: "scheduled".to_string(),
2912                            run_count: plan.run_count,
2913                            fired_at_ms: now,
2914                            status: "queued".to_string(),
2915                            detail: None,
2916                        })
2917                        .await;
2918                    state.event_bus.publish(EngineEvent::new(
2919                        "routine.fired",
2920                        serde_json::json!({
2921                            "routineID": plan.routine_id,
2922                            "runID": run.run_id,
2923                            "runCount": plan.run_count,
2924                            "scheduledAtMs": plan.scheduled_at_ms,
2925                            "nextFireAtMs": plan.next_fire_at_ms,
2926                        }),
2927                    ));
2928                    state.event_bus.publish(EngineEvent::new(
2929                        "routine.run.created",
2930                        serde_json::json!({
2931                            "run": run,
2932                        }),
2933                    ));
2934                }
2935                RoutineExecutionDecision::RequiresApproval { reason } => {
2936                    let run = state
2937                        .create_routine_run(
2938                            &routine,
2939                            "scheduled",
2940                            plan.run_count,
2941                            RoutineRunStatus::PendingApproval,
2942                            Some(reason.clone()),
2943                        )
2944                        .await;
2945                    state
2946                        .append_routine_history(RoutineHistoryEvent {
2947                            routine_id: plan.routine_id.clone(),
2948                            trigger_type: "scheduled".to_string(),
2949                            run_count: plan.run_count,
2950                            fired_at_ms: now,
2951                            status: "pending_approval".to_string(),
2952                            detail: Some(reason.clone()),
2953                        })
2954                        .await;
2955                    state.event_bus.publish(EngineEvent::new(
2956                        "routine.approval_required",
2957                        serde_json::json!({
2958                            "routineID": plan.routine_id,
2959                            "runID": run.run_id,
2960                            "runCount": plan.run_count,
2961                            "triggerType": "scheduled",
2962                            "reason": reason,
2963                        }),
2964                    ));
2965                    state.event_bus.publish(EngineEvent::new(
2966                        "routine.run.created",
2967                        serde_json::json!({
2968                            "run": run,
2969                        }),
2970                    ));
2971                }
2972                RoutineExecutionDecision::Blocked { reason } => {
2973                    let run = state
2974                        .create_routine_run(
2975                            &routine,
2976                            "scheduled",
2977                            plan.run_count,
2978                            RoutineRunStatus::BlockedPolicy,
2979                            Some(reason.clone()),
2980                        )
2981                        .await;
2982                    state
2983                        .append_routine_history(RoutineHistoryEvent {
2984                            routine_id: plan.routine_id.clone(),
2985                            trigger_type: "scheduled".to_string(),
2986                            run_count: plan.run_count,
2987                            fired_at_ms: now,
2988                            status: "blocked_policy".to_string(),
2989                            detail: Some(reason.clone()),
2990                        })
2991                        .await;
2992                    state.event_bus.publish(EngineEvent::new(
2993                        "routine.blocked",
2994                        serde_json::json!({
2995                            "routineID": plan.routine_id,
2996                            "runID": run.run_id,
2997                            "runCount": plan.run_count,
2998                            "triggerType": "scheduled",
2999                            "reason": reason,
3000                        }),
3001                    ));
3002                    state.event_bus.publish(EngineEvent::new(
3003                        "routine.run.created",
3004                        serde_json::json!({
3005                            "run": run,
3006                        }),
3007                    ));
3008                }
3009            }
3010        }
3011    }
3012}
3013
3014pub async fn run_routine_executor(state: AppState) {
3015    loop {
3016        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
3017        let Some(run) = state.claim_next_queued_routine_run().await else {
3018            continue;
3019        };
3020
3021        state.event_bus.publish(EngineEvent::new(
3022            "routine.run.started",
3023            serde_json::json!({
3024                "runID": run.run_id,
3025                "routineID": run.routine_id,
3026                "triggerType": run.trigger_type,
3027                "startedAtMs": now_ms(),
3028            }),
3029        ));
3030
3031        let workspace_root = state.workspace_index.snapshot().await.root;
3032        let mut session = Session::new(
3033            Some(format!("Routine {}", run.routine_id)),
3034            Some(workspace_root.clone()),
3035        );
3036        let session_id = session.id.clone();
3037        session.workspace_root = Some(workspace_root);
3038
3039        if let Err(error) = state.storage.save_session(session).await {
3040            let detail = format!("failed to create routine session: {error}");
3041            let _ = state
3042                .update_routine_run_status(
3043                    &run.run_id,
3044                    RoutineRunStatus::Failed,
3045                    Some(detail.clone()),
3046                )
3047                .await;
3048            state.event_bus.publish(EngineEvent::new(
3049                "routine.run.failed",
3050                serde_json::json!({
3051                    "runID": run.run_id,
3052                    "routineID": run.routine_id,
3053                    "reason": detail,
3054                }),
3055            ));
3056            continue;
3057        }
3058
3059        state
3060            .set_routine_session_policy(
3061                session_id.clone(),
3062                run.run_id.clone(),
3063                run.routine_id.clone(),
3064                run.allowed_tools.clone(),
3065            )
3066            .await;
3067        state
3068            .add_active_session_id(&run.run_id, session_id.clone())
3069            .await;
3070        state
3071            .engine_loop
3072            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
3073            .await;
3074
3075        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
3076        if let Some(spec) = selected_model.as_ref() {
3077            state.event_bus.publish(EngineEvent::new(
3078                "routine.run.model_selected",
3079                serde_json::json!({
3080                    "runID": run.run_id,
3081                    "routineID": run.routine_id,
3082                    "providerID": spec.provider_id,
3083                    "modelID": spec.model_id,
3084                    "source": model_source,
3085                }),
3086            ));
3087        }
3088
3089        let request = SendMessageRequest {
3090            parts: vec![MessagePartInput::Text {
3091                text: build_routine_prompt(&state, &run).await,
3092            }],
3093            model: selected_model,
3094            agent: None,
3095            tool_mode: None,
3096            tool_allowlist: None,
3097            context_mode: None,
3098        };
3099
3100        let run_result = state
3101            .engine_loop
3102            .run_prompt_async_with_context(
3103                session_id.clone(),
3104                request,
3105                Some(format!("routine:{}", run.run_id)),
3106            )
3107            .await;
3108
3109        state.clear_routine_session_policy(&session_id).await;
3110        state
3111            .clear_active_session_id(&run.run_id, &session_id)
3112            .await;
3113        state
3114            .engine_loop
3115            .clear_session_allowed_tools(&session_id)
3116            .await;
3117
3118        match run_result {
3119            Ok(()) => {
3120                append_configured_output_artifacts(&state, &run).await;
3121                let _ = state
3122                    .update_routine_run_status(
3123                        &run.run_id,
3124                        RoutineRunStatus::Completed,
3125                        Some("routine run completed".to_string()),
3126                    )
3127                    .await;
3128                state.event_bus.publish(EngineEvent::new(
3129                    "routine.run.completed",
3130                    serde_json::json!({
3131                        "runID": run.run_id,
3132                        "routineID": run.routine_id,
3133                        "sessionID": session_id,
3134                        "finishedAtMs": now_ms(),
3135                    }),
3136                ));
3137            }
3138            Err(error) => {
3139                if let Some(latest) = state.get_routine_run(&run.run_id).await {
3140                    if latest.status == RoutineRunStatus::Paused {
3141                        state.event_bus.publish(EngineEvent::new(
3142                            "routine.run.paused",
3143                            serde_json::json!({
3144                                "runID": run.run_id,
3145                                "routineID": run.routine_id,
3146                                "sessionID": session_id,
3147                                "finishedAtMs": now_ms(),
3148                            }),
3149                        ));
3150                        continue;
3151                    }
3152                }
3153                let detail = truncate_text(&error.to_string(), 500);
3154                let _ = state
3155                    .update_routine_run_status(
3156                        &run.run_id,
3157                        RoutineRunStatus::Failed,
3158                        Some(detail.clone()),
3159                    )
3160                    .await;
3161                state.event_bus.publish(EngineEvent::new(
3162                    "routine.run.failed",
3163                    serde_json::json!({
3164                        "runID": run.run_id,
3165                        "routineID": run.routine_id,
3166                        "sessionID": session_id,
3167                        "reason": detail,
3168                        "finishedAtMs": now_ms(),
3169                    }),
3170                ));
3171            }
3172        }
3173    }
3174}
3175
3176pub async fn run_automation_v2_scheduler(state: AppState) {
3177    loop {
3178        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
3179        let now = now_ms();
3180        let due = state.evaluate_automation_v2_misfires(now).await;
3181        for automation_id in due {
3182            let Some(automation) = state.get_automation_v2(&automation_id).await else {
3183                continue;
3184            };
3185            if let Ok(run) = state
3186                .create_automation_v2_run(&automation, "scheduled")
3187                .await
3188            {
3189                state.event_bus.publish(EngineEvent::new(
3190                    "automation.v2.run.created",
3191                    serde_json::json!({
3192                        "automationID": automation_id,
3193                        "run": run,
3194                        "triggerType": "scheduled",
3195                    }),
3196                ));
3197            }
3198        }
3199    }
3200}
3201
3202async fn execute_automation_v2_node(
3203    state: &AppState,
3204    run_id: &str,
3205    automation: &AutomationV2Spec,
3206    node: &AutomationFlowNode,
3207    agent: &AutomationAgentProfile,
3208) -> anyhow::Result<Value> {
3209    let workspace_root = state.workspace_index.snapshot().await.root;
3210    let mut session = Session::new(
3211        Some(format!(
3212            "Automation {} / {}",
3213            automation.automation_id, node.node_id
3214        )),
3215        Some(workspace_root.clone()),
3216    );
3217    let session_id = session.id.clone();
3218    session.workspace_root = Some(workspace_root);
3219    state.storage.save_session(session).await?;
3220
3221    state.add_automation_v2_session(run_id, &session_id).await;
3222
3223    let mut allowlist = agent.tool_policy.allowlist.clone();
3224    if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
3225        allowlist.extend(mcp_tools.clone());
3226    }
3227    state
3228        .engine_loop
3229        .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
3230        .await;
3231
3232    let model = agent
3233        .model_policy
3234        .as_ref()
3235        .and_then(|policy| policy.get("default_model"))
3236        .and_then(parse_model_spec);
3237    let prompt = format!(
3238        "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}",
3239        automation.automation_id, run_id, node.node_id, agent.display_name, node.objective
3240    );
3241    let req = SendMessageRequest {
3242        parts: vec![MessagePartInput::Text { text: prompt }],
3243        model,
3244        agent: None,
3245        tool_mode: None,
3246        tool_allowlist: None,
3247        context_mode: None,
3248    };
3249    let result = state
3250        .engine_loop
3251        .run_prompt_async_with_context(
3252            session_id.clone(),
3253            req,
3254            Some(format!("automation-v2:{run_id}")),
3255        )
3256        .await;
3257
3258    state
3259        .engine_loop
3260        .clear_session_allowed_tools(&session_id)
3261        .await;
3262    state.clear_automation_v2_session(run_id, &session_id).await;
3263
3264    result.map(|_| {
3265        serde_json::json!({
3266            "sessionID": session_id,
3267            "status": "completed",
3268        })
3269    })
3270}
3271
3272pub async fn run_automation_v2_executor(state: AppState) {
3273    loop {
3274        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3275        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
3276            continue;
3277        };
3278        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
3279            let _ = state
3280                .update_automation_v2_run(&run.run_id, |row| {
3281                    row.status = AutomationRunStatus::Failed;
3282                    row.detail = Some("automation not found".to_string());
3283                })
3284                .await;
3285            continue;
3286        };
3287        let max_parallel = automation
3288            .execution
3289            .max_parallel_agents
3290            .unwrap_or(1)
3291            .clamp(1, 16) as usize;
3292
3293        loop {
3294            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
3295                break;
3296            };
3297            if matches!(
3298                latest.status,
3299                AutomationRunStatus::Paused
3300                    | AutomationRunStatus::Pausing
3301                    | AutomationRunStatus::Cancelled
3302                    | AutomationRunStatus::Failed
3303                    | AutomationRunStatus::Completed
3304            ) {
3305                break;
3306            }
3307            if latest.checkpoint.pending_nodes.is_empty() {
3308                let _ = state
3309                    .update_automation_v2_run(&run.run_id, |row| {
3310                        row.status = AutomationRunStatus::Completed;
3311                        row.detail = Some("automation run completed".to_string());
3312                    })
3313                    .await;
3314                break;
3315            }
3316
3317            let completed = latest
3318                .checkpoint
3319                .completed_nodes
3320                .iter()
3321                .cloned()
3322                .collect::<std::collections::HashSet<_>>();
3323            let pending = latest.checkpoint.pending_nodes.clone();
3324            let runnable = pending
3325                .iter()
3326                .filter_map(|node_id| {
3327                    let node = automation
3328                        .flow
3329                        .nodes
3330                        .iter()
3331                        .find(|n| n.node_id == *node_id)?;
3332                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
3333                        Some(node.clone())
3334                    } else {
3335                        None
3336                    }
3337                })
3338                .take(max_parallel)
3339                .collect::<Vec<_>>();
3340
3341            if runnable.is_empty() {
3342                let _ = state
3343                    .update_automation_v2_run(&run.run_id, |row| {
3344                        row.status = AutomationRunStatus::Failed;
3345                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
3346                    })
3347                    .await;
3348                break;
3349            }
3350
3351            let tasks = runnable
3352                .iter()
3353                .map(|node| {
3354                    let Some(agent) = automation
3355                        .agents
3356                        .iter()
3357                        .find(|a| a.agent_id == node.agent_id)
3358                        .cloned()
3359                    else {
3360                        return futures::future::ready((
3361                            node.node_id.clone(),
3362                            Err(anyhow::anyhow!("agent not found")),
3363                        ))
3364                        .boxed();
3365                    };
3366                    let state = state.clone();
3367                    let run_id = run.run_id.clone();
3368                    let automation = automation.clone();
3369                    let node = node.clone();
3370                    async move {
3371                        let result =
3372                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
3373                                .await;
3374                        (node.node_id, result)
3375                    }
3376                    .boxed()
3377                })
3378                .collect::<Vec<_>>();
3379            let outcomes = join_all(tasks).await;
3380
3381            let mut any_failed = false;
3382            for (node_id, result) in outcomes {
3383                match result {
3384                    Ok(output) => {
3385                        let _ = state
3386                            .update_automation_v2_run(&run.run_id, |row| {
3387                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
3388                                if !row
3389                                    .checkpoint
3390                                    .completed_nodes
3391                                    .iter()
3392                                    .any(|id| id == &node_id)
3393                                {
3394                                    row.checkpoint.completed_nodes.push(node_id.clone());
3395                                }
3396                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
3397                            })
3398                            .await;
3399                    }
3400                    Err(error) => {
3401                        any_failed = true;
3402                        let is_paused = state
3403                            .get_automation_v2_run(&run.run_id)
3404                            .await
3405                            .map(|row| row.status == AutomationRunStatus::Paused)
3406                            .unwrap_or(false);
3407                        if is_paused {
3408                            break;
3409                        }
3410                        let detail = truncate_text(&error.to_string(), 500);
3411                        let _ = state
3412                            .update_automation_v2_run(&run.run_id, |row| {
3413                                row.status = AutomationRunStatus::Failed;
3414                                row.detail = Some(detail.clone());
3415                            })
3416                            .await;
3417                    }
3418                }
3419            }
3420            if any_failed {
3421                break;
3422            }
3423        }
3424    }
3425}
3426
3427async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
3428    let normalized_entrypoint = run.entrypoint.trim();
3429    let known_tool = state
3430        .tools
3431        .list()
3432        .await
3433        .into_iter()
3434        .any(|schema| schema.name == normalized_entrypoint);
3435    if known_tool {
3436        let args = if run.args.is_object() {
3437            run.args.clone()
3438        } else {
3439            serde_json::json!({})
3440        };
3441        return format!("/tool {} {}", normalized_entrypoint, args);
3442    }
3443
3444    if let Some(objective) = routine_objective_from_args(run) {
3445        return build_routine_mission_prompt(run, &objective);
3446    }
3447
3448    format!(
3449        "Execute routine '{}' using entrypoint '{}' with args: {}",
3450        run.routine_id, run.entrypoint, run.args
3451    )
3452}
3453
3454fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
3455    run.args
3456        .get("prompt")
3457        .and_then(|v| v.as_str())
3458        .map(str::trim)
3459        .filter(|v| !v.is_empty())
3460        .map(ToString::to_string)
3461}
3462
3463fn routine_mode_from_args(args: &Value) -> &str {
3464    args.get("mode")
3465        .and_then(|v| v.as_str())
3466        .map(str::trim)
3467        .filter(|v| !v.is_empty())
3468        .unwrap_or("standalone")
3469}
3470
3471fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
3472    args.get("success_criteria")
3473        .and_then(|v| v.as_array())
3474        .map(|rows| {
3475            rows.iter()
3476                .filter_map(|row| row.as_str())
3477                .map(str::trim)
3478                .filter(|row| !row.is_empty())
3479                .map(ToString::to_string)
3480                .collect::<Vec<_>>()
3481        })
3482        .unwrap_or_default()
3483}
3484
3485fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
3486    let mode = routine_mode_from_args(&run.args);
3487    let success_criteria = routine_success_criteria_from_args(&run.args);
3488    let orchestrator_only_tool_calls = run
3489        .args
3490        .get("orchestrator_only_tool_calls")
3491        .and_then(|v| v.as_bool())
3492        .unwrap_or(false);
3493
3494    let mut lines = vec![
3495        format!("Automation ID: {}", run.routine_id),
3496        format!("Run ID: {}", run.run_id),
3497        format!("Mode: {}", mode),
3498        format!("Mission Objective: {}", objective),
3499    ];
3500
3501    if !success_criteria.is_empty() {
3502        lines.push("Success Criteria:".to_string());
3503        for criterion in success_criteria {
3504            lines.push(format!("- {}", criterion));
3505        }
3506    }
3507
3508    if run.allowed_tools.is_empty() {
3509        lines.push("Allowed Tools: all available by current policy".to_string());
3510    } else {
3511        lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
3512    }
3513
3514    if run.output_targets.is_empty() {
3515        lines.push("Output Targets: none configured".to_string());
3516    } else {
3517        lines.push("Output Targets:".to_string());
3518        for target in &run.output_targets {
3519            lines.push(format!("- {}", target));
3520        }
3521    }
3522
3523    if mode.eq_ignore_ascii_case("orchestrated") {
3524        lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
3525        lines
3526            .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
3527        if orchestrator_only_tool_calls {
3528            lines.push(
3529                "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
3530                    .to_string(),
3531            );
3532        }
3533    } else {
3534        lines.push("Execution Pattern: Standalone mission run".to_string());
3535    }
3536
3537    lines.push(
3538        "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
3539            .to_string(),
3540    );
3541
3542    lines.join("\n")
3543}
3544
/// Limits `input` to at most `max_len` bytes of original text.
///
/// Returns `input` unchanged when it already fits. Otherwise returns the
/// longest prefix that is no longer than `max_len` bytes and ends on a UTF-8
/// character boundary, followed by the marker `...<truncated>`.
///
/// Cutting at a raw byte offset would panic whenever `max_len` lands inside a
/// multi-byte character, so the cut point is moved back to the previous
/// boundary instead.
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // Here `max_len < input.len()`, so `cut` is always a valid index. Offset 0
    // is a character boundary, which guarantees the loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }

    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
3553
3554async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
3555    if run.output_targets.is_empty() {
3556        return;
3557    }
3558    for target in &run.output_targets {
3559        let artifact = RoutineRunArtifact {
3560            artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
3561            uri: target.clone(),
3562            kind: "output_target".to_string(),
3563            label: Some("configured output target".to_string()),
3564            created_at_ms: now_ms(),
3565            metadata: Some(serde_json::json!({
3566                "source": "routine.output_targets",
3567                "runID": run.run_id,
3568                "routineID": run.routine_id,
3569            })),
3570        };
3571        let _ = state
3572            .append_routine_run_artifact(&run.run_id, artifact.clone())
3573            .await;
3574        state.event_bus.publish(EngineEvent::new(
3575            "routine.run.artifact_added",
3576            serde_json::json!({
3577                "runID": run.run_id,
3578                "routineID": run.routine_id,
3579                "artifact": artifact,
3580            }),
3581        ));
3582    }
3583}
3584
3585fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
3586    let obj = value.as_object()?;
3587    let provider_id = obj.get("provider_id")?.as_str()?.trim();
3588    let model_id = obj.get("model_id")?.as_str()?.trim();
3589    if provider_id.is_empty() || model_id.is_empty() {
3590        return None;
3591    }
3592    Some(ModelSpec {
3593        provider_id: provider_id.to_string(),
3594        model_id: model_id.to_string(),
3595    })
3596}
3597
3598fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
3599    args.get("model_policy")
3600        .and_then(|v| v.get("role_models"))
3601        .and_then(|v| v.get(role))
3602        .and_then(parse_model_spec)
3603}
3604
3605fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
3606    args.get("model_policy")
3607        .and_then(|v| v.get("default_model"))
3608        .and_then(parse_model_spec)
3609}
3610
3611fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
3612    let provider_id = config
3613        .get("default_provider")
3614        .and_then(|v| v.as_str())
3615        .map(str::trim)
3616        .filter(|v| !v.is_empty())?;
3617    let model_id = config
3618        .get("providers")
3619        .and_then(|v| v.get(provider_id))
3620        .and_then(|v| v.get("default_model"))
3621        .and_then(|v| v.as_str())
3622        .map(str::trim)
3623        .filter(|v| !v.is_empty())?;
3624    Some(ModelSpec {
3625        provider_id: provider_id.to_string(),
3626        model_id: model_id.to_string(),
3627    })
3628}
3629
3630fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
3631    providers.iter().any(|provider| {
3632        provider.id == spec.provider_id
3633            && provider
3634                .models
3635                .iter()
3636                .any(|model| model.id == spec.model_id)
3637    })
3638}
3639
3640async fn resolve_routine_model_spec_for_run(
3641    state: &AppState,
3642    run: &RoutineRunRecord,
3643) -> (Option<ModelSpec>, String) {
3644    let providers = state.providers.list().await;
3645    let mode = routine_mode_from_args(&run.args);
3646    let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
3647
3648    if mode.eq_ignore_ascii_case("orchestrated") {
3649        if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
3650            requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
3651        }
3652    }
3653    if let Some(default_model) = default_model_spec_from_args(&run.args) {
3654        requested.push((default_model, "args.model_policy.default_model"));
3655    }
3656    let effective_config = state.config.get_effective_value().await;
3657    if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
3658        requested.push((config_default, "config.default_provider"));
3659    }
3660
3661    for (candidate, source) in requested {
3662        if provider_catalog_has_model(&providers, &candidate) {
3663            return (Some(candidate), source.to_string());
3664        }
3665    }
3666
3667    let fallback = providers
3668        .into_iter()
3669        .find(|provider| !provider.models.is_empty())
3670        .and_then(|provider| {
3671            let model = provider.models.first()?;
3672            Some(ModelSpec {
3673                provider_id: provider.id,
3674                model_id: model.id.clone(),
3675            })
3676        });
3677
3678    (fallback, "provider_catalog_fallback".to_string())
3679}
3680
3681#[cfg(test)]
3682mod tests {
3683    use super::*;
3684
3685    fn test_state_with_path(path: PathBuf) -> AppState {
3686        let mut state = AppState::new_starting("test-attempt".to_string(), true);
3687        state.shared_resources_path = path;
3688        state.routines_path = tmp_routines_file("shared-state");
3689        state.routine_history_path = tmp_routines_file("routine-history");
3690        state.routine_runs_path = tmp_routines_file("routine-runs");
3691        state
3692    }
3693
3694    fn tmp_resource_file(name: &str) -> PathBuf {
3695        std::env::temp_dir().join(format!(
3696            "tandem-server-{name}-{}.json",
3697            uuid::Uuid::new_v4()
3698        ))
3699    }
3700
3701    fn tmp_routines_file(name: &str) -> PathBuf {
3702        std::env::temp_dir().join(format!(
3703            "tandem-server-routines-{name}-{}.json",
3704            uuid::Uuid::new_v4()
3705        ))
3706    }
3707
3708    #[test]
3709    fn default_model_spec_from_effective_config_reads_default_route() {
3710        let cfg = serde_json::json!({
3711            "default_provider": "openrouter",
3712            "providers": {
3713                "openrouter": {
3714                    "default_model": "google/gemini-3-flash-preview"
3715                }
3716            }
3717        });
3718        let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
3719        assert_eq!(spec.provider_id, "openrouter");
3720        assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
3721    }
3722
3723    #[test]
3724    fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
3725        let missing_provider = serde_json::json!({
3726            "providers": {
3727                "openrouter": {
3728                    "default_model": "google/gemini-3-flash-preview"
3729                }
3730            }
3731        });
3732        assert!(default_model_spec_from_effective_config(&missing_provider).is_none());
3733
3734        let missing_model = serde_json::json!({
3735            "default_provider": "openrouter",
3736            "providers": {
3737                "openrouter": {}
3738            }
3739        });
3740        assert!(default_model_spec_from_effective_config(&missing_model).is_none());
3741    }
3742
3743    #[tokio::test]
3744    async fn shared_resource_put_increments_revision() {
3745        let path = tmp_resource_file("shared-resource-put");
3746        let state = test_state_with_path(path.clone());
3747
3748        let first = state
3749            .put_shared_resource(
3750                "project/demo/board".to_string(),
3751                serde_json::json!({"status":"todo"}),
3752                None,
3753                "agent-1".to_string(),
3754                None,
3755            )
3756            .await
3757            .expect("first put");
3758        assert_eq!(first.rev, 1);
3759
3760        let second = state
3761            .put_shared_resource(
3762                "project/demo/board".to_string(),
3763                serde_json::json!({"status":"doing"}),
3764                Some(1),
3765                "agent-2".to_string(),
3766                Some(60_000),
3767            )
3768            .await
3769            .expect("second put");
3770        assert_eq!(second.rev, 2);
3771        assert_eq!(second.updated_by, "agent-2");
3772        assert_eq!(second.ttl_ms, Some(60_000));
3773
3774        let raw = tokio::fs::read_to_string(path.clone())
3775            .await
3776            .expect("persisted");
3777        assert!(raw.contains("\"rev\": 2"));
3778        let _ = tokio::fs::remove_file(path).await;
3779    }
3780
3781    #[tokio::test]
3782    async fn shared_resource_put_detects_revision_conflict() {
3783        let path = tmp_resource_file("shared-resource-conflict");
3784        let state = test_state_with_path(path.clone());
3785
3786        let _ = state
3787            .put_shared_resource(
3788                "mission/demo/card-1".to_string(),
3789                serde_json::json!({"title":"Card 1"}),
3790                None,
3791                "agent-1".to_string(),
3792                None,
3793            )
3794            .await
3795            .expect("seed put");
3796
3797        let conflict = state
3798            .put_shared_resource(
3799                "mission/demo/card-1".to_string(),
3800                serde_json::json!({"title":"Card 1 edited"}),
3801                Some(99),
3802                "agent-2".to_string(),
3803                None,
3804            )
3805            .await
3806            .expect_err("expected conflict");
3807
3808        match conflict {
3809            ResourceStoreError::RevisionConflict(conflict) => {
3810                assert_eq!(conflict.expected_rev, Some(99));
3811                assert_eq!(conflict.current_rev, Some(1));
3812            }
3813            other => panic!("unexpected error: {other:?}"),
3814        }
3815
3816        let _ = tokio::fs::remove_file(path).await;
3817    }
3818
3819    #[tokio::test]
3820    async fn shared_resource_rejects_invalid_namespace_key() {
3821        let path = tmp_resource_file("shared-resource-invalid-key");
3822        let state = test_state_with_path(path.clone());
3823
3824        let error = state
3825            .put_shared_resource(
3826                "global/demo/key".to_string(),
3827                serde_json::json!({"x":1}),
3828                None,
3829                "agent-1".to_string(),
3830                None,
3831            )
3832            .await
3833            .expect_err("invalid key should fail");
3834
3835        match error {
3836            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
3837            other => panic!("unexpected error: {other:?}"),
3838        }
3839
3840        assert!(!path.exists());
3841    }
3842
3843    #[test]
3844    fn derive_status_index_update_for_run_started() {
3845        let event = EngineEvent::new(
3846            "session.run.started",
3847            serde_json::json!({
3848                "sessionID": "s-1",
3849                "runID": "r-1"
3850            }),
3851        );
3852        let update = derive_status_index_update(&event).expect("update");
3853        assert_eq!(update.key, "run/s-1/status");
3854        assert_eq!(
3855            update.value.get("state").and_then(|v| v.as_str()),
3856            Some("running")
3857        );
3858        assert_eq!(
3859            update.value.get("phase").and_then(|v| v.as_str()),
3860            Some("run")
3861        );
3862    }
3863
3864    #[test]
3865    fn derive_status_index_update_for_tool_invocation() {
3866        let event = EngineEvent::new(
3867            "message.part.updated",
3868            serde_json::json!({
3869                "sessionID": "s-2",
3870                "runID": "r-2",
3871                "part": { "type": "tool-invocation", "tool": "todo_write" }
3872            }),
3873        );
3874        let update = derive_status_index_update(&event).expect("update");
3875        assert_eq!(update.key, "run/s-2/status");
3876        assert_eq!(
3877            update.value.get("phase").and_then(|v| v.as_str()),
3878            Some("tool")
3879        );
3880        assert_eq!(
3881            update.value.get("toolActive").and_then(|v| v.as_bool()),
3882            Some(true)
3883        );
3884        assert_eq!(
3885            update.value.get("tool").and_then(|v| v.as_str()),
3886            Some("todo_write")
3887        );
3888    }
3889
3890    #[test]
3891    fn misfire_skip_drops_runs_and_advances_next_fire() {
3892        let (count, next_fire) =
3893            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
3894        assert_eq!(count, 0);
3895        assert_eq!(next_fire, 11_000);
3896    }
3897
3898    #[test]
3899    fn misfire_run_once_emits_single_trigger() {
3900        let (count, next_fire) =
3901            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
3902        assert_eq!(count, 1);
3903        assert_eq!(next_fire, 11_000);
3904    }
3905
3906    #[test]
3907    fn misfire_catch_up_caps_trigger_count() {
3908        let (count, next_fire) = compute_misfire_plan(
3909            25_000,
3910            5_000,
3911            1_000,
3912            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
3913        );
3914        assert_eq!(count, 3);
3915        assert_eq!(next_fire, 26_000);
3916    }
3917
3918    #[tokio::test]
3919    async fn routine_put_persists_and_loads() {
3920        let routines_path = tmp_routines_file("persist-load");
3921        let mut state = AppState::new_starting("routines-put".to_string(), true);
3922        state.routines_path = routines_path.clone();
3923
3924        let routine = RoutineSpec {
3925            routine_id: "routine-1".to_string(),
3926            name: "Digest".to_string(),
3927            status: RoutineStatus::Active,
3928            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
3929            timezone: "UTC".to_string(),
3930            misfire_policy: RoutineMisfirePolicy::RunOnce,
3931            entrypoint: "mission.default".to_string(),
3932            args: serde_json::json!({"topic":"status"}),
3933            allowed_tools: vec![],
3934            output_targets: vec![],
3935            creator_type: "user".to_string(),
3936            creator_id: "user-1".to_string(),
3937            requires_approval: true,
3938            external_integrations_allowed: false,
3939            next_fire_at_ms: Some(5_000),
3940            last_fired_at_ms: None,
3941        };
3942
3943        state.put_routine(routine).await.expect("store routine");
3944
3945        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
3946        reloaded.routines_path = routines_path.clone();
3947        reloaded.load_routines().await.expect("load routines");
3948        let list = reloaded.list_routines().await;
3949        assert_eq!(list.len(), 1);
3950        assert_eq!(list[0].routine_id, "routine-1");
3951
3952        let _ = tokio::fs::remove_file(routines_path).await;
3953    }
3954
3955    #[tokio::test]
3956    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
3957        let routines_path = tmp_routines_file("misfire-eval");
3958        let mut state = AppState::new_starting("routines-eval".to_string(), true);
3959        state.routines_path = routines_path.clone();
3960
3961        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
3962            routine_id: id.to_string(),
3963            name: id.to_string(),
3964            status: RoutineStatus::Active,
3965            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
3966            timezone: "UTC".to_string(),
3967            misfire_policy: policy,
3968            entrypoint: "mission.default".to_string(),
3969            args: serde_json::json!({}),
3970            allowed_tools: vec![],
3971            output_targets: vec![],
3972            creator_type: "user".to_string(),
3973            creator_id: "u-1".to_string(),
3974            requires_approval: false,
3975            external_integrations_allowed: false,
3976            next_fire_at_ms: Some(5_000),
3977            last_fired_at_ms: None,
3978        };
3979
3980        state
3981            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
3982            .await
3983            .expect("put skip");
3984        state
3985            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
3986            .await
3987            .expect("put once");
3988        state
3989            .put_routine(base(
3990                "routine-catch",
3991                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
3992            ))
3993            .await
3994            .expect("put catch");
3995
3996        let plans = state.evaluate_routine_misfires(10_500).await;
3997        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
3998        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
3999        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
4000
4001        assert!(plan_skip.is_none());
4002        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
4003        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
4004
4005        let stored = state.list_routines().await;
4006        let skip_next = stored
4007            .iter()
4008            .find(|r| r.routine_id == "routine-skip")
4009            .and_then(|r| r.next_fire_at_ms)
4010            .expect("skip next");
4011        assert!(skip_next > 10_500);
4012
4013        let _ = tokio::fs::remove_file(routines_path).await;
4014    }
4015
4016    #[test]
4017    fn routine_policy_blocks_external_side_effects_by_default() {
4018        let routine = RoutineSpec {
4019            routine_id: "routine-policy-1".to_string(),
4020            name: "Connector routine".to_string(),
4021            status: RoutineStatus::Active,
4022            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
4023            timezone: "UTC".to_string(),
4024            misfire_policy: RoutineMisfirePolicy::RunOnce,
4025            entrypoint: "connector.email.reply".to_string(),
4026            args: serde_json::json!({}),
4027            allowed_tools: vec![],
4028            output_targets: vec![],
4029            creator_type: "user".to_string(),
4030            creator_id: "u-1".to_string(),
4031            requires_approval: true,
4032            external_integrations_allowed: false,
4033            next_fire_at_ms: None,
4034            last_fired_at_ms: None,
4035        };
4036
4037        let decision = evaluate_routine_execution_policy(&routine, "manual");
4038        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
4039    }
4040
4041    #[test]
4042    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
4043        let routine = RoutineSpec {
4044            routine_id: "routine-policy-2".to_string(),
4045            name: "Connector routine".to_string(),
4046            status: RoutineStatus::Active,
4047            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
4048            timezone: "UTC".to_string(),
4049            misfire_policy: RoutineMisfirePolicy::RunOnce,
4050            entrypoint: "connector.email.reply".to_string(),
4051            args: serde_json::json!({}),
4052            allowed_tools: vec![],
4053            output_targets: vec![],
4054            creator_type: "user".to_string(),
4055            creator_id: "u-1".to_string(),
4056            requires_approval: true,
4057            external_integrations_allowed: true,
4058            next_fire_at_ms: None,
4059            last_fired_at_ms: None,
4060        };
4061
4062        let decision = evaluate_routine_execution_policy(&routine, "manual");
4063        assert!(matches!(
4064            decision,
4065            RoutineExecutionDecision::RequiresApproval { .. }
4066        ));
4067    }
4068
4069    #[test]
4070    fn routine_policy_allows_non_external_entrypoints() {
4071        let routine = RoutineSpec {
4072            routine_id: "routine-policy-3".to_string(),
4073            name: "Internal mission routine".to_string(),
4074            status: RoutineStatus::Active,
4075            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
4076            timezone: "UTC".to_string(),
4077            misfire_policy: RoutineMisfirePolicy::RunOnce,
4078            entrypoint: "mission.default".to_string(),
4079            args: serde_json::json!({}),
4080            allowed_tools: vec![],
4081            output_targets: vec![],
4082            creator_type: "user".to_string(),
4083            creator_id: "u-1".to_string(),
4084            requires_approval: true,
4085            external_integrations_allowed: false,
4086            next_fire_at_ms: None,
4087            last_fired_at_ms: None,
4088        };
4089
4090        let decision = evaluate_routine_execution_policy(&routine, "manual");
4091        assert_eq!(decision, RoutineExecutionDecision::Allowed);
4092    }
4093
4094    #[tokio::test]
4095    async fn claim_next_queued_routine_run_marks_oldest_running() {
4096        let mut state = AppState::new_starting("routine-claim".to_string(), true);
4097        state.routine_runs_path = tmp_routines_file("routine-claim-runs");
4098
4099        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
4100            run_id: run_id.to_string(),
4101            routine_id: "routine-claim".to_string(),
4102            trigger_type: "manual".to_string(),
4103            run_count: 1,
4104            status: RoutineRunStatus::Queued,
4105            created_at_ms,
4106            updated_at_ms: created_at_ms,
4107            fired_at_ms: Some(created_at_ms),
4108            started_at_ms: None,
4109            finished_at_ms: None,
4110            requires_approval: false,
4111            approval_reason: None,
4112            denial_reason: None,
4113            paused_reason: None,
4114            detail: None,
4115            entrypoint: "mission.default".to_string(),
4116            args: serde_json::json!({}),
4117            allowed_tools: vec![],
4118            output_targets: vec![],
4119            artifacts: vec![],
4120            active_session_ids: vec![],
4121            prompt_tokens: 0,
4122            completion_tokens: 0,
4123            total_tokens: 0,
4124            estimated_cost_usd: 0.0,
4125        };
4126
4127        {
4128            let mut guard = state.routine_runs.write().await;
4129            guard.insert("run-late".to_string(), mk("run-late", 2_000));
4130            guard.insert("run-early".to_string(), mk("run-early", 1_000));
4131        }
4132        state.persist_routine_runs().await.expect("persist");
4133
4134        let claimed = state
4135            .claim_next_queued_routine_run()
4136            .await
4137            .expect("claimed run");
4138        assert_eq!(claimed.run_id, "run-early");
4139        assert_eq!(claimed.status, RoutineRunStatus::Running);
4140        assert!(claimed.started_at_ms.is_some());
4141    }
4142
4143    #[tokio::test]
4144    async fn routine_session_policy_roundtrip_normalizes_tools() {
4145        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
4146        state
4147            .set_routine_session_policy(
4148                "session-routine-1".to_string(),
4149                "run-1".to_string(),
4150                "routine-1".to_string(),
4151                vec![
4152                    "read".to_string(),
4153                    " mcp.arcade.search ".to_string(),
4154                    "read".to_string(),
4155                    "".to_string(),
4156                ],
4157            )
4158            .await;
4159
4160        let policy = state
4161            .routine_session_policy("session-routine-1")
4162            .await
4163            .expect("policy");
4164        assert_eq!(
4165            policy.allowed_tools,
4166            vec!["read".to_string(), "mcp.arcade.search".to_string()]
4167        );
4168    }
4169
4170    #[test]
4171    fn routine_mission_prompt_includes_orchestrated_contract() {
4172        let run = RoutineRunRecord {
4173            run_id: "run-orchestrated-1".to_string(),
4174            routine_id: "automation-orchestrated".to_string(),
4175            trigger_type: "manual".to_string(),
4176            run_count: 1,
4177            status: RoutineRunStatus::Queued,
4178            created_at_ms: 1_000,
4179            updated_at_ms: 1_000,
4180            fired_at_ms: Some(1_000),
4181            started_at_ms: None,
4182            finished_at_ms: None,
4183            requires_approval: true,
4184            approval_reason: None,
4185            denial_reason: None,
4186            paused_reason: None,
4187            detail: None,
4188            entrypoint: "mission.default".to_string(),
4189            args: serde_json::json!({
4190                "prompt": "Coordinate a multi-step release readiness check.",
4191                "mode": "orchestrated",
4192                "success_criteria": ["All blockers listed", "Output artifact written"],
4193                "orchestrator_only_tool_calls": true
4194            }),
4195            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
4196            output_targets: vec!["file://reports/release-readiness.md".to_string()],
4197            artifacts: vec![],
4198            active_session_ids: vec![],
4199            prompt_tokens: 0,
4200            completion_tokens: 0,
4201            total_tokens: 0,
4202            estimated_cost_usd: 0.0,
4203        };
4204
4205        let objective = routine_objective_from_args(&run).expect("objective");
4206        let prompt = build_routine_mission_prompt(&run, &objective);
4207
4208        assert!(prompt.contains("Mode: orchestrated"));
4209        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
4210        assert!(prompt.contains("only the orchestrator may execute tools"));
4211        assert!(prompt.contains("Allowed Tools: read, webfetch"));
4212        assert!(prompt.contains("file://reports/release-readiness.md"));
4213    }
4214
4215    #[test]
4216    fn routine_mission_prompt_includes_standalone_defaults() {
4217        let run = RoutineRunRecord {
4218            run_id: "run-standalone-1".to_string(),
4219            routine_id: "automation-standalone".to_string(),
4220            trigger_type: "manual".to_string(),
4221            run_count: 1,
4222            status: RoutineRunStatus::Queued,
4223            created_at_ms: 2_000,
4224            updated_at_ms: 2_000,
4225            fired_at_ms: Some(2_000),
4226            started_at_ms: None,
4227            finished_at_ms: None,
4228            requires_approval: false,
4229            approval_reason: None,
4230            denial_reason: None,
4231            paused_reason: None,
4232            detail: None,
4233            entrypoint: "mission.default".to_string(),
4234            args: serde_json::json!({
4235                "prompt": "Summarize top engineering updates.",
4236                "success_criteria": ["Three bullet summary"]
4237            }),
4238            allowed_tools: vec![],
4239            output_targets: vec![],
4240            artifacts: vec![],
4241            active_session_ids: vec![],
4242            prompt_tokens: 0,
4243            completion_tokens: 0,
4244            total_tokens: 0,
4245            estimated_cost_usd: 0.0,
4246        };
4247
4248        let objective = routine_objective_from_args(&run).expect("objective");
4249        let prompt = build_routine_mission_prompt(&run, &objective);
4250
4251        assert!(prompt.contains("Mode: standalone"));
4252        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
4253        assert!(prompt.contains("Allowed Tools: all available by current policy"));
4254        assert!(prompt.contains("Output Targets: none configured"));
4255    }
4256
4257    #[test]
4258    fn shared_resource_key_validator_accepts_swarm_active_tasks() {
4259        assert!(is_valid_resource_key("swarm.active_tasks"));
4260        assert!(is_valid_resource_key("project/demo"));
4261        assert!(!is_valid_resource_key("swarm//active_tasks"));
4262        assert!(!is_valid_resource_key("misc/demo"));
4263    }
4264}