Skip to main content

tandem_server/
lib.rs

1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22    EngineEvent, HostOs, HostRuntimeContext, MessagePartInput, ModelSpec, PathStyle,
23    SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30    resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31    PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38
39mod agent_teams;
40mod capability_resolver;
41mod http;
42mod mcp_catalog;
43mod pack_manager;
44mod preset_composer;
45mod preset_registry;
46mod preset_summary;
47pub mod webui;
48
49pub use agent_teams::AgentTeamRuntime;
50pub use capability_resolver::CapabilityResolver;
51pub use http::serve;
52pub use pack_manager::PackManager;
53pub use preset_composer::PromptComposeInput;
54pub use preset_registry::PresetRegistry;
55
56#[derive(Debug, Clone, Serialize, Deserialize, Default)]
57pub struct ChannelStatus {
58    pub enabled: bool,
59    pub connected: bool,
60    pub last_error: Option<String>,
61    pub active_sessions: u64,
62    pub meta: Value,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize, Default)]
66pub struct WebUiConfig {
67    #[serde(default)]
68    pub enabled: bool,
69    #[serde(default = "default_web_ui_prefix")]
70    pub path_prefix: String,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize, Default)]
74pub struct ChannelsConfigFile {
75    pub telegram: Option<TelegramConfigFile>,
76    pub discord: Option<DiscordConfigFile>,
77    pub slack: Option<SlackConfigFile>,
78    #[serde(default)]
79    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct TelegramConfigFile {
84    pub bot_token: String,
85    #[serde(default = "default_allow_all")]
86    pub allowed_users: Vec<String>,
87    #[serde(default)]
88    pub mention_only: bool,
89    #[serde(default)]
90    pub style_profile: tandem_channels::config::TelegramStyleProfile,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct DiscordConfigFile {
95    pub bot_token: String,
96    #[serde(default)]
97    pub guild_id: Option<String>,
98    #[serde(default = "default_allow_all")]
99    pub allowed_users: Vec<String>,
100    #[serde(default = "default_discord_mention_only")]
101    pub mention_only: bool,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct SlackConfigFile {
106    pub bot_token: String,
107    pub channel_id: String,
108    #[serde(default = "default_allow_all")]
109    pub allowed_users: Vec<String>,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize, Default)]
113struct EffectiveAppConfig {
114    #[serde(default)]
115    pub channels: ChannelsConfigFile,
116    #[serde(default)]
117    pub web_ui: WebUiConfig,
118    #[serde(default)]
119    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
120}
121
122#[derive(Default)]
123pub struct ChannelRuntime {
124    pub listeners: Option<tokio::task::JoinSet<()>>,
125    pub statuses: std::collections::HashMap<String, ChannelStatus>,
126}
127
/// A time-limited lease on the engine held by a client.
#[derive(Clone, Debug)]
pub struct EngineLease {
    /// Identifier of this lease.
    pub lease_id: String,
    /// Identifier of the client holding the lease.
    pub client_id: String,
    /// Kind of client holding the lease.
    pub client_type: String,
    /// When the lease was acquired, in milliseconds.
    pub acquired_at_ms: u64,
    /// When the lease was last renewed, in milliseconds.
    pub last_renewed_at_ms: u64,
    /// How long the lease stays valid after a renewal, in milliseconds.
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Returns `true` once strictly more than `ttl_ms` has elapsed between
    /// the last renewal and `now_ms`.
    ///
    /// The subtraction saturates, so a `now_ms` earlier than the last renewal
    /// counts as zero elapsed time and never expires the lease.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let since_renewal = now_ms.saturating_sub(self.last_renewed_at_ms);
        since_renewal > self.ttl_ms
    }
}
143
144#[derive(Debug, Clone, Serialize)]
145pub struct ActiveRun {
146    #[serde(rename = "runID")]
147    pub run_id: String,
148    #[serde(rename = "startedAtMs")]
149    pub started_at_ms: u64,
150    #[serde(rename = "lastActivityAtMs")]
151    pub last_activity_at_ms: u64,
152    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
153    pub client_id: Option<String>,
154    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
155    pub agent_id: Option<String>,
156    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
157    pub agent_profile: Option<String>,
158}
159
160#[derive(Clone, Default)]
161pub struct RunRegistry {
162    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
163}
164
165impl RunRegistry {
166    pub fn new() -> Self {
167        Self::default()
168    }
169
170    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
171        self.active.read().await.get(session_id).cloned()
172    }
173
174    pub async fn acquire(
175        &self,
176        session_id: &str,
177        run_id: String,
178        client_id: Option<String>,
179        agent_id: Option<String>,
180        agent_profile: Option<String>,
181    ) -> std::result::Result<ActiveRun, ActiveRun> {
182        let mut guard = self.active.write().await;
183        if let Some(existing) = guard.get(session_id).cloned() {
184            return Err(existing);
185        }
186        let now = now_ms();
187        let run = ActiveRun {
188            run_id,
189            started_at_ms: now,
190            last_activity_at_ms: now,
191            client_id,
192            agent_id,
193            agent_profile,
194        };
195        guard.insert(session_id.to_string(), run.clone());
196        Ok(run)
197    }
198
199    pub async fn touch(&self, session_id: &str, run_id: &str) {
200        let mut guard = self.active.write().await;
201        if let Some(run) = guard.get_mut(session_id) {
202            if run.run_id == run_id {
203                run.last_activity_at_ms = now_ms();
204            }
205        }
206    }
207
208    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
209        let mut guard = self.active.write().await;
210        if let Some(run) = guard.get(session_id) {
211            if run.run_id == run_id {
212                return guard.remove(session_id);
213            }
214        }
215        None
216    }
217
218    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
219        self.active.write().await.remove(session_id)
220    }
221
222    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
223        let now = now_ms();
224        let mut guard = self.active.write().await;
225        let stale_ids = guard
226            .iter()
227            .filter_map(|(session_id, run)| {
228                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
229                    Some(session_id.clone())
230                } else {
231                    None
232                }
233            })
234            .collect::<Vec<_>>();
235        let mut out = Vec::with_capacity(stale_ids.len());
236        for session_id in stale_ids {
237            if let Some(run) = guard.remove(&session_id) {
238                out.push((session_id, run));
239            }
240        }
241        out
242    }
243}
244
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
251
252pub fn build_id() -> String {
253    if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
254        let trimmed = explicit.trim();
255        if !trimmed.is_empty() {
256            return trimmed.to_string();
257        }
258    }
259    if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
260        let trimmed = git_sha.trim();
261        if !trimmed.is_empty() {
262            return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
263        }
264    }
265    env!("CARGO_PKG_VERSION").to_string()
266}
267
268pub fn detect_host_runtime_context() -> HostRuntimeContext {
269    let os = if cfg!(target_os = "windows") {
270        HostOs::Windows
271    } else if cfg!(target_os = "macos") {
272        HostOs::Macos
273    } else {
274        HostOs::Linux
275    };
276    let (shell_family, path_style) = match os {
277        HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
278        HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
279    };
280    HostRuntimeContext {
281        os,
282        arch: std::env::consts::ARCH.to_string(),
283        shell_family,
284        path_style,
285    }
286}
287
/// Path of the running executable, reported only in debug builds.
///
/// With `debug_assertions` enabled this is the lossily-converted result of
/// `std::env::current_exe()` (or `None` if that call fails); without them it
/// is always `None`.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        let exe = std::env::current_exe().ok()?;
        Some(exe.to_string_lossy().into_owned())
    } else {
        None
    }
}
300
301#[derive(Clone)]
302pub struct RuntimeState {
303    pub storage: Arc<Storage>,
304    pub config: ConfigStore,
305    pub event_bus: EventBus,
306    pub providers: ProviderRegistry,
307    pub plugins: PluginRegistry,
308    pub agents: AgentRegistry,
309    pub tools: ToolRegistry,
310    pub permissions: PermissionManager,
311    pub mcp: McpRegistry,
312    pub pty: PtyManager,
313    pub lsp: LspManager,
314    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
315    pub logs: Arc<RwLock<Vec<Value>>>,
316    pub workspace_index: WorkspaceIndex,
317    pub cancellations: CancellationRegistry,
318    pub engine_loop: EngineLoop,
319    pub host_runtime_context: HostRuntimeContext,
320}
321
322#[derive(Debug, Clone)]
323pub struct GovernedMemoryRecord {
324    pub id: String,
325    pub run_id: String,
326    pub partition: MemoryPartition,
327    pub kind: MemoryContentKind,
328    pub content: String,
329    pub artifact_refs: Vec<String>,
330    pub classification: MemoryClassification,
331    pub metadata: Option<Value>,
332    pub source_memory_id: Option<String>,
333    pub created_at_ms: u64,
334}
335
336#[derive(Debug, Clone, Serialize)]
337pub struct MemoryAuditEvent {
338    pub audit_id: String,
339    pub action: String,
340    pub run_id: String,
341    pub memory_id: Option<String>,
342    pub source_memory_id: Option<String>,
343    pub to_tier: Option<GovernedMemoryTier>,
344    pub partition_key: String,
345    pub actor: String,
346    pub status: String,
347    #[serde(skip_serializing_if = "Option::is_none")]
348    pub detail: Option<String>,
349    pub created_at_ms: u64,
350}
351
352#[derive(Debug, Clone, Serialize, Deserialize)]
353pub struct SharedResourceRecord {
354    pub key: String,
355    pub value: Value,
356    pub rev: u64,
357    pub updated_at_ms: u64,
358    pub updated_by: String,
359    #[serde(skip_serializing_if = "Option::is_none")]
360    pub ttl_ms: Option<u64>,
361}
362
363#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
364#[serde(rename_all = "snake_case")]
365pub enum RoutineSchedule {
366    IntervalSeconds { seconds: u64 },
367    Cron { expression: String },
368}
369
370#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
371#[serde(rename_all = "snake_case", tag = "type")]
372pub enum RoutineMisfirePolicy {
373    Skip,
374    RunOnce,
375    CatchUp { max_runs: u32 },
376}
377
378#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
379#[serde(rename_all = "snake_case")]
380pub enum RoutineStatus {
381    Active,
382    Paused,
383}
384
385#[derive(Debug, Clone, Serialize, Deserialize)]
386pub struct RoutineSpec {
387    pub routine_id: String,
388    pub name: String,
389    pub status: RoutineStatus,
390    pub schedule: RoutineSchedule,
391    pub timezone: String,
392    pub misfire_policy: RoutineMisfirePolicy,
393    pub entrypoint: String,
394    #[serde(default)]
395    pub args: Value,
396    #[serde(default)]
397    pub allowed_tools: Vec<String>,
398    #[serde(default)]
399    pub output_targets: Vec<String>,
400    pub creator_type: String,
401    pub creator_id: String,
402    pub requires_approval: bool,
403    pub external_integrations_allowed: bool,
404    #[serde(default, skip_serializing_if = "Option::is_none")]
405    pub next_fire_at_ms: Option<u64>,
406    #[serde(default, skip_serializing_if = "Option::is_none")]
407    pub last_fired_at_ms: Option<u64>,
408}
409
410#[derive(Debug, Clone, Serialize, Deserialize)]
411pub struct RoutineHistoryEvent {
412    pub routine_id: String,
413    pub trigger_type: String,
414    pub run_count: u32,
415    pub fired_at_ms: u64,
416    pub status: String,
417    #[serde(default, skip_serializing_if = "Option::is_none")]
418    pub detail: Option<String>,
419}
420
421#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
422#[serde(rename_all = "snake_case")]
423pub enum RoutineRunStatus {
424    Queued,
425    PendingApproval,
426    Running,
427    Paused,
428    BlockedPolicy,
429    Denied,
430    Completed,
431    Failed,
432    Cancelled,
433}
434
435#[derive(Debug, Clone, Serialize, Deserialize)]
436pub struct RoutineRunArtifact {
437    pub artifact_id: String,
438    pub uri: String,
439    pub kind: String,
440    #[serde(default, skip_serializing_if = "Option::is_none")]
441    pub label: Option<String>,
442    pub created_at_ms: u64,
443    #[serde(default, skip_serializing_if = "Option::is_none")]
444    pub metadata: Option<Value>,
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
448pub struct RoutineRunRecord {
449    pub run_id: String,
450    pub routine_id: String,
451    pub trigger_type: String,
452    pub run_count: u32,
453    pub status: RoutineRunStatus,
454    pub created_at_ms: u64,
455    pub updated_at_ms: u64,
456    #[serde(default, skip_serializing_if = "Option::is_none")]
457    pub fired_at_ms: Option<u64>,
458    #[serde(default, skip_serializing_if = "Option::is_none")]
459    pub started_at_ms: Option<u64>,
460    #[serde(default, skip_serializing_if = "Option::is_none")]
461    pub finished_at_ms: Option<u64>,
462    pub requires_approval: bool,
463    #[serde(default, skip_serializing_if = "Option::is_none")]
464    pub approval_reason: Option<String>,
465    #[serde(default, skip_serializing_if = "Option::is_none")]
466    pub denial_reason: Option<String>,
467    #[serde(default, skip_serializing_if = "Option::is_none")]
468    pub paused_reason: Option<String>,
469    #[serde(default, skip_serializing_if = "Option::is_none")]
470    pub detail: Option<String>,
471    pub entrypoint: String,
472    #[serde(default)]
473    pub args: Value,
474    #[serde(default)]
475    pub allowed_tools: Vec<String>,
476    #[serde(default)]
477    pub output_targets: Vec<String>,
478    #[serde(default)]
479    pub artifacts: Vec<RoutineRunArtifact>,
480    #[serde(default)]
481    pub active_session_ids: Vec<String>,
482    #[serde(default)]
483    pub prompt_tokens: u64,
484    #[serde(default)]
485    pub completion_tokens: u64,
486    #[serde(default)]
487    pub total_tokens: u64,
488    #[serde(default)]
489    pub estimated_cost_usd: f64,
490}
491
/// Tool policy attached to a session created for a routine run.
///
/// Stored in `AppState::routine_session_policies`.
#[derive(Clone, Debug)]
pub struct RoutineSessionPolicy {
    /// Session the policy applies to.
    pub session_id: String,
    /// Run that owns the session.
    pub run_id: String,
    /// Routine that owns the run.
    pub routine_id: String,
    /// Tools the session may use.
    pub allowed_tools: Vec<String>,
}
499
500#[derive(Debug, Clone, Serialize)]
501pub struct RoutineTriggerPlan {
502    pub routine_id: String,
503    pub run_count: u32,
504    pub scheduled_at_ms: u64,
505    pub next_fire_at_ms: u64,
506}
507
508#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
509#[serde(rename_all = "snake_case")]
510pub enum AutomationV2Status {
511    Active,
512    Paused,
513    Draft,
514}
515
516#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
517#[serde(rename_all = "snake_case")]
518pub enum AutomationV2ScheduleType {
519    Cron,
520    Interval,
521    Manual,
522}
523
524#[derive(Debug, Clone, Serialize, Deserialize)]
525pub struct AutomationV2Schedule {
526    #[serde(rename = "type")]
527    pub schedule_type: AutomationV2ScheduleType,
528    #[serde(default, skip_serializing_if = "Option::is_none")]
529    pub cron_expression: Option<String>,
530    #[serde(default, skip_serializing_if = "Option::is_none")]
531    pub interval_seconds: Option<u64>,
532    pub timezone: String,
533    pub misfire_policy: RoutineMisfirePolicy,
534}
535
536#[derive(Debug, Clone, Serialize, Deserialize)]
537pub struct AutomationAgentToolPolicy {
538    #[serde(default)]
539    pub allowlist: Vec<String>,
540    #[serde(default)]
541    pub denylist: Vec<String>,
542}
543
544#[derive(Debug, Clone, Serialize, Deserialize)]
545pub struct AutomationAgentMcpPolicy {
546    #[serde(default)]
547    pub allowed_servers: Vec<String>,
548    #[serde(default, skip_serializing_if = "Option::is_none")]
549    pub allowed_tools: Option<Vec<String>>,
550}
551
552#[derive(Debug, Clone, Serialize, Deserialize)]
553pub struct AutomationAgentProfile {
554    pub agent_id: String,
555    #[serde(default, skip_serializing_if = "Option::is_none")]
556    pub template_id: Option<String>,
557    pub display_name: String,
558    #[serde(default, skip_serializing_if = "Option::is_none")]
559    pub avatar_url: Option<String>,
560    #[serde(default, skip_serializing_if = "Option::is_none")]
561    pub model_policy: Option<Value>,
562    #[serde(default)]
563    pub skills: Vec<String>,
564    pub tool_policy: AutomationAgentToolPolicy,
565    pub mcp_policy: AutomationAgentMcpPolicy,
566    #[serde(default, skip_serializing_if = "Option::is_none")]
567    pub approval_policy: Option<String>,
568}
569
570#[derive(Debug, Clone, Serialize, Deserialize)]
571pub struct AutomationFlowNode {
572    pub node_id: String,
573    pub agent_id: String,
574    pub objective: String,
575    #[serde(default)]
576    pub depends_on: Vec<String>,
577    #[serde(default, skip_serializing_if = "Option::is_none")]
578    pub retry_policy: Option<Value>,
579    #[serde(default, skip_serializing_if = "Option::is_none")]
580    pub timeout_ms: Option<u64>,
581}
582
583#[derive(Debug, Clone, Serialize, Deserialize)]
584pub struct AutomationFlowSpec {
585    #[serde(default)]
586    pub nodes: Vec<AutomationFlowNode>,
587}
588
589#[derive(Debug, Clone, Serialize, Deserialize)]
590pub struct AutomationExecutionPolicy {
591    #[serde(default, skip_serializing_if = "Option::is_none")]
592    pub max_parallel_agents: Option<u32>,
593    #[serde(default, skip_serializing_if = "Option::is_none")]
594    pub max_total_runtime_ms: Option<u64>,
595    #[serde(default, skip_serializing_if = "Option::is_none")]
596    pub max_total_tool_calls: Option<u32>,
597}
598
599#[derive(Debug, Clone, Serialize, Deserialize)]
600pub struct AutomationV2Spec {
601    pub automation_id: String,
602    pub name: String,
603    #[serde(default, skip_serializing_if = "Option::is_none")]
604    pub description: Option<String>,
605    pub status: AutomationV2Status,
606    pub schedule: AutomationV2Schedule,
607    #[serde(default)]
608    pub agents: Vec<AutomationAgentProfile>,
609    pub flow: AutomationFlowSpec,
610    pub execution: AutomationExecutionPolicy,
611    #[serde(default)]
612    pub output_targets: Vec<String>,
613    pub created_at_ms: u64,
614    pub updated_at_ms: u64,
615    pub creator_id: String,
616    #[serde(default, skip_serializing_if = "Option::is_none")]
617    pub next_fire_at_ms: Option<u64>,
618    #[serde(default, skip_serializing_if = "Option::is_none")]
619    pub last_fired_at_ms: Option<u64>,
620}
621
622#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
623#[serde(rename_all = "snake_case")]
624pub enum AutomationRunStatus {
625    Queued,
626    Running,
627    Pausing,
628    Paused,
629    Completed,
630    Failed,
631    Cancelled,
632}
633
634#[derive(Debug, Clone, Serialize, Deserialize)]
635pub struct AutomationRunCheckpoint {
636    #[serde(default)]
637    pub completed_nodes: Vec<String>,
638    #[serde(default)]
639    pub pending_nodes: Vec<String>,
640    #[serde(default)]
641    pub node_outputs: std::collections::HashMap<String, Value>,
642}
643
644#[derive(Debug, Clone, Serialize, Deserialize)]
645pub struct AutomationV2RunRecord {
646    pub run_id: String,
647    pub automation_id: String,
648    pub trigger_type: String,
649    pub status: AutomationRunStatus,
650    pub created_at_ms: u64,
651    pub updated_at_ms: u64,
652    #[serde(default, skip_serializing_if = "Option::is_none")]
653    pub started_at_ms: Option<u64>,
654    #[serde(default, skip_serializing_if = "Option::is_none")]
655    pub finished_at_ms: Option<u64>,
656    #[serde(default)]
657    pub active_session_ids: Vec<String>,
658    #[serde(default)]
659    pub active_instance_ids: Vec<String>,
660    pub checkpoint: AutomationRunCheckpoint,
661    #[serde(default, skip_serializing_if = "Option::is_none")]
662    pub pause_reason: Option<String>,
663    #[serde(default, skip_serializing_if = "Option::is_none")]
664    pub resume_reason: Option<String>,
665    #[serde(default, skip_serializing_if = "Option::is_none")]
666    pub detail: Option<String>,
667    #[serde(default)]
668    pub prompt_tokens: u64,
669    #[serde(default)]
670    pub completion_tokens: u64,
671    #[serde(default)]
672    pub total_tokens: u64,
673    #[serde(default)]
674    pub estimated_cost_usd: f64,
675}
676
677#[derive(Debug, Clone, Serialize)]
678pub struct ResourceConflict {
679    pub key: String,
680    pub expected_rev: Option<u64>,
681    pub current_rev: Option<u64>,
682}
683
684#[derive(Debug, Clone, Serialize)]
685#[serde(tag = "type", rename_all = "snake_case")]
686pub enum ResourceStoreError {
687    InvalidKey { key: String },
688    RevisionConflict(ResourceConflict),
689    PersistFailed { message: String },
690}
691
692#[derive(Debug, Clone, Serialize)]
693#[serde(tag = "type", rename_all = "snake_case")]
694pub enum RoutineStoreError {
695    InvalidRoutineId { routine_id: String },
696    InvalidSchedule { detail: String },
697    PersistFailed { message: String },
698}
699
/// Coarse startup state of the server.
///
/// `AppState::new_starting` begins in `Starting`; `mark_ready` moves to
/// `Ready` and `mark_failed` to `Failed`.
#[derive(Clone, Debug)]
pub enum StartupStatus {
    /// Startup is still in progress.
    Starting,
    /// Startup finished and the runtime is available.
    Ready,
    /// Startup failed.
    Failed,
}
706
707#[derive(Debug, Clone)]
708pub struct StartupState {
709    pub status: StartupStatus,
710    pub phase: String,
711    pub started_at_ms: u64,
712    pub attempt_id: String,
713    pub last_error: Option<String>,
714}
715
716#[derive(Debug, Clone)]
717pub struct StartupSnapshot {
718    pub status: StartupStatus,
719    pub phase: String,
720    pub started_at_ms: u64,
721    pub attempt_id: String,
722    pub last_error: Option<String>,
723    pub elapsed_ms: u64,
724}
725
726#[derive(Clone)]
727pub struct AppState {
728    pub runtime: Arc<OnceLock<RuntimeState>>,
729    pub startup: Arc<RwLock<StartupState>>,
730    pub in_process_mode: Arc<AtomicBool>,
731    pub api_token: Arc<RwLock<Option<String>>>,
732    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
733    pub run_registry: RunRegistry,
734    pub run_stale_ms: u64,
735    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
736    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
737    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
738    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
739    pub shared_resources_path: PathBuf,
740    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
741    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
742    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
743    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
744    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
745    pub routine_session_policies:
746        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
747    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
748    pub token_cost_per_1k_usd: f64,
749    pub routines_path: PathBuf,
750    pub routine_history_path: PathBuf,
751    pub routine_runs_path: PathBuf,
752    pub automations_v2_path: PathBuf,
753    pub automation_v2_runs_path: PathBuf,
754    pub agent_teams: AgentTeamRuntime,
755    pub web_ui_enabled: Arc<AtomicBool>,
756    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
757    pub server_base_url: Arc<std::sync::RwLock<String>>,
758    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
759    pub host_runtime_context: HostRuntimeContext,
760    pub pack_manager: Arc<PackManager>,
761    pub capability_resolver: Arc<CapabilityResolver>,
762    pub preset_registry: Arc<PresetRegistry>,
763}
764
765#[derive(Debug, Clone)]
766struct StatusIndexUpdate {
767    key: String,
768    value: Value,
769}
770
771impl AppState {
772    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
773        Self {
774            runtime: Arc::new(OnceLock::new()),
775            startup: Arc::new(RwLock::new(StartupState {
776                status: StartupStatus::Starting,
777                phase: "boot".to_string(),
778                started_at_ms: now_ms(),
779                attempt_id,
780                last_error: None,
781            })),
782            in_process_mode: Arc::new(AtomicBool::new(in_process)),
783            api_token: Arc::new(RwLock::new(None)),
784            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
785            run_registry: RunRegistry::new(),
786            run_stale_ms: resolve_run_stale_ms(),
787            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
788            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
789            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
790            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
791            shared_resources_path: resolve_shared_resources_path(),
792            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
793            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
794            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
795            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
796            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
797            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
798            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
799            routines_path: resolve_routines_path(),
800            routine_history_path: resolve_routine_history_path(),
801            routine_runs_path: resolve_routine_runs_path(),
802            automations_v2_path: resolve_automations_v2_path(),
803            automation_v2_runs_path: resolve_automation_v2_runs_path(),
804            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
805            web_ui_enabled: Arc::new(AtomicBool::new(false)),
806            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
807            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
808            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
809            host_runtime_context: detect_host_runtime_context(),
810            token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
811            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
812            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
813            preset_registry: Arc::new(PresetRegistry::new(
814                PackManager::default_root(),
815                resolve_shared_paths()
816                    .map(|paths| paths.canonical_root)
817                    .unwrap_or_else(|_| {
818                        dirs::home_dir()
819                            .unwrap_or_else(|| PathBuf::from("."))
820                            .join(".tandem")
821                    }),
822            )),
823        }
824    }
825
826    pub fn is_ready(&self) -> bool {
827        self.runtime.get().is_some()
828    }
829
830    pub fn mode_label(&self) -> &'static str {
831        if self.in_process_mode.load(Ordering::Relaxed) {
832            "in-process"
833        } else {
834            "sidecar"
835        }
836    }
837
838    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
839        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
840        if let Ok(mut guard) = self.web_ui_prefix.write() {
841            *guard = normalize_web_ui_prefix(&prefix);
842        }
843    }
844
845    pub fn web_ui_enabled(&self) -> bool {
846        self.web_ui_enabled.load(Ordering::Relaxed)
847    }
848
849    pub fn web_ui_prefix(&self) -> String {
850        self.web_ui_prefix
851            .read()
852            .map(|v| v.clone())
853            .unwrap_or_else(|_| "/admin".to_string())
854    }
855
856    pub fn set_server_base_url(&self, base_url: String) {
857        if let Ok(mut guard) = self.server_base_url.write() {
858            *guard = base_url;
859        }
860    }
861
862    pub fn server_base_url(&self) -> String {
863        self.server_base_url
864            .read()
865            .map(|v| v.clone())
866            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
867    }
868
869    pub async fn api_token(&self) -> Option<String> {
870        self.api_token.read().await.clone()
871    }
872
873    pub async fn set_api_token(&self, token: Option<String>) {
874        *self.api_token.write().await = token;
875    }
876
877    pub async fn startup_snapshot(&self) -> StartupSnapshot {
878        let state = self.startup.read().await.clone();
879        StartupSnapshot {
880            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
881            status: state.status,
882            phase: state.phase,
883            started_at_ms: state.started_at_ms,
884            attempt_id: state.attempt_id,
885            last_error: state.last_error,
886        }
887    }
888
889    pub fn host_runtime_context(&self) -> HostRuntimeContext {
890        self.runtime
891            .get()
892            .map(|runtime| runtime.host_runtime_context.clone())
893            .unwrap_or_else(|| self.host_runtime_context.clone())
894    }
895
896    pub async fn set_phase(&self, phase: impl Into<String>) {
897        let mut startup = self.startup.write().await;
898        startup.phase = phase.into();
899    }
900
901    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
902        self.runtime
903            .set(runtime)
904            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
905        self.engine_loop
906            .set_spawn_agent_hook(std::sync::Arc::new(
907                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
908            ))
909            .await;
910        self.engine_loop
911            .set_tool_policy_hook(std::sync::Arc::new(
912                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
913            ))
914            .await;
915        self.engine_loop
916            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
917                self.clone(),
918            )))
919            .await;
920        let _ = self.load_shared_resources().await;
921        let _ = self.load_routines().await;
922        let _ = self.load_routine_history().await;
923        let _ = self.load_routine_runs().await;
924        let _ = self.load_automations_v2().await;
925        let _ = self.load_automation_v2_runs().await;
926        let workspace_root = self.workspace_index.snapshot().await.root;
927        let _ = self
928            .agent_teams
929            .ensure_loaded_for_workspace(&workspace_root)
930            .await;
931        let mut startup = self.startup.write().await;
932        startup.status = StartupStatus::Ready;
933        startup.phase = "ready".to_string();
934        startup.last_error = None;
935        Ok(())
936    }
937
938    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
939        let mut startup = self.startup.write().await;
940        startup.status = StartupStatus::Failed;
941        startup.phase = phase.into();
942        startup.last_error = Some(error.into());
943    }
944
945    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
946        let runtime = self.channels_runtime.lock().await;
947        runtime.statuses.clone()
948    }
949
950    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
951        let effective = self.config.get_effective_value().await;
952        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
953        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
954
955        let mut runtime = self.channels_runtime.lock().await;
956        if let Some(listeners) = runtime.listeners.as_mut() {
957            listeners.abort_all();
958        }
959        runtime.listeners = None;
960        runtime.statuses.clear();
961
962        let mut status_map = std::collections::HashMap::new();
963        status_map.insert(
964            "telegram".to_string(),
965            ChannelStatus {
966                enabled: parsed.channels.telegram.is_some(),
967                connected: false,
968                last_error: None,
969                active_sessions: 0,
970                meta: serde_json::json!({}),
971            },
972        );
973        status_map.insert(
974            "discord".to_string(),
975            ChannelStatus {
976                enabled: parsed.channels.discord.is_some(),
977                connected: false,
978                last_error: None,
979                active_sessions: 0,
980                meta: serde_json::json!({}),
981            },
982        );
983        status_map.insert(
984            "slack".to_string(),
985            ChannelStatus {
986                enabled: parsed.channels.slack.is_some(),
987                connected: false,
988                last_error: None,
989                active_sessions: 0,
990                meta: serde_json::json!({}),
991            },
992        );
993
994        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
995            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
996            runtime.listeners = Some(listeners);
997            for status in status_map.values_mut() {
998                if status.enabled {
999                    status.connected = true;
1000                }
1001            }
1002        }
1003
1004        runtime.statuses = status_map.clone();
1005        drop(runtime);
1006
1007        self.event_bus.publish(EngineEvent::new(
1008            "channel.status.changed",
1009            serde_json::json!({ "channels": status_map }),
1010        ));
1011        Ok(())
1012    }
1013
1014    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
1015        if !self.shared_resources_path.exists() {
1016            return Ok(());
1017        }
1018        let raw = fs::read_to_string(&self.shared_resources_path).await?;
1019        let parsed =
1020            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
1021                .unwrap_or_default();
1022        let mut guard = self.shared_resources.write().await;
1023        *guard = parsed;
1024        Ok(())
1025    }
1026
1027    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1028        if let Some(parent) = self.shared_resources_path.parent() {
1029            fs::create_dir_all(parent).await?;
1030        }
1031        let payload = {
1032            let guard = self.shared_resources.read().await;
1033            serde_json::to_string_pretty(&*guard)?
1034        };
1035        fs::write(&self.shared_resources_path, payload).await?;
1036        Ok(())
1037    }
1038
1039    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
1040        self.shared_resources.read().await.get(key).cloned()
1041    }
1042
1043    pub async fn list_shared_resources(
1044        &self,
1045        prefix: Option<&str>,
1046        limit: usize,
1047    ) -> Vec<SharedResourceRecord> {
1048        let limit = limit.clamp(1, 500);
1049        let mut rows = self
1050            .shared_resources
1051            .read()
1052            .await
1053            .values()
1054            .filter(|record| {
1055                if let Some(prefix) = prefix {
1056                    record.key.starts_with(prefix)
1057                } else {
1058                    true
1059                }
1060            })
1061            .cloned()
1062            .collect::<Vec<_>>();
1063        rows.sort_by(|a, b| a.key.cmp(&b.key));
1064        rows.truncate(limit);
1065        rows
1066    }
1067
1068    pub async fn put_shared_resource(
1069        &self,
1070        key: String,
1071        value: Value,
1072        if_match_rev: Option<u64>,
1073        updated_by: String,
1074        ttl_ms: Option<u64>,
1075    ) -> Result<SharedResourceRecord, ResourceStoreError> {
1076        if !is_valid_resource_key(&key) {
1077            return Err(ResourceStoreError::InvalidKey { key });
1078        }
1079
1080        let now = now_ms();
1081        let mut guard = self.shared_resources.write().await;
1082        let existing = guard.get(&key).cloned();
1083
1084        if let Some(expected) = if_match_rev {
1085            let current = existing.as_ref().map(|row| row.rev);
1086            if current != Some(expected) {
1087                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1088                    key,
1089                    expected_rev: Some(expected),
1090                    current_rev: current,
1091                }));
1092            }
1093        }
1094
1095        let next_rev = existing
1096            .as_ref()
1097            .map(|row| row.rev.saturating_add(1))
1098            .unwrap_or(1);
1099
1100        let record = SharedResourceRecord {
1101            key: key.clone(),
1102            value,
1103            rev: next_rev,
1104            updated_at_ms: now,
1105            updated_by,
1106            ttl_ms,
1107        };
1108
1109        let previous = guard.insert(key.clone(), record.clone());
1110        drop(guard);
1111
1112        if let Err(error) = self.persist_shared_resources().await {
1113            let mut rollback = self.shared_resources.write().await;
1114            if let Some(previous) = previous {
1115                rollback.insert(key, previous);
1116            } else {
1117                rollback.remove(&key);
1118            }
1119            return Err(ResourceStoreError::PersistFailed {
1120                message: error.to_string(),
1121            });
1122        }
1123
1124        Ok(record)
1125    }
1126
1127    pub async fn delete_shared_resource(
1128        &self,
1129        key: &str,
1130        if_match_rev: Option<u64>,
1131    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
1132        if !is_valid_resource_key(key) {
1133            return Err(ResourceStoreError::InvalidKey {
1134                key: key.to_string(),
1135            });
1136        }
1137
1138        let mut guard = self.shared_resources.write().await;
1139        let current = guard.get(key).cloned();
1140        if let Some(expected) = if_match_rev {
1141            let current_rev = current.as_ref().map(|row| row.rev);
1142            if current_rev != Some(expected) {
1143                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1144                    key: key.to_string(),
1145                    expected_rev: Some(expected),
1146                    current_rev,
1147                }));
1148            }
1149        }
1150
1151        let removed = guard.remove(key);
1152        drop(guard);
1153
1154        if let Err(error) = self.persist_shared_resources().await {
1155            if let Some(record) = removed.clone() {
1156                self.shared_resources
1157                    .write()
1158                    .await
1159                    .insert(record.key.clone(), record);
1160            }
1161            return Err(ResourceStoreError::PersistFailed {
1162                message: error.to_string(),
1163            });
1164        }
1165
1166        Ok(removed)
1167    }
1168
1169    pub async fn load_routines(&self) -> anyhow::Result<()> {
1170        if !self.routines_path.exists() {
1171            return Ok(());
1172        }
1173        let raw = fs::read_to_string(&self.routines_path).await?;
1174        let parsed = serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw)
1175            .unwrap_or_default();
1176        let mut guard = self.routines.write().await;
1177        *guard = parsed;
1178        Ok(())
1179    }
1180
1181    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1182        if !self.routine_history_path.exists() {
1183            return Ok(());
1184        }
1185        let raw = fs::read_to_string(&self.routine_history_path).await?;
1186        let parsed = serde_json::from_str::<
1187            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1188        >(&raw)
1189        .unwrap_or_default();
1190        let mut guard = self.routine_history.write().await;
1191        *guard = parsed;
1192        Ok(())
1193    }
1194
1195    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1196        if !self.routine_runs_path.exists() {
1197            return Ok(());
1198        }
1199        let raw = fs::read_to_string(&self.routine_runs_path).await?;
1200        let parsed =
1201            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1202                .unwrap_or_default();
1203        let mut guard = self.routine_runs.write().await;
1204        *guard = parsed;
1205        Ok(())
1206    }
1207
1208    pub async fn persist_routines(&self) -> anyhow::Result<()> {
1209        if let Some(parent) = self.routines_path.parent() {
1210            fs::create_dir_all(parent).await?;
1211        }
1212        let payload = {
1213            let guard = self.routines.read().await;
1214            serde_json::to_string_pretty(&*guard)?
1215        };
1216        fs::write(&self.routines_path, payload).await?;
1217        Ok(())
1218    }
1219
1220    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1221        if let Some(parent) = self.routine_history_path.parent() {
1222            fs::create_dir_all(parent).await?;
1223        }
1224        let payload = {
1225            let guard = self.routine_history.read().await;
1226            serde_json::to_string_pretty(&*guard)?
1227        };
1228        fs::write(&self.routine_history_path, payload).await?;
1229        Ok(())
1230    }
1231
1232    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1233        if let Some(parent) = self.routine_runs_path.parent() {
1234            fs::create_dir_all(parent).await?;
1235        }
1236        let payload = {
1237            let guard = self.routine_runs.read().await;
1238            serde_json::to_string_pretty(&*guard)?
1239        };
1240        fs::write(&self.routine_runs_path, payload).await?;
1241        Ok(())
1242    }
1243
1244    pub async fn put_routine(
1245        &self,
1246        mut routine: RoutineSpec,
1247    ) -> Result<RoutineSpec, RoutineStoreError> {
1248        if routine.routine_id.trim().is_empty() {
1249            return Err(RoutineStoreError::InvalidRoutineId {
1250                routine_id: routine.routine_id,
1251            });
1252        }
1253
1254        routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1255        routine.output_targets = normalize_non_empty_list(routine.output_targets);
1256
1257        let now = now_ms();
1258        let next_schedule_fire =
1259            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1260                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1261                    detail: "invalid schedule or timezone".to_string(),
1262                })?;
1263        match routine.schedule {
1264            RoutineSchedule::IntervalSeconds { seconds } => {
1265                if seconds == 0 {
1266                    return Err(RoutineStoreError::InvalidSchedule {
1267                        detail: "interval_seconds must be > 0".to_string(),
1268                    });
1269                }
1270                let _ = seconds;
1271            }
1272            RoutineSchedule::Cron { .. } => {}
1273        }
1274        if routine.next_fire_at_ms.is_none() {
1275            routine.next_fire_at_ms = Some(next_schedule_fire);
1276        }
1277
1278        let mut guard = self.routines.write().await;
1279        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1280        drop(guard);
1281
1282        if let Err(error) = self.persist_routines().await {
1283            let mut rollback = self.routines.write().await;
1284            if let Some(previous) = previous {
1285                rollback.insert(previous.routine_id.clone(), previous);
1286            } else {
1287                rollback.remove(&routine.routine_id);
1288            }
1289            return Err(RoutineStoreError::PersistFailed {
1290                message: error.to_string(),
1291            });
1292        }
1293
1294        Ok(routine)
1295    }
1296
1297    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1298        let mut rows = self
1299            .routines
1300            .read()
1301            .await
1302            .values()
1303            .cloned()
1304            .collect::<Vec<_>>();
1305        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1306        rows
1307    }
1308
1309    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1310        self.routines.read().await.get(routine_id).cloned()
1311    }
1312
1313    pub async fn delete_routine(
1314        &self,
1315        routine_id: &str,
1316    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1317        let mut guard = self.routines.write().await;
1318        let removed = guard.remove(routine_id);
1319        drop(guard);
1320
1321        if let Err(error) = self.persist_routines().await {
1322            if let Some(removed) = removed.clone() {
1323                self.routines
1324                    .write()
1325                    .await
1326                    .insert(removed.routine_id.clone(), removed);
1327            }
1328            return Err(RoutineStoreError::PersistFailed {
1329                message: error.to_string(),
1330            });
1331        }
1332        Ok(removed)
1333    }
1334
1335    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1336        let mut plans = Vec::new();
1337        let mut guard = self.routines.write().await;
1338        for routine in guard.values_mut() {
1339            if routine.status != RoutineStatus::Active {
1340                continue;
1341            }
1342            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1343                continue;
1344            };
1345            if now_ms < next_fire_at_ms {
1346                continue;
1347            }
1348            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
1349                now_ms,
1350                next_fire_at_ms,
1351                &routine.schedule,
1352                &routine.timezone,
1353                &routine.misfire_policy,
1354            );
1355            routine.next_fire_at_ms = Some(next_fire_at_ms);
1356            if run_count == 0 {
1357                continue;
1358            }
1359            plans.push(RoutineTriggerPlan {
1360                routine_id: routine.routine_id.clone(),
1361                run_count,
1362                scheduled_at_ms: now_ms,
1363                next_fire_at_ms,
1364            });
1365        }
1366        drop(guard);
1367        let _ = self.persist_routines().await;
1368        plans
1369    }
1370
1371    pub async fn mark_routine_fired(
1372        &self,
1373        routine_id: &str,
1374        fired_at_ms: u64,
1375    ) -> Option<RoutineSpec> {
1376        let mut guard = self.routines.write().await;
1377        let routine = guard.get_mut(routine_id)?;
1378        routine.last_fired_at_ms = Some(fired_at_ms);
1379        let updated = routine.clone();
1380        drop(guard);
1381        let _ = self.persist_routines().await;
1382        Some(updated)
1383    }
1384
1385    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1386        let mut history = self.routine_history.write().await;
1387        history
1388            .entry(event.routine_id.clone())
1389            .or_default()
1390            .push(event);
1391        drop(history);
1392        let _ = self.persist_routine_history().await;
1393    }
1394
1395    pub async fn list_routine_history(
1396        &self,
1397        routine_id: &str,
1398        limit: usize,
1399    ) -> Vec<RoutineHistoryEvent> {
1400        let limit = limit.clamp(1, 500);
1401        let mut rows = self
1402            .routine_history
1403            .read()
1404            .await
1405            .get(routine_id)
1406            .cloned()
1407            .unwrap_or_default();
1408        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1409        rows.truncate(limit);
1410        rows
1411    }
1412
1413    pub async fn create_routine_run(
1414        &self,
1415        routine: &RoutineSpec,
1416        trigger_type: &str,
1417        run_count: u32,
1418        status: RoutineRunStatus,
1419        detail: Option<String>,
1420    ) -> RoutineRunRecord {
1421        let now = now_ms();
1422        let record = RoutineRunRecord {
1423            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1424            routine_id: routine.routine_id.clone(),
1425            trigger_type: trigger_type.to_string(),
1426            run_count,
1427            status,
1428            created_at_ms: now,
1429            updated_at_ms: now,
1430            fired_at_ms: Some(now),
1431            started_at_ms: None,
1432            finished_at_ms: None,
1433            requires_approval: routine.requires_approval,
1434            approval_reason: None,
1435            denial_reason: None,
1436            paused_reason: None,
1437            detail,
1438            entrypoint: routine.entrypoint.clone(),
1439            args: routine.args.clone(),
1440            allowed_tools: routine.allowed_tools.clone(),
1441            output_targets: routine.output_targets.clone(),
1442            artifacts: Vec::new(),
1443            active_session_ids: Vec::new(),
1444            prompt_tokens: 0,
1445            completion_tokens: 0,
1446            total_tokens: 0,
1447            estimated_cost_usd: 0.0,
1448        };
1449        self.routine_runs
1450            .write()
1451            .await
1452            .insert(record.run_id.clone(), record.clone());
1453        let _ = self.persist_routine_runs().await;
1454        record
1455    }
1456
1457    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1458        self.routine_runs.read().await.get(run_id).cloned()
1459    }
1460
1461    pub async fn list_routine_runs(
1462        &self,
1463        routine_id: Option<&str>,
1464        limit: usize,
1465    ) -> Vec<RoutineRunRecord> {
1466        let mut rows = self
1467            .routine_runs
1468            .read()
1469            .await
1470            .values()
1471            .filter(|row| {
1472                if let Some(id) = routine_id {
1473                    row.routine_id == id
1474                } else {
1475                    true
1476                }
1477            })
1478            .cloned()
1479            .collect::<Vec<_>>();
1480        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1481        rows.truncate(limit.clamp(1, 500));
1482        rows
1483    }
1484
1485    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1486        let mut guard = self.routine_runs.write().await;
1487        let next_run_id = guard
1488            .values()
1489            .filter(|row| row.status == RoutineRunStatus::Queued)
1490            .min_by(|a, b| {
1491                a.created_at_ms
1492                    .cmp(&b.created_at_ms)
1493                    .then_with(|| a.run_id.cmp(&b.run_id))
1494            })
1495            .map(|row| row.run_id.clone())?;
1496        let now = now_ms();
1497        let row = guard.get_mut(&next_run_id)?;
1498        row.status = RoutineRunStatus::Running;
1499        row.updated_at_ms = now;
1500        row.started_at_ms = Some(now);
1501        let claimed = row.clone();
1502        drop(guard);
1503        let _ = self.persist_routine_runs().await;
1504        Some(claimed)
1505    }
1506
1507    pub async fn set_routine_session_policy(
1508        &self,
1509        session_id: String,
1510        run_id: String,
1511        routine_id: String,
1512        allowed_tools: Vec<String>,
1513    ) {
1514        let policy = RoutineSessionPolicy {
1515            session_id: session_id.clone(),
1516            run_id,
1517            routine_id,
1518            allowed_tools: normalize_allowed_tools(allowed_tools),
1519        };
1520        self.routine_session_policies
1521            .write()
1522            .await
1523            .insert(session_id, policy);
1524    }
1525
1526    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1527        self.routine_session_policies
1528            .read()
1529            .await
1530            .get(session_id)
1531            .cloned()
1532    }
1533
1534    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1535        self.routine_session_policies
1536            .write()
1537            .await
1538            .remove(session_id);
1539    }
1540
1541    pub async fn update_routine_run_status(
1542        &self,
1543        run_id: &str,
1544        status: RoutineRunStatus,
1545        reason: Option<String>,
1546    ) -> Option<RoutineRunRecord> {
1547        let mut guard = self.routine_runs.write().await;
1548        let row = guard.get_mut(run_id)?;
1549        row.status = status.clone();
1550        row.updated_at_ms = now_ms();
1551        match status {
1552            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1553            RoutineRunStatus::Running => {
1554                row.started_at_ms.get_or_insert_with(now_ms);
1555                if let Some(detail) = reason {
1556                    row.detail = Some(detail);
1557                }
1558            }
1559            RoutineRunStatus::Denied => row.denial_reason = reason,
1560            RoutineRunStatus::Paused => row.paused_reason = reason,
1561            RoutineRunStatus::Completed
1562            | RoutineRunStatus::Failed
1563            | RoutineRunStatus::Cancelled => {
1564                row.finished_at_ms = Some(now_ms());
1565                if let Some(detail) = reason {
1566                    row.detail = Some(detail);
1567                }
1568            }
1569            _ => {
1570                if let Some(detail) = reason {
1571                    row.detail = Some(detail);
1572                }
1573            }
1574        }
1575        let updated = row.clone();
1576        drop(guard);
1577        let _ = self.persist_routine_runs().await;
1578        Some(updated)
1579    }
1580
1581    pub async fn append_routine_run_artifact(
1582        &self,
1583        run_id: &str,
1584        artifact: RoutineRunArtifact,
1585    ) -> Option<RoutineRunRecord> {
1586        let mut guard = self.routine_runs.write().await;
1587        let row = guard.get_mut(run_id)?;
1588        row.updated_at_ms = now_ms();
1589        row.artifacts.push(artifact);
1590        let updated = row.clone();
1591        drop(guard);
1592        let _ = self.persist_routine_runs().await;
1593        Some(updated)
1594    }
1595
1596    pub async fn add_active_session_id(
1597        &self,
1598        run_id: &str,
1599        session_id: String,
1600    ) -> Option<RoutineRunRecord> {
1601        let mut guard = self.routine_runs.write().await;
1602        let row = guard.get_mut(run_id)?;
1603        if !row.active_session_ids.iter().any(|id| id == &session_id) {
1604            row.active_session_ids.push(session_id);
1605        }
1606        row.updated_at_ms = now_ms();
1607        let updated = row.clone();
1608        drop(guard);
1609        let _ = self.persist_routine_runs().await;
1610        Some(updated)
1611    }
1612
1613    pub async fn clear_active_session_id(
1614        &self,
1615        run_id: &str,
1616        session_id: &str,
1617    ) -> Option<RoutineRunRecord> {
1618        let mut guard = self.routine_runs.write().await;
1619        let row = guard.get_mut(run_id)?;
1620        row.active_session_ids.retain(|id| id != session_id);
1621        row.updated_at_ms = now_ms();
1622        let updated = row.clone();
1623        drop(guard);
1624        let _ = self.persist_routine_runs().await;
1625        Some(updated)
1626    }
1627
1628    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
1629        if !self.automations_v2_path.exists() {
1630            return Ok(());
1631        }
1632        let raw = fs::read_to_string(&self.automations_v2_path).await?;
1633        let parsed =
1634            serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(&raw)
1635                .unwrap_or_default();
1636        *self.automations_v2.write().await = parsed;
1637        Ok(())
1638    }
1639
1640    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1641        if let Some(parent) = self.automations_v2_path.parent() {
1642            fs::create_dir_all(parent).await?;
1643        }
1644        let payload = {
1645            let guard = self.automations_v2.read().await;
1646            serde_json::to_string_pretty(&*guard)?
1647        };
1648        fs::write(&self.automations_v2_path, payload).await?;
1649        Ok(())
1650    }
1651
1652    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
1653        if !self.automation_v2_runs_path.exists() {
1654            return Ok(());
1655        }
1656        let raw = fs::read_to_string(&self.automation_v2_runs_path).await?;
1657        let parsed =
1658            serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(&raw)
1659                .unwrap_or_default();
1660        *self.automation_v2_runs.write().await = parsed;
1661        Ok(())
1662    }
1663
1664    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1665        if let Some(parent) = self.automation_v2_runs_path.parent() {
1666            fs::create_dir_all(parent).await?;
1667        }
1668        let payload = {
1669            let guard = self.automation_v2_runs.read().await;
1670            serde_json::to_string_pretty(&*guard)?
1671        };
1672        fs::write(&self.automation_v2_runs_path, payload).await?;
1673        Ok(())
1674    }
1675
1676    pub async fn put_automation_v2(
1677        &self,
1678        mut automation: AutomationV2Spec,
1679    ) -> anyhow::Result<AutomationV2Spec> {
1680        if automation.automation_id.trim().is_empty() {
1681            anyhow::bail!("automation_id is required");
1682        }
1683        for agent in &mut automation.agents {
1684            if agent.display_name.trim().is_empty() {
1685                agent.display_name = auto_generated_agent_name(&agent.agent_id);
1686            }
1687            agent.tool_policy.allowlist =
1688                normalize_allowed_tools(agent.tool_policy.allowlist.clone());
1689            agent.tool_policy.denylist =
1690                normalize_allowed_tools(agent.tool_policy.denylist.clone());
1691            agent.mcp_policy.allowed_servers =
1692                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
1693            agent.mcp_policy.allowed_tools = agent
1694                .mcp_policy
1695                .allowed_tools
1696                .take()
1697                .map(normalize_allowed_tools);
1698        }
1699        let now = now_ms();
1700        if automation.created_at_ms == 0 {
1701            automation.created_at_ms = now;
1702        }
1703        automation.updated_at_ms = now;
1704        if automation.next_fire_at_ms.is_none() {
1705            automation.next_fire_at_ms =
1706                automation_schedule_next_fire_at_ms(&automation.schedule, now);
1707        }
1708        self.automations_v2
1709            .write()
1710            .await
1711            .insert(automation.automation_id.clone(), automation.clone());
1712        self.persist_automations_v2().await?;
1713        Ok(automation)
1714    }
1715
1716    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
1717        self.automations_v2.read().await.get(automation_id).cloned()
1718    }
1719
1720    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
1721        let mut rows = self
1722            .automations_v2
1723            .read()
1724            .await
1725            .values()
1726            .cloned()
1727            .collect::<Vec<_>>();
1728        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
1729        rows
1730    }
1731
1732    pub async fn delete_automation_v2(
1733        &self,
1734        automation_id: &str,
1735    ) -> anyhow::Result<Option<AutomationV2Spec>> {
1736        let removed = self.automations_v2.write().await.remove(automation_id);
1737        self.persist_automations_v2().await?;
1738        Ok(removed)
1739    }
1740
1741    pub async fn create_automation_v2_run(
1742        &self,
1743        automation: &AutomationV2Spec,
1744        trigger_type: &str,
1745    ) -> anyhow::Result<AutomationV2RunRecord> {
1746        let now = now_ms();
1747        let pending_nodes = automation
1748            .flow
1749            .nodes
1750            .iter()
1751            .map(|n| n.node_id.clone())
1752            .collect::<Vec<_>>();
1753        let run = AutomationV2RunRecord {
1754            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
1755            automation_id: automation.automation_id.clone(),
1756            trigger_type: trigger_type.to_string(),
1757            status: AutomationRunStatus::Queued,
1758            created_at_ms: now,
1759            updated_at_ms: now,
1760            started_at_ms: None,
1761            finished_at_ms: None,
1762            active_session_ids: Vec::new(),
1763            active_instance_ids: Vec::new(),
1764            checkpoint: AutomationRunCheckpoint {
1765                completed_nodes: Vec::new(),
1766                pending_nodes,
1767                node_outputs: std::collections::HashMap::new(),
1768            },
1769            pause_reason: None,
1770            resume_reason: None,
1771            detail: None,
1772            prompt_tokens: 0,
1773            completion_tokens: 0,
1774            total_tokens: 0,
1775            estimated_cost_usd: 0.0,
1776        };
1777        self.automation_v2_runs
1778            .write()
1779            .await
1780            .insert(run.run_id.clone(), run.clone());
1781        self.persist_automation_v2_runs().await?;
1782        Ok(run)
1783    }
1784
1785    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
1786        self.automation_v2_runs.read().await.get(run_id).cloned()
1787    }
1788
1789    pub async fn list_automation_v2_runs(
1790        &self,
1791        automation_id: Option<&str>,
1792        limit: usize,
1793    ) -> Vec<AutomationV2RunRecord> {
1794        let mut rows = self
1795            .automation_v2_runs
1796            .read()
1797            .await
1798            .values()
1799            .filter(|row| {
1800                if let Some(id) = automation_id {
1801                    row.automation_id == id
1802                } else {
1803                    true
1804                }
1805            })
1806            .cloned()
1807            .collect::<Vec<_>>();
1808        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1809        rows.truncate(limit.clamp(1, 500));
1810        rows
1811    }
1812
1813    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
1814        let mut guard = self.automation_v2_runs.write().await;
1815        let run_id = guard
1816            .values()
1817            .filter(|row| row.status == AutomationRunStatus::Queued)
1818            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
1819            .map(|row| row.run_id.clone())?;
1820        let now = now_ms();
1821        let run = guard.get_mut(&run_id)?;
1822        run.status = AutomationRunStatus::Running;
1823        run.updated_at_ms = now;
1824        run.started_at_ms.get_or_insert(now);
1825        let claimed = run.clone();
1826        drop(guard);
1827        let _ = self.persist_automation_v2_runs().await;
1828        Some(claimed)
1829    }
1830
1831    pub async fn update_automation_v2_run(
1832        &self,
1833        run_id: &str,
1834        update: impl FnOnce(&mut AutomationV2RunRecord),
1835    ) -> Option<AutomationV2RunRecord> {
1836        let mut guard = self.automation_v2_runs.write().await;
1837        let run = guard.get_mut(run_id)?;
1838        update(run);
1839        run.updated_at_ms = now_ms();
1840        if matches!(
1841            run.status,
1842            AutomationRunStatus::Completed
1843                | AutomationRunStatus::Failed
1844                | AutomationRunStatus::Cancelled
1845        ) {
1846            run.finished_at_ms.get_or_insert_with(now_ms);
1847        }
1848        let out = run.clone();
1849        drop(guard);
1850        let _ = self.persist_automation_v2_runs().await;
1851        Some(out)
1852    }
1853
1854    pub async fn add_automation_v2_session(
1855        &self,
1856        run_id: &str,
1857        session_id: &str,
1858    ) -> Option<AutomationV2RunRecord> {
1859        let updated = self
1860            .update_automation_v2_run(run_id, |row| {
1861                if !row.active_session_ids.iter().any(|id| id == session_id) {
1862                    row.active_session_ids.push(session_id.to_string());
1863                }
1864            })
1865            .await;
1866        self.automation_v2_session_runs
1867            .write()
1868            .await
1869            .insert(session_id.to_string(), run_id.to_string());
1870        updated
1871    }
1872
1873    pub async fn clear_automation_v2_session(
1874        &self,
1875        run_id: &str,
1876        session_id: &str,
1877    ) -> Option<AutomationV2RunRecord> {
1878        self.automation_v2_session_runs
1879            .write()
1880            .await
1881            .remove(session_id);
1882        self.update_automation_v2_run(run_id, |row| {
1883            row.active_session_ids.retain(|id| id != session_id);
1884        })
1885        .await
1886    }
1887
1888    pub async fn apply_provider_usage_to_runs(
1889        &self,
1890        session_id: &str,
1891        prompt_tokens: u64,
1892        completion_tokens: u64,
1893        total_tokens: u64,
1894    ) {
1895        if let Some(policy) = self.routine_session_policy(session_id).await {
1896            let rate = self.token_cost_per_1k_usd.max(0.0);
1897            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
1898            let mut guard = self.routine_runs.write().await;
1899            if let Some(run) = guard.get_mut(&policy.run_id) {
1900                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
1901                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
1902                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
1903                run.estimated_cost_usd += delta_cost;
1904                run.updated_at_ms = now_ms();
1905            }
1906            drop(guard);
1907            let _ = self.persist_routine_runs().await;
1908        }
1909
1910        let maybe_v2_run_id = self
1911            .automation_v2_session_runs
1912            .read()
1913            .await
1914            .get(session_id)
1915            .cloned();
1916        if let Some(run_id) = maybe_v2_run_id {
1917            let rate = self.token_cost_per_1k_usd.max(0.0);
1918            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
1919            let mut guard = self.automation_v2_runs.write().await;
1920            if let Some(run) = guard.get_mut(&run_id) {
1921                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
1922                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
1923                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
1924                run.estimated_cost_usd += delta_cost;
1925                run.updated_at_ms = now_ms();
1926            }
1927            drop(guard);
1928            let _ = self.persist_automation_v2_runs().await;
1929        }
1930    }
1931
1932    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
1933        let mut fired = Vec::new();
1934        let mut guard = self.automations_v2.write().await;
1935        for automation in guard.values_mut() {
1936            if automation.status != AutomationV2Status::Active {
1937                continue;
1938            }
1939            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
1940                automation.next_fire_at_ms =
1941                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
1942                continue;
1943            };
1944            if now_ms < next_fire_at_ms {
1945                continue;
1946            }
1947            let run_count =
1948                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
1949            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
1950            automation.next_fire_at_ms = next;
1951            automation.last_fired_at_ms = Some(now_ms);
1952            for _ in 0..run_count {
1953                fired.push(automation.automation_id.clone());
1954            }
1955        }
1956        drop(guard);
1957        let _ = self.persist_automations_v2().await;
1958        fired
1959    }
1960}
1961
1962async fn build_channels_config(
1963    state: &AppState,
1964    channels: &ChannelsConfigFile,
1965) -> Option<ChannelsConfig> {
1966    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
1967        return None;
1968    }
1969    Some(ChannelsConfig {
1970        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
1971            bot_token: cfg.bot_token,
1972            allowed_users: cfg.allowed_users,
1973            mention_only: cfg.mention_only,
1974            style_profile: cfg.style_profile,
1975        }),
1976        discord: channels.discord.clone().map(|cfg| DiscordConfig {
1977            bot_token: cfg.bot_token,
1978            guild_id: cfg.guild_id,
1979            allowed_users: cfg.allowed_users,
1980            mention_only: cfg.mention_only,
1981        }),
1982        slack: channels.slack.clone().map(|cfg| SlackConfig {
1983            bot_token: cfg.bot_token,
1984            channel_id: cfg.channel_id,
1985            allowed_users: cfg.allowed_users,
1986        }),
1987        server_base_url: state.server_base_url(),
1988        api_token: state.api_token().await.unwrap_or_default(),
1989        tool_policy: channels.tool_policy.clone(),
1990    })
1991}
1992
/// Normalizes a web UI path prefix.
///
/// Surrounding whitespace and trailing slashes are removed and a leading
/// slash is added when missing. Inputs that contain no path at all (empty,
/// whitespace only, or consisting solely of slashes such as `"/"` or `"//"`)
/// fall back to `"/admin"`, so the result is never empty.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let path = prefix.trim().trim_end_matches('/');
    if path.is_empty() {
        // Covers "", "/", "//", ...: nothing is left once the slashes go.
        return "/admin".to_string();
    }
    if path.starts_with('/') {
        path.to_string()
    } else {
        format!("/{path}")
    }
}
2005
/// Returns the default web UI path prefix, `"/admin"`.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
2009
/// Returns a single-element list holding the wildcard entry `"*"`.
fn default_allow_all() -> Vec<String> {
    const WILDCARD: &str = "*";
    vec![String::from(WILDCARD)]
}
2013
/// Returns `true`.
///
/// NOTE(review): judging by the name this is the default for Discord's
/// mention-only mode; the call site is outside this view.
fn default_discord_mention_only() -> bool {
    const MENTION_ONLY_BY_DEFAULT: bool = true;
    MENTION_ONLY_BY_DEFAULT
}
2017
2018fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
2019    normalize_non_empty_list(raw)
2020}
2021
/// Trims every entry, drops entries that are empty after trimming and removes
/// duplicates, keeping the first occurrence and the original order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for values that were seen before.
        .filter(|entry| already_seen.insert(entry.clone()))
        .collect()
}
2036
/// Reads the `TANDEM_RUN_STALE_MS` environment variable as a `u64`.
///
/// Falls back to 120 000 when the variable is unset or not a valid number,
/// and clamps the result to the range 30 000..=600 000.
fn resolve_run_stale_ms() -> u64 {
    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().ok(),
        Err(_) => None,
    };
    configured.unwrap_or(120_000).clamp(30_000, 600_000)
}
2044
/// Reads the `TANDEM_TOKEN_COST_PER_1K_USD` environment variable as an `f64`.
///
/// Returns `0.0` when the variable is unset, unparsable, negative or not
/// finite. `f64::from_str` accepts spellings such as `"inf"` and `"NaN"`;
/// those are rejected here so that cost estimates derived from the rate stay
/// finite.
fn resolve_token_cost_per_1k_usd() -> f64 {
    std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        .filter(|rate| rate.is_finite())
        .map_or(0.0, |rate| rate.max(0.0))
}
2052
2053fn resolve_shared_resources_path() -> PathBuf {
2054    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
2055        let trimmed = dir.trim();
2056        if !trimmed.is_empty() {
2057            return PathBuf::from(trimmed).join("shared_resources.json");
2058        }
2059    }
2060    default_state_dir().join("shared_resources.json")
2061}
2062
2063fn resolve_routines_path() -> PathBuf {
2064    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
2065        let trimmed = dir.trim();
2066        if !trimmed.is_empty() {
2067            return PathBuf::from(trimmed).join("routines.json");
2068        }
2069    }
2070    default_state_dir().join("routines.json")
2071}
2072
2073fn resolve_routine_history_path() -> PathBuf {
2074    if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
2075        let trimmed = root.trim();
2076        if !trimmed.is_empty() {
2077            return PathBuf::from(trimmed).join("routine_history.json");
2078        }
2079    }
2080    default_state_dir().join("routine_history.json")
2081}
2082
2083fn resolve_routine_runs_path() -> PathBuf {
2084    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2085        let trimmed = root.trim();
2086        if !trimmed.is_empty() {
2087            return PathBuf::from(trimmed).join("routine_runs.json");
2088        }
2089    }
2090    default_state_dir().join("routine_runs.json")
2091}
2092
2093fn resolve_automations_v2_path() -> PathBuf {
2094    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2095        let trimmed = root.trim();
2096        if !trimmed.is_empty() {
2097            return PathBuf::from(trimmed).join("automations_v2.json");
2098        }
2099    }
2100    default_state_dir().join("automations_v2.json")
2101}
2102
2103fn resolve_automation_v2_runs_path() -> PathBuf {
2104    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
2105        let trimmed = root.trim();
2106        if !trimmed.is_empty() {
2107            return PathBuf::from(trimmed).join("automation_v2_runs.json");
2108        }
2109    }
2110    default_state_dir().join("automation_v2_runs.json")
2111}
2112
2113fn resolve_agent_team_audit_path() -> PathBuf {
2114    if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
2115        let trimmed = base.trim();
2116        if !trimmed.is_empty() {
2117            return PathBuf::from(trimmed)
2118                .join("agent-team")
2119                .join("audit.log.jsonl");
2120        }
2121    }
2122    default_state_dir()
2123        .join("agent-team")
2124        .join("audit.log.jsonl")
2125}
2126
2127fn default_state_dir() -> PathBuf {
2128    if let Ok(paths) = resolve_shared_paths() {
2129        return paths.engine_state_dir;
2130    }
2131    if let Some(data_dir) = dirs::data_dir() {
2132        return data_dir.join("tandem").join("data");
2133    }
2134    dirs::home_dir()
2135        .map(|home| home.join(".tandem").join("data"))
2136        .unwrap_or_else(|| PathBuf::from(".tandem"))
2137}
2138
2139fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
2140    match schedule {
2141        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
2142        RoutineSchedule::Cron { .. } => None,
2143    }
2144}
2145
2146fn parse_timezone(timezone: &str) -> Option<Tz> {
2147    timezone.trim().parse::<Tz>().ok()
2148}
2149
2150fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
2151    let tz = parse_timezone(timezone)?;
2152    let schedule = Schedule::from_str(expression).ok()?;
2153    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
2154    let local_from = from_dt.with_timezone(&tz);
2155    let next = schedule.after(&local_from).next()?;
2156    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
2157}
2158
2159fn compute_next_schedule_fire_at_ms(
2160    schedule: &RoutineSchedule,
2161    timezone: &str,
2162    from_ms: u64,
2163) -> Option<u64> {
2164    let _ = parse_timezone(timezone)?;
2165    match schedule {
2166        RoutineSchedule::IntervalSeconds { seconds } => {
2167            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
2168        }
2169        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
2170    }
2171}
2172
2173fn compute_misfire_plan_for_schedule(
2174    now_ms: u64,
2175    next_fire_at_ms: u64,
2176    schedule: &RoutineSchedule,
2177    timezone: &str,
2178    policy: &RoutineMisfirePolicy,
2179) -> (u32, u64) {
2180    match schedule {
2181        RoutineSchedule::IntervalSeconds { .. } => {
2182            let Some(interval_ms) = routine_interval_ms(schedule) else {
2183                return (0, next_fire_at_ms);
2184            };
2185            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
2186        }
2187        RoutineSchedule::Cron { expression } => {
2188            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
2189                .unwrap_or_else(|| now_ms.saturating_add(60_000));
2190            match policy {
2191                RoutineMisfirePolicy::Skip => (0, aligned_next),
2192                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
2193                RoutineMisfirePolicy::CatchUp { max_runs } => {
2194                    let mut count = 0u32;
2195                    let mut cursor = next_fire_at_ms;
2196                    while cursor <= now_ms && count < *max_runs {
2197                        count = count.saturating_add(1);
2198                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
2199                            break;
2200                        };
2201                        if next <= cursor {
2202                            break;
2203                        }
2204                        cursor = next;
2205                    }
2206                    (count, aligned_next)
2207                }
2208            }
2209        }
2210    }
2211}
2212
2213fn compute_misfire_plan(
2214    now_ms: u64,
2215    next_fire_at_ms: u64,
2216    interval_ms: u64,
2217    policy: &RoutineMisfirePolicy,
2218) -> (u32, u64) {
2219    if now_ms < next_fire_at_ms || interval_ms == 0 {
2220        return (0, next_fire_at_ms);
2221    }
2222    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
2223    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
2224    match policy {
2225        RoutineMisfirePolicy::Skip => (0, aligned_next),
2226        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
2227        RoutineMisfirePolicy::CatchUp { max_runs } => {
2228            let count = missed.min(u64::from(*max_runs)) as u32;
2229            (count, aligned_next)
2230        }
2231    }
2232}
2233
2234fn auto_generated_agent_name(agent_id: &str) -> String {
2235    let names = [
2236        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
2237    ];
2238    let digest = Sha256::digest(agent_id.as_bytes());
2239    let idx = usize::from(digest[0]) % names.len();
2240    format!("{}-{:02x}", names[idx], digest[1])
2241}
2242
2243fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
2244    match schedule.schedule_type {
2245        AutomationV2ScheduleType::Manual => None,
2246        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
2247            seconds: schedule.interval_seconds.unwrap_or(60),
2248        }),
2249        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
2250            expression: schedule.cron_expression.clone().unwrap_or_default(),
2251        }),
2252    }
2253}
2254
2255fn automation_schedule_next_fire_at_ms(
2256    schedule: &AutomationV2Schedule,
2257    from_ms: u64,
2258) -> Option<u64> {
2259    let routine_schedule = schedule_from_automation_v2(schedule)?;
2260    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
2261}
2262
2263fn automation_schedule_due_count(
2264    schedule: &AutomationV2Schedule,
2265    now_ms: u64,
2266    next_fire_at_ms: u64,
2267) -> u32 {
2268    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
2269        return 0;
2270    };
2271    let (count, _) = compute_misfire_plan_for_schedule(
2272        now_ms,
2273        next_fire_at_ms,
2274        &routine_schedule,
2275        &schedule.timezone,
2276        &schedule.misfire_policy,
2277    );
2278    count.max(1)
2279}
2280
/// Outcome of evaluating whether a routine may execute.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RoutineExecutionDecision {
    /// The routine may run.
    Allowed,
    /// The routine needs approval first; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
2287
2288pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
2289    let entrypoint = routine.entrypoint.to_ascii_lowercase();
2290    if entrypoint.starts_with("connector.")
2291        || entrypoint.starts_with("integration.")
2292        || entrypoint.contains("external")
2293    {
2294        return true;
2295    }
2296    routine
2297        .args
2298        .get("uses_external_integrations")
2299        .and_then(|v| v.as_bool())
2300        .unwrap_or(false)
2301        || routine
2302            .args
2303            .get("connector_id")
2304            .and_then(|v| v.as_str())
2305            .is_some()
2306}
2307
2308pub fn evaluate_routine_execution_policy(
2309    routine: &RoutineSpec,
2310    trigger_type: &str,
2311) -> RoutineExecutionDecision {
2312    if !routine_uses_external_integrations(routine) {
2313        return RoutineExecutionDecision::Allowed;
2314    }
2315    if !routine.external_integrations_allowed {
2316        return RoutineExecutionDecision::Blocked {
2317            reason: "external integrations are disabled by policy".to_string(),
2318        };
2319    }
2320    if routine.requires_approval {
2321        return RoutineExecutionDecision::RequiresApproval {
2322            reason: format!(
2323                "manual approval required before external side effects ({})",
2324                trigger_type
2325            ),
2326        };
2327    }
2328    RoutineExecutionDecision::Allowed
2329}
2330
/// Validates a shared-resource key (surrounding whitespace is ignored).
///
/// A key is valid when it is exactly `swarm.active_tasks`, or when it starts
/// with one of `run/`, `mission/`, `project/`, `team/` and contains no `//`.
/// Blank keys are invalid.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        candidate => {
            let has_allowed_prefix = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| candidate.starts_with(prefix));
            has_allowed_prefix && !candidate.contains("//")
        }
    }
}
2348
2349impl Deref for AppState {
2350    type Target = RuntimeState;
2351
2352    fn deref(&self) -> &Self::Target {
2353        self.runtime
2354            .get()
2355            .expect("runtime accessed before startup completion")
2356    }
2357}
2358
2359#[derive(Clone)]
2360struct ServerPromptContextHook {
2361    state: AppState,
2362}
2363
2364impl ServerPromptContextHook {
2365    fn new(state: AppState) -> Self {
2366        Self { state }
2367    }
2368
2369    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
2370        let paths = resolve_shared_paths().ok()?;
2371        MemoryDatabase::new(&paths.memory_db_path).await.ok()
2372    }
2373
2374    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
2375        let paths = resolve_shared_paths().ok()?;
2376        tandem_memory::MemoryManager::new(&paths.memory_db_path)
2377            .await
2378            .ok()
2379    }
2380
2381    fn hash_query(input: &str) -> String {
2382        let mut hasher = Sha256::new();
2383        hasher.update(input.as_bytes());
2384        format!("{:x}", hasher.finalize())
2385    }
2386
2387    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
2388        let mut out = vec!["<memory_context>".to_string()];
2389        let mut used = 0usize;
2390        for hit in hits {
2391            let text = hit
2392                .record
2393                .content
2394                .split_whitespace()
2395                .take(60)
2396                .collect::<Vec<_>>()
2397                .join(" ");
2398            let line = format!(
2399                "- [{:.3}] {} (source={}, run={})",
2400                hit.score, text, hit.record.source_type, hit.record.run_id
2401            );
2402            used = used.saturating_add(line.len());
2403            if used > 2200 {
2404                break;
2405            }
2406            out.push(line);
2407        }
2408        out.push("</memory_context>".to_string());
2409        out.join("\n")
2410    }
2411
2412    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
2413        chunk
2414            .metadata
2415            .as_ref()
2416            .and_then(|meta| meta.get("source_url"))
2417            .and_then(Value::as_str)
2418            .map(str::trim)
2419            .filter(|v| !v.is_empty())
2420            .map(ToString::to_string)
2421    }
2422
2423    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
2424        if let Some(path) = chunk
2425            .metadata
2426            .as_ref()
2427            .and_then(|meta| meta.get("relative_path"))
2428            .and_then(Value::as_str)
2429            .map(str::trim)
2430            .filter(|v| !v.is_empty())
2431        {
2432            return path.to_string();
2433        }
2434        chunk
2435            .source
2436            .strip_prefix("guide_docs:")
2437            .unwrap_or(chunk.source.as_str())
2438            .to_string()
2439    }
2440
2441    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
2442        let mut out = vec!["<docs_context>".to_string()];
2443        let mut used = 0usize;
2444        for hit in hits {
2445            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
2446            let path = Self::extract_docs_relative_path(&hit.chunk);
2447            let text = hit
2448                .chunk
2449                .content
2450                .split_whitespace()
2451                .take(70)
2452                .collect::<Vec<_>>()
2453                .join(" ");
2454            let line = format!(
2455                "- [{:.3}] {} (doc_path={}, source_url={})",
2456                hit.similarity, text, path, url
2457            );
2458            used = used.saturating_add(line.len());
2459            if used > 2800 {
2460                break;
2461            }
2462            out.push(line);
2463        }
2464        out.push("</docs_context>".to_string());
2465        out.join("\n")
2466    }
2467
2468    async fn search_embedded_docs(
2469        &self,
2470        query: &str,
2471        limit: usize,
2472    ) -> Vec<tandem_memory::types::MemorySearchResult> {
2473        let Some(manager) = self.open_memory_manager().await else {
2474            return Vec::new();
2475        };
2476        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
2477        manager
2478            .search(
2479                query,
2480                Some(MemoryTier::Global),
2481                None,
2482                None,
2483                Some(search_limit),
2484            )
2485            .await
2486            .unwrap_or_default()
2487            .into_iter()
2488            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
2489            .take(limit)
2490            .collect()
2491    }
2492
2493    fn should_skip_memory_injection(query: &str) -> bool {
2494        let trimmed = query.trim();
2495        if trimmed.is_empty() {
2496            return true;
2497        }
2498        let lower = trimmed.to_ascii_lowercase();
2499        let social = [
2500            "hi",
2501            "hello",
2502            "hey",
2503            "thanks",
2504            "thank you",
2505            "ok",
2506            "okay",
2507            "cool",
2508            "nice",
2509            "yo",
2510            "good morning",
2511            "good afternoon",
2512            "good evening",
2513        ];
2514        lower.len() <= 32 && social.contains(&lower.as_str())
2515    }
2516
2517    fn personality_preset_text(preset: &str) -> &'static str {
2518        match preset {
2519            "concise" => {
2520                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
2521            }
2522            "friendly" => {
2523                "Default style: friendly and supportive while staying technically rigorous and concrete."
2524            }
2525            "mentor" => {
2526                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
2527            }
2528            "critical" => {
2529                "Default style: critical and risk-first. Surface failure modes and assumptions early."
2530            }
2531            _ => {
2532                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
2533            }
2534        }
2535    }
2536
2537    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
2538        let allow_agent_override = agent_name
2539            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
2540            .unwrap_or(false);
2541        let legacy_bot_name = config
2542            .get("bot_name")
2543            .and_then(Value::as_str)
2544            .map(str::trim)
2545            .filter(|v| !v.is_empty());
2546        let bot_name = config
2547            .get("identity")
2548            .and_then(|identity| identity.get("bot"))
2549            .and_then(|bot| bot.get("canonical_name"))
2550            .and_then(Value::as_str)
2551            .map(str::trim)
2552            .filter(|v| !v.is_empty())
2553            .or(legacy_bot_name)
2554            .unwrap_or("Tandem");
2555
2556        let default_profile = config
2557            .get("identity")
2558            .and_then(|identity| identity.get("personality"))
2559            .and_then(|personality| personality.get("default"));
2560        let default_preset = default_profile
2561            .and_then(|profile| profile.get("preset"))
2562            .and_then(Value::as_str)
2563            .map(str::trim)
2564            .filter(|v| !v.is_empty())
2565            .unwrap_or("balanced");
2566        let default_custom = default_profile
2567            .and_then(|profile| profile.get("custom_instructions"))
2568            .and_then(Value::as_str)
2569            .map(str::trim)
2570            .filter(|v| !v.is_empty())
2571            .map(ToString::to_string);
2572        let legacy_persona = config
2573            .get("persona")
2574            .and_then(Value::as_str)
2575            .map(str::trim)
2576            .filter(|v| !v.is_empty())
2577            .map(ToString::to_string);
2578
2579        let per_agent_profile = if allow_agent_override {
2580            agent_name.and_then(|name| {
2581                config
2582                    .get("identity")
2583                    .and_then(|identity| identity.get("personality"))
2584                    .and_then(|personality| personality.get("per_agent"))
2585                    .and_then(|per_agent| per_agent.get(name))
2586            })
2587        } else {
2588            None
2589        };
2590        let preset = per_agent_profile
2591            .and_then(|profile| profile.get("preset"))
2592            .and_then(Value::as_str)
2593            .map(str::trim)
2594            .filter(|v| !v.is_empty())
2595            .unwrap_or(default_preset);
2596        let custom = per_agent_profile
2597            .and_then(|profile| profile.get("custom_instructions"))
2598            .and_then(Value::as_str)
2599            .map(str::trim)
2600            .filter(|v| !v.is_empty())
2601            .map(ToString::to_string)
2602            .or(default_custom)
2603            .or(legacy_persona);
2604
2605        let mut lines = vec![
2606            format!("You are {bot_name}, an AI assistant."),
2607            Self::personality_preset_text(preset).to_string(),
2608        ];
2609        if let Some(custom) = custom {
2610            lines.push(format!("Additional personality instructions: {custom}"));
2611        }
2612        Some(lines.join("\n"))
2613    }
2614}
2615
2616impl PromptContextHook for ServerPromptContextHook {
2617    fn augment_provider_messages(
2618        &self,
2619        ctx: PromptContextHookContext,
2620        mut messages: Vec<ChatMessage>,
2621    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
2622        let this = self.clone();
2623        Box::pin(async move {
2624            // Startup can invoke prompt plumbing before RuntimeState is installed.
2625            // Never panic from context hooks; fail-open and continue without augmentation.
2626            if !this.state.is_ready() {
2627                return Ok(messages);
2628            }
2629            let run = this.state.run_registry.get(&ctx.session_id).await;
2630            let Some(run) = run else {
2631                return Ok(messages);
2632            };
2633            let config = this.state.config.get_effective_value().await;
2634            if let Some(identity_block) =
2635                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
2636            {
2637                messages.push(ChatMessage {
2638                    role: "system".to_string(),
2639                    content: identity_block,
2640                    attachments: Vec::new(),
2641                });
2642            }
2643            let run_id = run.run_id;
2644            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
2645            let query = messages
2646                .iter()
2647                .rev()
2648                .find(|m| m.role == "user")
2649                .map(|m| m.content.clone())
2650                .unwrap_or_default();
2651            if query.trim().is_empty() {
2652                return Ok(messages);
2653            }
2654            if Self::should_skip_memory_injection(&query) {
2655                return Ok(messages);
2656            }
2657
2658            let docs_hits = this.search_embedded_docs(&query, 6).await;
2659            if !docs_hits.is_empty() {
2660                let docs_block = Self::build_docs_memory_block(&docs_hits);
2661                messages.push(ChatMessage {
2662                    role: "system".to_string(),
2663                    content: docs_block.clone(),
2664                    attachments: Vec::new(),
2665                });
2666                this.state.event_bus.publish(EngineEvent::new(
2667                    "memory.docs.context.injected",
2668                    json!({
2669                        "runID": run_id,
2670                        "sessionID": ctx.session_id,
2671                        "messageID": ctx.message_id,
2672                        "iteration": ctx.iteration,
2673                        "count": docs_hits.len(),
2674                        "tokenSizeApprox": docs_block.split_whitespace().count(),
2675                        "sourcePrefix": "guide_docs:"
2676                    }),
2677                ));
2678                return Ok(messages);
2679            }
2680
2681            let Some(db) = this.open_memory_db().await else {
2682                return Ok(messages);
2683            };
2684            let started = now_ms();
2685            let hits = db
2686                .search_global_memory(&user_id, &query, 8, None, None, None)
2687                .await
2688                .unwrap_or_default();
2689            let latency_ms = now_ms().saturating_sub(started);
2690            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
2691            this.state.event_bus.publish(EngineEvent::new(
2692                "memory.search.performed",
2693                json!({
2694                    "runID": run_id,
2695                    "sessionID": ctx.session_id,
2696                    "messageID": ctx.message_id,
2697                    "providerID": ctx.provider_id,
2698                    "modelID": ctx.model_id,
2699                    "iteration": ctx.iteration,
2700                    "queryHash": Self::hash_query(&query),
2701                    "resultCount": hits.len(),
2702                    "scoreMin": scores.iter().copied().reduce(f64::min),
2703                    "scoreMax": scores.iter().copied().reduce(f64::max),
2704                    "scores": scores,
2705                    "latencyMs": latency_ms,
2706                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
2707                }),
2708            ));
2709
2710            if hits.is_empty() {
2711                return Ok(messages);
2712            }
2713
2714            let memory_block = Self::build_memory_block(&hits);
2715            messages.push(ChatMessage {
2716                role: "system".to_string(),
2717                content: memory_block.clone(),
2718                attachments: Vec::new(),
2719            });
2720            this.state.event_bus.publish(EngineEvent::new(
2721                "memory.context.injected",
2722                json!({
2723                    "runID": run_id,
2724                    "sessionID": ctx.session_id,
2725                    "messageID": ctx.message_id,
2726                    "iteration": ctx.iteration,
2727                    "count": hits.len(),
2728                    "tokenSizeApprox": memory_block.split_whitespace().count(),
2729                }),
2730            ));
2731            Ok(messages)
2732        })
2733    }
2734}
2735
2736fn extract_event_session_id(properties: &Value) -> Option<String> {
2737    properties
2738        .get("sessionID")
2739        .or_else(|| properties.get("sessionId"))
2740        .or_else(|| properties.get("id"))
2741        .and_then(|v| v.as_str())
2742        .map(|s| s.to_string())
2743}
2744
2745fn extract_event_run_id(properties: &Value) -> Option<String> {
2746    properties
2747        .get("runID")
2748        .or_else(|| properties.get("run_id"))
2749        .and_then(|v| v.as_str())
2750        .map(|s| s.to_string())
2751}
2752
2753fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
2754    let session_id = extract_event_session_id(&event.properties)?;
2755    let run_id = extract_event_run_id(&event.properties);
2756    let key = format!("run/{session_id}/status");
2757
2758    let mut base = serde_json::Map::new();
2759    base.insert("sessionID".to_string(), Value::String(session_id));
2760    if let Some(run_id) = run_id {
2761        base.insert("runID".to_string(), Value::String(run_id));
2762    }
2763
2764    match event.event_type.as_str() {
2765        "session.run.started" => {
2766            base.insert("state".to_string(), Value::String("running".to_string()));
2767            base.insert("phase".to_string(), Value::String("run".to_string()));
2768            base.insert(
2769                "eventType".to_string(),
2770                Value::String("session.run.started".to_string()),
2771            );
2772            Some(StatusIndexUpdate {
2773                key,
2774                value: Value::Object(base),
2775            })
2776        }
2777        "session.run.finished" => {
2778            base.insert("state".to_string(), Value::String("finished".to_string()));
2779            base.insert("phase".to_string(), Value::String("run".to_string()));
2780            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
2781                base.insert("result".to_string(), Value::String(status.to_string()));
2782            }
2783            base.insert(
2784                "eventType".to_string(),
2785                Value::String("session.run.finished".to_string()),
2786            );
2787            Some(StatusIndexUpdate {
2788                key,
2789                value: Value::Object(base),
2790            })
2791        }
2792        "message.part.updated" => {
2793            let part_type = event
2794                .properties
2795                .get("part")
2796                .and_then(|v| v.get("type"))
2797                .and_then(|v| v.as_str())?;
2798            let (phase, tool_active) = match part_type {
2799                "tool-invocation" => ("tool", true),
2800                "tool-result" => ("run", false),
2801                _ => return None,
2802            };
2803            base.insert("state".to_string(), Value::String("running".to_string()));
2804            base.insert("phase".to_string(), Value::String(phase.to_string()));
2805            base.insert("toolActive".to_string(), Value::Bool(tool_active));
2806            if let Some(tool_name) = event
2807                .properties
2808                .get("part")
2809                .and_then(|v| v.get("tool"))
2810                .and_then(|v| v.as_str())
2811            {
2812                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
2813            }
2814            base.insert(
2815                "eventType".to_string(),
2816                Value::String("message.part.updated".to_string()),
2817            );
2818            Some(StatusIndexUpdate {
2819                key,
2820                value: Value::Object(base),
2821            })
2822        }
2823        _ => None,
2824    }
2825}
2826
2827pub async fn run_status_indexer(state: AppState) {
2828    let mut rx = state.event_bus.subscribe();
2829    loop {
2830        match rx.recv().await {
2831            Ok(event) => {
2832                if let Some(update) = derive_status_index_update(&event) {
2833                    if let Err(error) = state
2834                        .put_shared_resource(
2835                            update.key,
2836                            update.value,
2837                            None,
2838                            "system.status_indexer".to_string(),
2839                            None,
2840                        )
2841                        .await
2842                    {
2843                        tracing::warn!("status indexer failed to persist update: {error:?}");
2844                    }
2845                }
2846            }
2847            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2848            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2849        }
2850    }
2851}
2852
2853pub async fn run_agent_team_supervisor(state: AppState) {
2854    let mut rx = state.event_bus.subscribe();
2855    loop {
2856        match rx.recv().await {
2857            Ok(event) => {
2858                state.agent_teams.handle_engine_event(&state, &event).await;
2859            }
2860            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2861            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2862        }
2863    }
2864}
2865
2866pub async fn run_usage_aggregator(state: AppState) {
2867    let mut rx = state.event_bus.subscribe();
2868    loop {
2869        match rx.recv().await {
2870            Ok(event) => {
2871                if event.event_type != "provider.usage" {
2872                    continue;
2873                }
2874                let session_id = event
2875                    .properties
2876                    .get("sessionID")
2877                    .and_then(|v| v.as_str())
2878                    .unwrap_or("");
2879                if session_id.is_empty() {
2880                    continue;
2881                }
2882                let prompt_tokens = event
2883                    .properties
2884                    .get("promptTokens")
2885                    .and_then(|v| v.as_u64())
2886                    .unwrap_or(0);
2887                let completion_tokens = event
2888                    .properties
2889                    .get("completionTokens")
2890                    .and_then(|v| v.as_u64())
2891                    .unwrap_or(0);
2892                let total_tokens = event
2893                    .properties
2894                    .get("totalTokens")
2895                    .and_then(|v| v.as_u64())
2896                    .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
2897                state
2898                    .apply_provider_usage_to_runs(
2899                        session_id,
2900                        prompt_tokens,
2901                        completion_tokens,
2902                        total_tokens,
2903                    )
2904                    .await;
2905            }
2906            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2907            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2908        }
2909    }
2910}
2911
2912pub async fn run_routine_scheduler(state: AppState) {
2913    loop {
2914        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
2915        let now = now_ms();
2916        let plans = state.evaluate_routine_misfires(now).await;
2917        for plan in plans {
2918            let Some(routine) = state.get_routine(&plan.routine_id).await else {
2919                continue;
2920            };
2921            match evaluate_routine_execution_policy(&routine, "scheduled") {
2922                RoutineExecutionDecision::Allowed => {
2923                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
2924                    let run = state
2925                        .create_routine_run(
2926                            &routine,
2927                            "scheduled",
2928                            plan.run_count,
2929                            RoutineRunStatus::Queued,
2930                            None,
2931                        )
2932                        .await;
2933                    state
2934                        .append_routine_history(RoutineHistoryEvent {
2935                            routine_id: plan.routine_id.clone(),
2936                            trigger_type: "scheduled".to_string(),
2937                            run_count: plan.run_count,
2938                            fired_at_ms: now,
2939                            status: "queued".to_string(),
2940                            detail: None,
2941                        })
2942                        .await;
2943                    state.event_bus.publish(EngineEvent::new(
2944                        "routine.fired",
2945                        serde_json::json!({
2946                            "routineID": plan.routine_id,
2947                            "runID": run.run_id,
2948                            "runCount": plan.run_count,
2949                            "scheduledAtMs": plan.scheduled_at_ms,
2950                            "nextFireAtMs": plan.next_fire_at_ms,
2951                        }),
2952                    ));
2953                    state.event_bus.publish(EngineEvent::new(
2954                        "routine.run.created",
2955                        serde_json::json!({
2956                            "run": run,
2957                        }),
2958                    ));
2959                }
2960                RoutineExecutionDecision::RequiresApproval { reason } => {
2961                    let run = state
2962                        .create_routine_run(
2963                            &routine,
2964                            "scheduled",
2965                            plan.run_count,
2966                            RoutineRunStatus::PendingApproval,
2967                            Some(reason.clone()),
2968                        )
2969                        .await;
2970                    state
2971                        .append_routine_history(RoutineHistoryEvent {
2972                            routine_id: plan.routine_id.clone(),
2973                            trigger_type: "scheduled".to_string(),
2974                            run_count: plan.run_count,
2975                            fired_at_ms: now,
2976                            status: "pending_approval".to_string(),
2977                            detail: Some(reason.clone()),
2978                        })
2979                        .await;
2980                    state.event_bus.publish(EngineEvent::new(
2981                        "routine.approval_required",
2982                        serde_json::json!({
2983                            "routineID": plan.routine_id,
2984                            "runID": run.run_id,
2985                            "runCount": plan.run_count,
2986                            "triggerType": "scheduled",
2987                            "reason": reason,
2988                        }),
2989                    ));
2990                    state.event_bus.publish(EngineEvent::new(
2991                        "routine.run.created",
2992                        serde_json::json!({
2993                            "run": run,
2994                        }),
2995                    ));
2996                }
2997                RoutineExecutionDecision::Blocked { reason } => {
2998                    let run = state
2999                        .create_routine_run(
3000                            &routine,
3001                            "scheduled",
3002                            plan.run_count,
3003                            RoutineRunStatus::BlockedPolicy,
3004                            Some(reason.clone()),
3005                        )
3006                        .await;
3007                    state
3008                        .append_routine_history(RoutineHistoryEvent {
3009                            routine_id: plan.routine_id.clone(),
3010                            trigger_type: "scheduled".to_string(),
3011                            run_count: plan.run_count,
3012                            fired_at_ms: now,
3013                            status: "blocked_policy".to_string(),
3014                            detail: Some(reason.clone()),
3015                        })
3016                        .await;
3017                    state.event_bus.publish(EngineEvent::new(
3018                        "routine.blocked",
3019                        serde_json::json!({
3020                            "routineID": plan.routine_id,
3021                            "runID": run.run_id,
3022                            "runCount": plan.run_count,
3023                            "triggerType": "scheduled",
3024                            "reason": reason,
3025                        }),
3026                    ));
3027                    state.event_bus.publish(EngineEvent::new(
3028                        "routine.run.created",
3029                        serde_json::json!({
3030                            "run": run,
3031                        }),
3032                    ));
3033                }
3034            }
3035        }
3036    }
3037}
3038
3039pub async fn run_routine_executor(state: AppState) {
3040    loop {
3041        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
3042        let Some(run) = state.claim_next_queued_routine_run().await else {
3043            continue;
3044        };
3045
3046        state.event_bus.publish(EngineEvent::new(
3047            "routine.run.started",
3048            serde_json::json!({
3049                "runID": run.run_id,
3050                "routineID": run.routine_id,
3051                "triggerType": run.trigger_type,
3052                "startedAtMs": now_ms(),
3053            }),
3054        ));
3055
3056        let workspace_root = state.workspace_index.snapshot().await.root;
3057        let mut session = Session::new(
3058            Some(format!("Routine {}", run.routine_id)),
3059            Some(workspace_root.clone()),
3060        );
3061        let session_id = session.id.clone();
3062        session.workspace_root = Some(workspace_root);
3063
3064        if let Err(error) = state.storage.save_session(session).await {
3065            let detail = format!("failed to create routine session: {error}");
3066            let _ = state
3067                .update_routine_run_status(
3068                    &run.run_id,
3069                    RoutineRunStatus::Failed,
3070                    Some(detail.clone()),
3071                )
3072                .await;
3073            state.event_bus.publish(EngineEvent::new(
3074                "routine.run.failed",
3075                serde_json::json!({
3076                    "runID": run.run_id,
3077                    "routineID": run.routine_id,
3078                    "reason": detail,
3079                }),
3080            ));
3081            continue;
3082        }
3083
3084        state
3085            .set_routine_session_policy(
3086                session_id.clone(),
3087                run.run_id.clone(),
3088                run.routine_id.clone(),
3089                run.allowed_tools.clone(),
3090            )
3091            .await;
3092        state
3093            .add_active_session_id(&run.run_id, session_id.clone())
3094            .await;
3095        state
3096            .engine_loop
3097            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
3098            .await;
3099
3100        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
3101        if let Some(spec) = selected_model.as_ref() {
3102            state.event_bus.publish(EngineEvent::new(
3103                "routine.run.model_selected",
3104                serde_json::json!({
3105                    "runID": run.run_id,
3106                    "routineID": run.routine_id,
3107                    "providerID": spec.provider_id,
3108                    "modelID": spec.model_id,
3109                    "source": model_source,
3110                }),
3111            ));
3112        }
3113
3114        let request = SendMessageRequest {
3115            parts: vec![MessagePartInput::Text {
3116                text: build_routine_prompt(&state, &run).await,
3117            }],
3118            model: selected_model,
3119            agent: None,
3120            tool_mode: None,
3121            tool_allowlist: None,
3122            context_mode: None,
3123        };
3124
3125        let run_result = state
3126            .engine_loop
3127            .run_prompt_async_with_context(
3128                session_id.clone(),
3129                request,
3130                Some(format!("routine:{}", run.run_id)),
3131            )
3132            .await;
3133
3134        state.clear_routine_session_policy(&session_id).await;
3135        state
3136            .clear_active_session_id(&run.run_id, &session_id)
3137            .await;
3138        state
3139            .engine_loop
3140            .clear_session_allowed_tools(&session_id)
3141            .await;
3142
3143        match run_result {
3144            Ok(()) => {
3145                append_configured_output_artifacts(&state, &run).await;
3146                let _ = state
3147                    .update_routine_run_status(
3148                        &run.run_id,
3149                        RoutineRunStatus::Completed,
3150                        Some("routine run completed".to_string()),
3151                    )
3152                    .await;
3153                state.event_bus.publish(EngineEvent::new(
3154                    "routine.run.completed",
3155                    serde_json::json!({
3156                        "runID": run.run_id,
3157                        "routineID": run.routine_id,
3158                        "sessionID": session_id,
3159                        "finishedAtMs": now_ms(),
3160                    }),
3161                ));
3162            }
3163            Err(error) => {
3164                if let Some(latest) = state.get_routine_run(&run.run_id).await {
3165                    if latest.status == RoutineRunStatus::Paused {
3166                        state.event_bus.publish(EngineEvent::new(
3167                            "routine.run.paused",
3168                            serde_json::json!({
3169                                "runID": run.run_id,
3170                                "routineID": run.routine_id,
3171                                "sessionID": session_id,
3172                                "finishedAtMs": now_ms(),
3173                            }),
3174                        ));
3175                        continue;
3176                    }
3177                }
3178                let detail = truncate_text(&error.to_string(), 500);
3179                let _ = state
3180                    .update_routine_run_status(
3181                        &run.run_id,
3182                        RoutineRunStatus::Failed,
3183                        Some(detail.clone()),
3184                    )
3185                    .await;
3186                state.event_bus.publish(EngineEvent::new(
3187                    "routine.run.failed",
3188                    serde_json::json!({
3189                        "runID": run.run_id,
3190                        "routineID": run.routine_id,
3191                        "sessionID": session_id,
3192                        "reason": detail,
3193                        "finishedAtMs": now_ms(),
3194                    }),
3195                ));
3196            }
3197        }
3198    }
3199}
3200
3201pub async fn run_automation_v2_scheduler(state: AppState) {
3202    loop {
3203        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
3204        let now = now_ms();
3205        let due = state.evaluate_automation_v2_misfires(now).await;
3206        for automation_id in due {
3207            let Some(automation) = state.get_automation_v2(&automation_id).await else {
3208                continue;
3209            };
3210            if let Ok(run) = state
3211                .create_automation_v2_run(&automation, "scheduled")
3212                .await
3213            {
3214                state.event_bus.publish(EngineEvent::new(
3215                    "automation.v2.run.created",
3216                    serde_json::json!({
3217                        "automationID": automation_id,
3218                        "run": run,
3219                        "triggerType": "scheduled",
3220                    }),
3221                ));
3222            }
3223        }
3224    }
3225}
3226
3227async fn execute_automation_v2_node(
3228    state: &AppState,
3229    run_id: &str,
3230    automation: &AutomationV2Spec,
3231    node: &AutomationFlowNode,
3232    agent: &AutomationAgentProfile,
3233) -> anyhow::Result<Value> {
3234    let workspace_root = state.workspace_index.snapshot().await.root;
3235    let mut session = Session::new(
3236        Some(format!(
3237            "Automation {} / {}",
3238            automation.automation_id, node.node_id
3239        )),
3240        Some(workspace_root.clone()),
3241    );
3242    let session_id = session.id.clone();
3243    session.workspace_root = Some(workspace_root);
3244    state.storage.save_session(session).await?;
3245
3246    state.add_automation_v2_session(run_id, &session_id).await;
3247
3248    let mut allowlist = agent.tool_policy.allowlist.clone();
3249    if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
3250        allowlist.extend(mcp_tools.clone());
3251    }
3252    state
3253        .engine_loop
3254        .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
3255        .await;
3256
3257    let model = agent
3258        .model_policy
3259        .as_ref()
3260        .and_then(|policy| policy.get("default_model"))
3261        .and_then(parse_model_spec);
3262    let prompt = format!(
3263        "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}",
3264        automation.automation_id, run_id, node.node_id, agent.display_name, node.objective
3265    );
3266    let req = SendMessageRequest {
3267        parts: vec![MessagePartInput::Text { text: prompt }],
3268        model,
3269        agent: None,
3270        tool_mode: None,
3271        tool_allowlist: None,
3272        context_mode: None,
3273    };
3274    let result = state
3275        .engine_loop
3276        .run_prompt_async_with_context(
3277            session_id.clone(),
3278            req,
3279            Some(format!("automation-v2:{run_id}")),
3280        )
3281        .await;
3282
3283    state
3284        .engine_loop
3285        .clear_session_allowed_tools(&session_id)
3286        .await;
3287    state.clear_automation_v2_session(run_id, &session_id).await;
3288
3289    result.map(|_| {
3290        serde_json::json!({
3291            "sessionID": session_id,
3292            "status": "completed",
3293        })
3294    })
3295}
3296
3297pub async fn run_automation_v2_executor(state: AppState) {
3298    loop {
3299        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3300        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
3301            continue;
3302        };
3303        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
3304            let _ = state
3305                .update_automation_v2_run(&run.run_id, |row| {
3306                    row.status = AutomationRunStatus::Failed;
3307                    row.detail = Some("automation not found".to_string());
3308                })
3309                .await;
3310            continue;
3311        };
3312        let max_parallel = automation
3313            .execution
3314            .max_parallel_agents
3315            .unwrap_or(1)
3316            .clamp(1, 16) as usize;
3317
3318        loop {
3319            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
3320                break;
3321            };
3322            if matches!(
3323                latest.status,
3324                AutomationRunStatus::Paused
3325                    | AutomationRunStatus::Pausing
3326                    | AutomationRunStatus::Cancelled
3327                    | AutomationRunStatus::Failed
3328                    | AutomationRunStatus::Completed
3329            ) {
3330                break;
3331            }
3332            if latest.checkpoint.pending_nodes.is_empty() {
3333                let _ = state
3334                    .update_automation_v2_run(&run.run_id, |row| {
3335                        row.status = AutomationRunStatus::Completed;
3336                        row.detail = Some("automation run completed".to_string());
3337                    })
3338                    .await;
3339                break;
3340            }
3341
3342            let completed = latest
3343                .checkpoint
3344                .completed_nodes
3345                .iter()
3346                .cloned()
3347                .collect::<std::collections::HashSet<_>>();
3348            let pending = latest.checkpoint.pending_nodes.clone();
3349            let runnable = pending
3350                .iter()
3351                .filter_map(|node_id| {
3352                    let node = automation
3353                        .flow
3354                        .nodes
3355                        .iter()
3356                        .find(|n| n.node_id == *node_id)?;
3357                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
3358                        Some(node.clone())
3359                    } else {
3360                        None
3361                    }
3362                })
3363                .take(max_parallel)
3364                .collect::<Vec<_>>();
3365
3366            if runnable.is_empty() {
3367                let _ = state
3368                    .update_automation_v2_run(&run.run_id, |row| {
3369                        row.status = AutomationRunStatus::Failed;
3370                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
3371                    })
3372                    .await;
3373                break;
3374            }
3375
3376            let tasks = runnable
3377                .iter()
3378                .map(|node| {
3379                    let Some(agent) = automation
3380                        .agents
3381                        .iter()
3382                        .find(|a| a.agent_id == node.agent_id)
3383                        .cloned()
3384                    else {
3385                        return futures::future::ready((
3386                            node.node_id.clone(),
3387                            Err(anyhow::anyhow!("agent not found")),
3388                        ))
3389                        .boxed();
3390                    };
3391                    let state = state.clone();
3392                    let run_id = run.run_id.clone();
3393                    let automation = automation.clone();
3394                    let node = node.clone();
3395                    async move {
3396                        let result =
3397                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
3398                                .await;
3399                        (node.node_id, result)
3400                    }
3401                    .boxed()
3402                })
3403                .collect::<Vec<_>>();
3404            let outcomes = join_all(tasks).await;
3405
3406            let mut any_failed = false;
3407            for (node_id, result) in outcomes {
3408                match result {
3409                    Ok(output) => {
3410                        let _ = state
3411                            .update_automation_v2_run(&run.run_id, |row| {
3412                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
3413                                if !row
3414                                    .checkpoint
3415                                    .completed_nodes
3416                                    .iter()
3417                                    .any(|id| id == &node_id)
3418                                {
3419                                    row.checkpoint.completed_nodes.push(node_id.clone());
3420                                }
3421                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
3422                            })
3423                            .await;
3424                    }
3425                    Err(error) => {
3426                        any_failed = true;
3427                        let is_paused = state
3428                            .get_automation_v2_run(&run.run_id)
3429                            .await
3430                            .map(|row| row.status == AutomationRunStatus::Paused)
3431                            .unwrap_or(false);
3432                        if is_paused {
3433                            break;
3434                        }
3435                        let detail = truncate_text(&error.to_string(), 500);
3436                        let _ = state
3437                            .update_automation_v2_run(&run.run_id, |row| {
3438                                row.status = AutomationRunStatus::Failed;
3439                                row.detail = Some(detail.clone());
3440                            })
3441                            .await;
3442                    }
3443                }
3444            }
3445            if any_failed {
3446                break;
3447            }
3448        }
3449    }
3450}
3451
3452async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
3453    let normalized_entrypoint = run.entrypoint.trim();
3454    let known_tool = state
3455        .tools
3456        .list()
3457        .await
3458        .into_iter()
3459        .any(|schema| schema.name == normalized_entrypoint);
3460    if known_tool {
3461        let args = if run.args.is_object() {
3462            run.args.clone()
3463        } else {
3464            serde_json::json!({})
3465        };
3466        return format!("/tool {} {}", normalized_entrypoint, args);
3467    }
3468
3469    if let Some(objective) = routine_objective_from_args(run) {
3470        return build_routine_mission_prompt(run, &objective);
3471    }
3472
3473    format!(
3474        "Execute routine '{}' using entrypoint '{}' with args: {}",
3475        run.routine_id, run.entrypoint, run.args
3476    )
3477}
3478
3479fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
3480    run.args
3481        .get("prompt")
3482        .and_then(|v| v.as_str())
3483        .map(str::trim)
3484        .filter(|v| !v.is_empty())
3485        .map(ToString::to_string)
3486}
3487
3488fn routine_mode_from_args(args: &Value) -> &str {
3489    args.get("mode")
3490        .and_then(|v| v.as_str())
3491        .map(str::trim)
3492        .filter(|v| !v.is_empty())
3493        .unwrap_or("standalone")
3494}
3495
3496fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
3497    args.get("success_criteria")
3498        .and_then(|v| v.as_array())
3499        .map(|rows| {
3500            rows.iter()
3501                .filter_map(|row| row.as_str())
3502                .map(str::trim)
3503                .filter(|row| !row.is_empty())
3504                .map(ToString::to_string)
3505                .collect::<Vec<_>>()
3506        })
3507        .unwrap_or_default()
3508}
3509
3510fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
3511    let mode = routine_mode_from_args(&run.args);
3512    let success_criteria = routine_success_criteria_from_args(&run.args);
3513    let orchestrator_only_tool_calls = run
3514        .args
3515        .get("orchestrator_only_tool_calls")
3516        .and_then(|v| v.as_bool())
3517        .unwrap_or(false);
3518
3519    let mut lines = vec![
3520        format!("Automation ID: {}", run.routine_id),
3521        format!("Run ID: {}", run.run_id),
3522        format!("Mode: {}", mode),
3523        format!("Mission Objective: {}", objective),
3524    ];
3525
3526    if !success_criteria.is_empty() {
3527        lines.push("Success Criteria:".to_string());
3528        for criterion in success_criteria {
3529            lines.push(format!("- {}", criterion));
3530        }
3531    }
3532
3533    if run.allowed_tools.is_empty() {
3534        lines.push("Allowed Tools: all available by current policy".to_string());
3535    } else {
3536        lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
3537    }
3538
3539    if run.output_targets.is_empty() {
3540        lines.push("Output Targets: none configured".to_string());
3541    } else {
3542        lines.push("Output Targets:".to_string());
3543        for target in &run.output_targets {
3544            lines.push(format!("- {}", target));
3545        }
3546    }
3547
3548    if mode.eq_ignore_ascii_case("orchestrated") {
3549        lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
3550        lines
3551            .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
3552        if orchestrator_only_tool_calls {
3553            lines.push(
3554                "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
3555                    .to_string(),
3556            );
3557        }
3558    } else {
3559        lines.push("Execution Pattern: Standalone mission run".to_string());
3560    }
3561
3562    lines.push(
3563        "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
3564            .to_string(),
3565    );
3566
3567    lines.join("\n")
3568}
3569
/// Truncates `input` to at most `max_len` bytes and appends a `...<truncated>` marker.
///
/// Inputs that already fit within `max_len` bytes are returned unchanged. When the cut
/// would land inside a multi-byte UTF-8 character, it is moved back to the previous
/// character boundary so the slice never panics on non-ASCII text.
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // Index 0 is always a char boundary, so this loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }

    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
3578
3579async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
3580    if run.output_targets.is_empty() {
3581        return;
3582    }
3583    for target in &run.output_targets {
3584        let artifact = RoutineRunArtifact {
3585            artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
3586            uri: target.clone(),
3587            kind: "output_target".to_string(),
3588            label: Some("configured output target".to_string()),
3589            created_at_ms: now_ms(),
3590            metadata: Some(serde_json::json!({
3591                "source": "routine.output_targets",
3592                "runID": run.run_id,
3593                "routineID": run.routine_id,
3594            })),
3595        };
3596        let _ = state
3597            .append_routine_run_artifact(&run.run_id, artifact.clone())
3598            .await;
3599        state.event_bus.publish(EngineEvent::new(
3600            "routine.run.artifact_added",
3601            serde_json::json!({
3602                "runID": run.run_id,
3603                "routineID": run.routine_id,
3604                "artifact": artifact,
3605            }),
3606        ));
3607    }
3608}
3609
3610fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
3611    let obj = value.as_object()?;
3612    let provider_id = obj.get("provider_id")?.as_str()?.trim();
3613    let model_id = obj.get("model_id")?.as_str()?.trim();
3614    if provider_id.is_empty() || model_id.is_empty() {
3615        return None;
3616    }
3617    Some(ModelSpec {
3618        provider_id: provider_id.to_string(),
3619        model_id: model_id.to_string(),
3620    })
3621}
3622
3623fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
3624    args.get("model_policy")
3625        .and_then(|v| v.get("role_models"))
3626        .and_then(|v| v.get(role))
3627        .and_then(parse_model_spec)
3628}
3629
3630fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
3631    args.get("model_policy")
3632        .and_then(|v| v.get("default_model"))
3633        .and_then(parse_model_spec)
3634}
3635
3636fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
3637    let provider_id = config
3638        .get("default_provider")
3639        .and_then(|v| v.as_str())
3640        .map(str::trim)
3641        .filter(|v| !v.is_empty())?;
3642    let model_id = config
3643        .get("providers")
3644        .and_then(|v| v.get(provider_id))
3645        .and_then(|v| v.get("default_model"))
3646        .and_then(|v| v.as_str())
3647        .map(str::trim)
3648        .filter(|v| !v.is_empty())?;
3649    Some(ModelSpec {
3650        provider_id: provider_id.to_string(),
3651        model_id: model_id.to_string(),
3652    })
3653}
3654
3655fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
3656    providers.iter().any(|provider| {
3657        provider.id == spec.provider_id
3658            && provider
3659                .models
3660                .iter()
3661                .any(|model| model.id == spec.model_id)
3662    })
3663}
3664
3665async fn resolve_routine_model_spec_for_run(
3666    state: &AppState,
3667    run: &RoutineRunRecord,
3668) -> (Option<ModelSpec>, String) {
3669    let providers = state.providers.list().await;
3670    let mode = routine_mode_from_args(&run.args);
3671    let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
3672
3673    if mode.eq_ignore_ascii_case("orchestrated") {
3674        if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
3675            requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
3676        }
3677    }
3678    if let Some(default_model) = default_model_spec_from_args(&run.args) {
3679        requested.push((default_model, "args.model_policy.default_model"));
3680    }
3681    let effective_config = state.config.get_effective_value().await;
3682    if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
3683        requested.push((config_default, "config.default_provider"));
3684    }
3685
3686    for (candidate, source) in requested {
3687        if provider_catalog_has_model(&providers, &candidate) {
3688            return (Some(candidate), source.to_string());
3689        }
3690    }
3691
3692    let fallback = providers
3693        .into_iter()
3694        .find(|provider| !provider.models.is_empty())
3695        .and_then(|provider| {
3696            let model = provider.models.first()?;
3697            Some(ModelSpec {
3698                provider_id: provider.id,
3699                model_id: model.id.clone(),
3700            })
3701        });
3702
3703    (fallback, "provider_catalog_fallback".to_string())
3704}
3705
3706#[cfg(test)]
3707mod tests {
3708    use super::*;
3709
3710    fn test_state_with_path(path: PathBuf) -> AppState {
3711        let mut state = AppState::new_starting("test-attempt".to_string(), true);
3712        state.shared_resources_path = path;
3713        state.routines_path = tmp_routines_file("shared-state");
3714        state.routine_history_path = tmp_routines_file("routine-history");
3715        state.routine_runs_path = tmp_routines_file("routine-runs");
3716        state
3717    }
3718
3719    fn tmp_resource_file(name: &str) -> PathBuf {
3720        std::env::temp_dir().join(format!(
3721            "tandem-server-{name}-{}.json",
3722            uuid::Uuid::new_v4()
3723        ))
3724    }
3725
3726    fn tmp_routines_file(name: &str) -> PathBuf {
3727        std::env::temp_dir().join(format!(
3728            "tandem-server-routines-{name}-{}.json",
3729            uuid::Uuid::new_v4()
3730        ))
3731    }
3732
3733    #[test]
3734    fn default_model_spec_from_effective_config_reads_default_route() {
3735        let cfg = serde_json::json!({
3736            "default_provider": "openrouter",
3737            "providers": {
3738                "openrouter": {
3739                    "default_model": "google/gemini-3-flash-preview"
3740                }
3741            }
3742        });
3743        let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
3744        assert_eq!(spec.provider_id, "openrouter");
3745        assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
3746    }
3747
3748    #[test]
3749    fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
3750        let missing_provider = serde_json::json!({
3751            "providers": {
3752                "openrouter": {
3753                    "default_model": "google/gemini-3-flash-preview"
3754                }
3755            }
3756        });
3757        assert!(default_model_spec_from_effective_config(&missing_provider).is_none());
3758
3759        let missing_model = serde_json::json!({
3760            "default_provider": "openrouter",
3761            "providers": {
3762                "openrouter": {}
3763            }
3764        });
3765        assert!(default_model_spec_from_effective_config(&missing_model).is_none());
3766    }
3767
3768    #[tokio::test]
3769    async fn shared_resource_put_increments_revision() {
3770        let path = tmp_resource_file("shared-resource-put");
3771        let state = test_state_with_path(path.clone());
3772
3773        let first = state
3774            .put_shared_resource(
3775                "project/demo/board".to_string(),
3776                serde_json::json!({"status":"todo"}),
3777                None,
3778                "agent-1".to_string(),
3779                None,
3780            )
3781            .await
3782            .expect("first put");
3783        assert_eq!(first.rev, 1);
3784
3785        let second = state
3786            .put_shared_resource(
3787                "project/demo/board".to_string(),
3788                serde_json::json!({"status":"doing"}),
3789                Some(1),
3790                "agent-2".to_string(),
3791                Some(60_000),
3792            )
3793            .await
3794            .expect("second put");
3795        assert_eq!(second.rev, 2);
3796        assert_eq!(second.updated_by, "agent-2");
3797        assert_eq!(second.ttl_ms, Some(60_000));
3798
3799        let raw = tokio::fs::read_to_string(path.clone())
3800            .await
3801            .expect("persisted");
3802        assert!(raw.contains("\"rev\": 2"));
3803        let _ = tokio::fs::remove_file(path).await;
3804    }
3805
3806    #[tokio::test]
3807    async fn shared_resource_put_detects_revision_conflict() {
3808        let path = tmp_resource_file("shared-resource-conflict");
3809        let state = test_state_with_path(path.clone());
3810
3811        let _ = state
3812            .put_shared_resource(
3813                "mission/demo/card-1".to_string(),
3814                serde_json::json!({"title":"Card 1"}),
3815                None,
3816                "agent-1".to_string(),
3817                None,
3818            )
3819            .await
3820            .expect("seed put");
3821
3822        let conflict = state
3823            .put_shared_resource(
3824                "mission/demo/card-1".to_string(),
3825                serde_json::json!({"title":"Card 1 edited"}),
3826                Some(99),
3827                "agent-2".to_string(),
3828                None,
3829            )
3830            .await
3831            .expect_err("expected conflict");
3832
3833        match conflict {
3834            ResourceStoreError::RevisionConflict(conflict) => {
3835                assert_eq!(conflict.expected_rev, Some(99));
3836                assert_eq!(conflict.current_rev, Some(1));
3837            }
3838            other => panic!("unexpected error: {other:?}"),
3839        }
3840
3841        let _ = tokio::fs::remove_file(path).await;
3842    }
3843
3844    #[tokio::test]
3845    async fn shared_resource_rejects_invalid_namespace_key() {
3846        let path = tmp_resource_file("shared-resource-invalid-key");
3847        let state = test_state_with_path(path.clone());
3848
3849        let error = state
3850            .put_shared_resource(
3851                "global/demo/key".to_string(),
3852                serde_json::json!({"x":1}),
3853                None,
3854                "agent-1".to_string(),
3855                None,
3856            )
3857            .await
3858            .expect_err("invalid key should fail");
3859
3860        match error {
3861            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
3862            other => panic!("unexpected error: {other:?}"),
3863        }
3864
3865        assert!(!path.exists());
3866    }
3867
3868    #[test]
3869    fn derive_status_index_update_for_run_started() {
3870        let event = EngineEvent::new(
3871            "session.run.started",
3872            serde_json::json!({
3873                "sessionID": "s-1",
3874                "runID": "r-1"
3875            }),
3876        );
3877        let update = derive_status_index_update(&event).expect("update");
3878        assert_eq!(update.key, "run/s-1/status");
3879        assert_eq!(
3880            update.value.get("state").and_then(|v| v.as_str()),
3881            Some("running")
3882        );
3883        assert_eq!(
3884            update.value.get("phase").and_then(|v| v.as_str()),
3885            Some("run")
3886        );
3887    }
3888
3889    #[test]
3890    fn derive_status_index_update_for_tool_invocation() {
3891        let event = EngineEvent::new(
3892            "message.part.updated",
3893            serde_json::json!({
3894                "sessionID": "s-2",
3895                "runID": "r-2",
3896                "part": { "type": "tool-invocation", "tool": "todo_write" }
3897            }),
3898        );
3899        let update = derive_status_index_update(&event).expect("update");
3900        assert_eq!(update.key, "run/s-2/status");
3901        assert_eq!(
3902            update.value.get("phase").and_then(|v| v.as_str()),
3903            Some("tool")
3904        );
3905        assert_eq!(
3906            update.value.get("toolActive").and_then(|v| v.as_bool()),
3907            Some(true)
3908        );
3909        assert_eq!(
3910            update.value.get("tool").and_then(|v| v.as_str()),
3911            Some("todo_write")
3912        );
3913    }
3914
3915    #[test]
3916    fn misfire_skip_drops_runs_and_advances_next_fire() {
3917        let (count, next_fire) =
3918            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
3919        assert_eq!(count, 0);
3920        assert_eq!(next_fire, 11_000);
3921    }
3922
3923    #[test]
3924    fn misfire_run_once_emits_single_trigger() {
3925        let (count, next_fire) =
3926            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
3927        assert_eq!(count, 1);
3928        assert_eq!(next_fire, 11_000);
3929    }
3930
3931    #[test]
3932    fn misfire_catch_up_caps_trigger_count() {
3933        let (count, next_fire) = compute_misfire_plan(
3934            25_000,
3935            5_000,
3936            1_000,
3937            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
3938        );
3939        assert_eq!(count, 3);
3940        assert_eq!(next_fire, 26_000);
3941    }
3942
3943    #[tokio::test]
3944    async fn routine_put_persists_and_loads() {
3945        let routines_path = tmp_routines_file("persist-load");
3946        let mut state = AppState::new_starting("routines-put".to_string(), true);
3947        state.routines_path = routines_path.clone();
3948
3949        let routine = RoutineSpec {
3950            routine_id: "routine-1".to_string(),
3951            name: "Digest".to_string(),
3952            status: RoutineStatus::Active,
3953            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
3954            timezone: "UTC".to_string(),
3955            misfire_policy: RoutineMisfirePolicy::RunOnce,
3956            entrypoint: "mission.default".to_string(),
3957            args: serde_json::json!({"topic":"status"}),
3958            allowed_tools: vec![],
3959            output_targets: vec![],
3960            creator_type: "user".to_string(),
3961            creator_id: "user-1".to_string(),
3962            requires_approval: true,
3963            external_integrations_allowed: false,
3964            next_fire_at_ms: Some(5_000),
3965            last_fired_at_ms: None,
3966        };
3967
3968        state.put_routine(routine).await.expect("store routine");
3969
3970        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
3971        reloaded.routines_path = routines_path.clone();
3972        reloaded.load_routines().await.expect("load routines");
3973        let list = reloaded.list_routines().await;
3974        assert_eq!(list.len(), 1);
3975        assert_eq!(list[0].routine_id, "routine-1");
3976
3977        let _ = tokio::fs::remove_file(routines_path).await;
3978    }
3979
3980    #[tokio::test]
3981    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
3982        let routines_path = tmp_routines_file("misfire-eval");
3983        let mut state = AppState::new_starting("routines-eval".to_string(), true);
3984        state.routines_path = routines_path.clone();
3985
3986        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
3987            routine_id: id.to_string(),
3988            name: id.to_string(),
3989            status: RoutineStatus::Active,
3990            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
3991            timezone: "UTC".to_string(),
3992            misfire_policy: policy,
3993            entrypoint: "mission.default".to_string(),
3994            args: serde_json::json!({}),
3995            allowed_tools: vec![],
3996            output_targets: vec![],
3997            creator_type: "user".to_string(),
3998            creator_id: "u-1".to_string(),
3999            requires_approval: false,
4000            external_integrations_allowed: false,
4001            next_fire_at_ms: Some(5_000),
4002            last_fired_at_ms: None,
4003        };
4004
4005        state
4006            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
4007            .await
4008            .expect("put skip");
4009        state
4010            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
4011            .await
4012            .expect("put once");
4013        state
4014            .put_routine(base(
4015                "routine-catch",
4016                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
4017            ))
4018            .await
4019            .expect("put catch");
4020
4021        let plans = state.evaluate_routine_misfires(10_500).await;
4022        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
4023        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
4024        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
4025
4026        assert!(plan_skip.is_none());
4027        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
4028        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
4029
4030        let stored = state.list_routines().await;
4031        let skip_next = stored
4032            .iter()
4033            .find(|r| r.routine_id == "routine-skip")
4034            .and_then(|r| r.next_fire_at_ms)
4035            .expect("skip next");
4036        assert!(skip_next > 10_500);
4037
4038        let _ = tokio::fs::remove_file(routines_path).await;
4039    }
4040
4041    #[test]
4042    fn routine_policy_blocks_external_side_effects_by_default() {
4043        let routine = RoutineSpec {
4044            routine_id: "routine-policy-1".to_string(),
4045            name: "Connector routine".to_string(),
4046            status: RoutineStatus::Active,
4047            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
4048            timezone: "UTC".to_string(),
4049            misfire_policy: RoutineMisfirePolicy::RunOnce,
4050            entrypoint: "connector.email.reply".to_string(),
4051            args: serde_json::json!({}),
4052            allowed_tools: vec![],
4053            output_targets: vec![],
4054            creator_type: "user".to_string(),
4055            creator_id: "u-1".to_string(),
4056            requires_approval: true,
4057            external_integrations_allowed: false,
4058            next_fire_at_ms: None,
4059            last_fired_at_ms: None,
4060        };
4061
4062        let decision = evaluate_routine_execution_policy(&routine, "manual");
4063        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
4064    }
4065
4066    #[test]
4067    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
4068        let routine = RoutineSpec {
4069            routine_id: "routine-policy-2".to_string(),
4070            name: "Connector routine".to_string(),
4071            status: RoutineStatus::Active,
4072            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
4073            timezone: "UTC".to_string(),
4074            misfire_policy: RoutineMisfirePolicy::RunOnce,
4075            entrypoint: "connector.email.reply".to_string(),
4076            args: serde_json::json!({}),
4077            allowed_tools: vec![],
4078            output_targets: vec![],
4079            creator_type: "user".to_string(),
4080            creator_id: "u-1".to_string(),
4081            requires_approval: true,
4082            external_integrations_allowed: true,
4083            next_fire_at_ms: None,
4084            last_fired_at_ms: None,
4085        };
4086
4087        let decision = evaluate_routine_execution_policy(&routine, "manual");
4088        assert!(matches!(
4089            decision,
4090            RoutineExecutionDecision::RequiresApproval { .. }
4091        ));
4092    }
4093
4094    #[test]
4095    fn routine_policy_allows_non_external_entrypoints() {
4096        let routine = RoutineSpec {
4097            routine_id: "routine-policy-3".to_string(),
4098            name: "Internal mission routine".to_string(),
4099            status: RoutineStatus::Active,
4100            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
4101            timezone: "UTC".to_string(),
4102            misfire_policy: RoutineMisfirePolicy::RunOnce,
4103            entrypoint: "mission.default".to_string(),
4104            args: serde_json::json!({}),
4105            allowed_tools: vec![],
4106            output_targets: vec![],
4107            creator_type: "user".to_string(),
4108            creator_id: "u-1".to_string(),
4109            requires_approval: true,
4110            external_integrations_allowed: false,
4111            next_fire_at_ms: None,
4112            last_fired_at_ms: None,
4113        };
4114
4115        let decision = evaluate_routine_execution_policy(&routine, "manual");
4116        assert_eq!(decision, RoutineExecutionDecision::Allowed);
4117    }
4118
4119    #[tokio::test]
4120    async fn claim_next_queued_routine_run_marks_oldest_running() {
4121        let mut state = AppState::new_starting("routine-claim".to_string(), true);
4122        state.routine_runs_path = tmp_routines_file("routine-claim-runs");
4123
4124        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
4125            run_id: run_id.to_string(),
4126            routine_id: "routine-claim".to_string(),
4127            trigger_type: "manual".to_string(),
4128            run_count: 1,
4129            status: RoutineRunStatus::Queued,
4130            created_at_ms,
4131            updated_at_ms: created_at_ms,
4132            fired_at_ms: Some(created_at_ms),
4133            started_at_ms: None,
4134            finished_at_ms: None,
4135            requires_approval: false,
4136            approval_reason: None,
4137            denial_reason: None,
4138            paused_reason: None,
4139            detail: None,
4140            entrypoint: "mission.default".to_string(),
4141            args: serde_json::json!({}),
4142            allowed_tools: vec![],
4143            output_targets: vec![],
4144            artifacts: vec![],
4145            active_session_ids: vec![],
4146            prompt_tokens: 0,
4147            completion_tokens: 0,
4148            total_tokens: 0,
4149            estimated_cost_usd: 0.0,
4150        };
4151
4152        {
4153            let mut guard = state.routine_runs.write().await;
4154            guard.insert("run-late".to_string(), mk("run-late", 2_000));
4155            guard.insert("run-early".to_string(), mk("run-early", 1_000));
4156        }
4157        state.persist_routine_runs().await.expect("persist");
4158
4159        let claimed = state
4160            .claim_next_queued_routine_run()
4161            .await
4162            .expect("claimed run");
4163        assert_eq!(claimed.run_id, "run-early");
4164        assert_eq!(claimed.status, RoutineRunStatus::Running);
4165        assert!(claimed.started_at_ms.is_some());
4166    }
4167
4168    #[tokio::test]
4169    async fn routine_session_policy_roundtrip_normalizes_tools() {
4170        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
4171        state
4172            .set_routine_session_policy(
4173                "session-routine-1".to_string(),
4174                "run-1".to_string(),
4175                "routine-1".to_string(),
4176                vec![
4177                    "read".to_string(),
4178                    " mcp.arcade.search ".to_string(),
4179                    "read".to_string(),
4180                    "".to_string(),
4181                ],
4182            )
4183            .await;
4184
4185        let policy = state
4186            .routine_session_policy("session-routine-1")
4187            .await
4188            .expect("policy");
4189        assert_eq!(
4190            policy.allowed_tools,
4191            vec!["read".to_string(), "mcp.arcade.search".to_string()]
4192        );
4193    }
4194
4195    #[test]
4196    fn routine_mission_prompt_includes_orchestrated_contract() {
4197        let run = RoutineRunRecord {
4198            run_id: "run-orchestrated-1".to_string(),
4199            routine_id: "automation-orchestrated".to_string(),
4200            trigger_type: "manual".to_string(),
4201            run_count: 1,
4202            status: RoutineRunStatus::Queued,
4203            created_at_ms: 1_000,
4204            updated_at_ms: 1_000,
4205            fired_at_ms: Some(1_000),
4206            started_at_ms: None,
4207            finished_at_ms: None,
4208            requires_approval: true,
4209            approval_reason: None,
4210            denial_reason: None,
4211            paused_reason: None,
4212            detail: None,
4213            entrypoint: "mission.default".to_string(),
4214            args: serde_json::json!({
4215                "prompt": "Coordinate a multi-step release readiness check.",
4216                "mode": "orchestrated",
4217                "success_criteria": ["All blockers listed", "Output artifact written"],
4218                "orchestrator_only_tool_calls": true
4219            }),
4220            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
4221            output_targets: vec!["file://reports/release-readiness.md".to_string()],
4222            artifacts: vec![],
4223            active_session_ids: vec![],
4224            prompt_tokens: 0,
4225            completion_tokens: 0,
4226            total_tokens: 0,
4227            estimated_cost_usd: 0.0,
4228        };
4229
4230        let objective = routine_objective_from_args(&run).expect("objective");
4231        let prompt = build_routine_mission_prompt(&run, &objective);
4232
4233        assert!(prompt.contains("Mode: orchestrated"));
4234        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
4235        assert!(prompt.contains("only the orchestrator may execute tools"));
4236        assert!(prompt.contains("Allowed Tools: read, webfetch"));
4237        assert!(prompt.contains("file://reports/release-readiness.md"));
4238    }
4239
4240    #[test]
4241    fn routine_mission_prompt_includes_standalone_defaults() {
4242        let run = RoutineRunRecord {
4243            run_id: "run-standalone-1".to_string(),
4244            routine_id: "automation-standalone".to_string(),
4245            trigger_type: "manual".to_string(),
4246            run_count: 1,
4247            status: RoutineRunStatus::Queued,
4248            created_at_ms: 2_000,
4249            updated_at_ms: 2_000,
4250            fired_at_ms: Some(2_000),
4251            started_at_ms: None,
4252            finished_at_ms: None,
4253            requires_approval: false,
4254            approval_reason: None,
4255            denial_reason: None,
4256            paused_reason: None,
4257            detail: None,
4258            entrypoint: "mission.default".to_string(),
4259            args: serde_json::json!({
4260                "prompt": "Summarize top engineering updates.",
4261                "success_criteria": ["Three bullet summary"]
4262            }),
4263            allowed_tools: vec![],
4264            output_targets: vec![],
4265            artifacts: vec![],
4266            active_session_ids: vec![],
4267            prompt_tokens: 0,
4268            completion_tokens: 0,
4269            total_tokens: 0,
4270            estimated_cost_usd: 0.0,
4271        };
4272
4273        let objective = routine_objective_from_args(&run).expect("objective");
4274        let prompt = build_routine_mission_prompt(&run, &objective);
4275
4276        assert!(prompt.contains("Mode: standalone"));
4277        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
4278        assert!(prompt.contains("Allowed Tools: all available by current policy"));
4279        assert!(prompt.contains("Output Targets: none configured"));
4280    }
4281
4282    #[test]
4283    fn shared_resource_key_validator_accepts_swarm_active_tasks() {
4284        assert!(is_valid_resource_key("swarm.active_tasks"));
4285        assert!(is_valid_resource_key("project/demo"));
4286        assert!(!is_valid_resource_key("swarm//active_tasks"));
4287        assert!(!is_valid_resource_key("misc/demo"));
4288    }
4289}