Skip to main content

tandem_server/
lib.rs

1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22    EngineEvent, HostOs, HostRuntimeContext, MessagePart, MessagePartInput, MessageRole, ModelSpec,
23    PathStyle, SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30    resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31    PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38use tandem_workflows::{
39    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
40    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
41    WorkflowRunStatus, WorkflowSourceKind, WorkflowSourceRef, WorkflowSpec,
42    WorkflowValidationMessage,
43};
44
45mod agent_teams;
46mod browser;
47mod bug_monitor_github;
48mod capability_resolver;
49mod http;
50mod mcp_catalog;
51mod pack_builder;
52mod pack_manager;
53mod preset_composer;
54mod preset_registry;
55mod preset_summary;
56pub mod webui;
57mod workflows;
58
59pub use agent_teams::AgentTeamRuntime;
60pub use browser::{
61    install_browser_sidecar, BrowserHealthSummary, BrowserSidecarInstallResult,
62    BrowserSmokeTestResult, BrowserSubsystem,
63};
64pub use capability_resolver::CapabilityResolver;
65pub use http::serve;
66pub use pack_manager::PackManager;
67pub use preset_composer::PromptComposeInput;
68pub use preset_registry::PresetRegistry;
69pub use workflows::{
70    canonical_workflow_event_names, dispatch_workflow_event, execute_hook_binding,
71    execute_workflow, parse_workflow_action, run_workflow_dispatcher, simulate_workflow_event,
72};
73
74pub(crate) fn normalize_absolute_workspace_root(raw: &str) -> Result<String, String> {
75    let trimmed = raw.trim();
76    if trimmed.is_empty() {
77        return Err("workspace_root is required".to_string());
78    }
79    let as_path = PathBuf::from(trimmed);
80    if !as_path.is_absolute() {
81        return Err("workspace_root must be an absolute path".to_string());
82    }
83    tandem_core::normalize_workspace_path(trimmed)
84        .ok_or_else(|| "workspace_root is invalid".to_string())
85}
86
/// Serializable status snapshot for one channel; `ChannelRuntime::statuses` stores
/// these keyed by `String`.
///
/// The derived `Default` has `enabled`/`connected` false, no error, zero active
/// sessions and `meta` set to `Value::default()` (JSON `null`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    pub enabled: bool,
    pub connected: bool,
    /// Last recorded error message, if any.
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Free-form extra details; this type does not fix a schema for it.
    pub meta: Value,
}
95
96#[derive(Debug, Clone, Serialize, Deserialize, Default)]
97pub struct WebUiConfig {
98    #[serde(default)]
99    pub enabled: bool,
100    #[serde(default = "default_web_ui_prefix")]
101    pub path_prefix: String,
102}
103
/// Channel settings as read from the config file: one optional section per
/// integration plus a shared tool policy.
///
/// Absent integration sections deserialize as `None`; a missing `tool_policy` falls
/// back to `ChannelToolPolicy::default()`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    pub telegram: Option<TelegramConfigFile>,
    pub discord: Option<DiscordConfigFile>,
    pub slack: Option<SlackConfigFile>,
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}
112
/// Telegram section of the channels config file.
///
/// NOTE(review): the derived `Debug` prints `bot_token` verbatim; avoid `{:?}`
/// formatting of this struct (or of `ChannelsConfigFile`) in logs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    pub bot_token: String,
    /// Filled by `default_allow_all()` when omitted.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// `false` when omitted.
    #[serde(default)]
    pub mention_only: bool,
    /// `TelegramStyleProfile::default()` when omitted.
    #[serde(default)]
    pub style_profile: tandem_channels::config::TelegramStyleProfile,
}
123
/// Discord section of the channels config file.
///
/// NOTE(review): the derived `Debug` prints `bot_token` verbatim; avoid `{:?}`
/// formatting of this struct in logs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    pub bot_token: String,
    /// Optional guild restriction; `None` when omitted.
    #[serde(default)]
    pub guild_id: Option<String>,
    /// Filled by `default_allow_all()` when omitted.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Filled by `default_discord_mention_only()` when omitted (value defined elsewhere).
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}
134
/// Slack section of the channels config file.
///
/// NOTE(review): the derived `Debug` prints `bot_token` verbatim; avoid `{:?}`
/// formatting of this struct in logs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    pub bot_token: String,
    pub channel_id: String,
    /// Filled by `default_allow_all()` when omitted.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// `false` when omitted.
    #[serde(default)]
    pub mention_only: bool,
}
144
/// Crate-private view of the app config sections this server reads.
///
/// Every section is `#[serde(default)]`, so a missing section deserializes to that
/// type's `Default` rather than failing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
156
/// Runtime bookkeeping for channel integrations.
///
/// The derived `Default` has no listener task set and an empty status map.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Spawned listener tasks, if listeners have been started.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Per-channel status keyed by `String`.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
162
/// Lease record identified by `lease_id` and held by `client_id`.
///
/// Expiry is measured from the most recent renewal, not from acquisition.
#[derive(Debug, Clone)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Reports whether the lease has lapsed at `now_ms`.
    ///
    /// The lease expires strictly after `last_renewed_at_ms + ttl_ms`. A `now_ms`
    /// earlier than the last renewal never counts as expired, and the deadline
    /// saturates at `u64::MAX` instead of overflowing.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let deadline_ms = self.last_renewed_at_ms.saturating_add(self.ttl_ms);
        now_ms > deadline_ms
    }
}
178
/// A run currently registered for a session in `RunRegistry`.
///
/// Serialized with the explicit key names given by each `rename` (`runID`,
/// `startedAtMs`, …); the optional identity fields are omitted when `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    #[serde(rename = "runID")]
    pub run_id: String,
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    /// Refreshed by `RunRegistry::touch`; `RunRegistry::reap_stale` compares against it.
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
194
195#[derive(Clone, Default)]
196pub struct RunRegistry {
197    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
198}
199
200impl RunRegistry {
201    pub fn new() -> Self {
202        Self::default()
203    }
204
205    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
206        self.active.read().await.get(session_id).cloned()
207    }
208
209    pub async fn acquire(
210        &self,
211        session_id: &str,
212        run_id: String,
213        client_id: Option<String>,
214        agent_id: Option<String>,
215        agent_profile: Option<String>,
216    ) -> std::result::Result<ActiveRun, ActiveRun> {
217        let mut guard = self.active.write().await;
218        if let Some(existing) = guard.get(session_id).cloned() {
219            return Err(existing);
220        }
221        let now = now_ms();
222        let run = ActiveRun {
223            run_id,
224            started_at_ms: now,
225            last_activity_at_ms: now,
226            client_id,
227            agent_id,
228            agent_profile,
229        };
230        guard.insert(session_id.to_string(), run.clone());
231        Ok(run)
232    }
233
234    pub async fn touch(&self, session_id: &str, run_id: &str) {
235        let mut guard = self.active.write().await;
236        if let Some(run) = guard.get_mut(session_id) {
237            if run.run_id == run_id {
238                run.last_activity_at_ms = now_ms();
239            }
240        }
241    }
242
243    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
244        let mut guard = self.active.write().await;
245        if let Some(run) = guard.get(session_id) {
246            if run.run_id == run_id {
247                return guard.remove(session_id);
248            }
249        }
250        None
251    }
252
253    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
254        self.active.write().await.remove(session_id)
255    }
256
257    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
258        let now = now_ms();
259        let mut guard = self.active.write().await;
260        let stale_ids = guard
261            .iter()
262            .filter_map(|(session_id, run)| {
263                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
264                    Some(session_id.clone())
265                } else {
266                    None
267                }
268            })
269            .collect::<Vec<_>>();
270        let mut out = Vec::with_capacity(stale_ids.len());
271        for session_id in stale_ids {
272            if let Some(run) = guard.remove(&session_id) {
273                out.push((session_id, run));
274            }
275        }
276        out
277    }
278}
279
/// Milliseconds since the Unix epoch, according to the system clock.
///
/// Returns `0` when the clock reads earlier than the epoch. The `u128` millisecond
/// count is narrowed with `as`.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
286
287pub fn build_id() -> String {
288    if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
289        let trimmed = explicit.trim();
290        if !trimmed.is_empty() {
291            return trimmed.to_string();
292        }
293    }
294    if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
295        let trimmed = git_sha.trim();
296        if !trimmed.is_empty() {
297            return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
298        }
299    }
300    env!("CARGO_PKG_VERSION").to_string()
301}
302
303pub fn detect_host_runtime_context() -> HostRuntimeContext {
304    let os = if cfg!(target_os = "windows") {
305        HostOs::Windows
306    } else if cfg!(target_os = "macos") {
307        HostOs::Macos
308    } else {
309        HostOs::Linux
310    };
311    let (shell_family, path_style) = match os {
312        HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
313        HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
314    };
315    HostRuntimeContext {
316        os,
317        arch: std::env::consts::ARCH.to_string(),
318        shell_family,
319        path_style,
320    }
321}
322
/// Returns the running executable's path, but only in builds with `debug_assertions`.
///
/// Release builds always return `None`. Debug builds return `None` if the path cannot
/// be determined; non-UTF-8 parts are replaced lossily.
pub fn binary_path_for_health() -> Option<String> {
    if !cfg!(debug_assertions) {
        return None;
    }
    let exe = std::env::current_exe().ok()?;
    Some(exe.to_string_lossy().into_owned())
}
335
/// Aggregate of engine subsystem handles.
///
/// Derives `Clone`. `auth` and `logs` are `Arc<RwLock<..>>`, so clones share them.
/// Whether the other fields share state when cloned depends on their own `Clone`
/// impls, which are not visible here.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    /// String-to-string map behind a shared async lock; key meaning is not fixed here.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// JSON log entries behind a shared async lock.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    pub host_runtime_context: HostRuntimeContext,
    pub browser: BrowserSubsystem,
}
357
/// In-memory record of a governed memory item tied to a run.
///
/// Only `Debug`/`Clone` are derived; this type is not serde-serializable.
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    pub metadata: Option<Value>,
    /// Presumably the ID of the memory this record was derived from — confirm with callers.
    pub source_memory_id: Option<String>,
    pub created_at_ms: u64,
}
371
/// Serializable audit entry for an action on governed memory.
///
/// Only `detail` is skipped when `None`; the other `Option` fields serialize as `null`.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub created_at_ms: u64,
}
387
/// A keyed JSON value with a revision number and last-writer metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    pub key: String,
    pub value: Value,
    /// Revision counter — presumably bumped on each write; verify against the writer.
    pub rev: u64,
    pub updated_at_ms: u64,
    pub updated_by: String,
    /// Optional time-to-live; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
398
/// When a routine fires: a fixed interval or a cron expression.
///
/// Uses serde's default externally tagged form with snake_case variant names, e.g.
/// `{"interval_seconds": {"seconds": 60}}` or `{"cron": {"expression": "..."}}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    IntervalSeconds { seconds: u64 },
    Cron { expression: String },
}
405
/// Policy applied to missed routine fire times.
///
/// Internally tagged on `"type"` with snake_case names, e.g. `{"type": "skip"}` or
/// `{"type": "catch_up", "max_runs": 3}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    Skip,
    RunOnce,
    CatchUp { max_runs: u32 },
}
413
/// Whether a routine is enabled; serialized as `"active"` / `"paused"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
420
/// Persisted definition of a scheduled routine.
///
/// `args`, `allowed_tools` and `output_targets` default when absent (JSON `null` /
/// empty). The fire-time fields are omitted from output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    /// Timezone name as a string; presumably parsed with `chrono_tz::Tz` elsewhere — confirm.
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
445
/// One entry in a routine's firing history; `detail` is omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub fired_at_ms: u64,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
456
/// Status of a single routine run; serialized in snake_case (e.g. `"pending_approval"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}
470
/// An artifact attached to a routine run, referenced by `uri`.
///
/// `label` and `metadata` are omitted from output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    pub uri: String,
    pub kind: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
482
/// Persisted record of one routine run: lifecycle timestamps, approval/denial/pause
/// reasons, a copy of the invocation inputs, produced artifacts, sessions and usage.
///
/// Optional fields are omitted when `None`. Collection, token and cost fields default
/// to empty / `0` when absent, so older records without them still deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub latest_session_id: Option<String>,
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
529
/// Links a session to the routine run that owns it, with that run's tool allowlist.
///
/// Not serializable; only `Debug`/`Clone` are derived.
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
537
/// A planned routine trigger: how many runs to fire, for which scheduled time, and the
/// next fire time after it. Serialize-only.
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    pub run_count: u32,
    pub scheduled_at_ms: u64,
    pub next_fire_at_ms: u64,
}
545
/// Lifecycle state of a v2 automation; serialized as `"active"`, `"paused"`, `"draft"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2Status {
    Active,
    Paused,
    Draft,
}
553
/// Kind of v2 automation schedule; serialized as `"cron"`, `"interval"`, `"manual"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2ScheduleType {
    Cron,
    Interval,
    Manual,
}
561
/// Schedule for a v2 automation.
///
/// `schedule_type` is serialized under the key `"type"`. `cron_expression` and
/// `interval_seconds` are both optional and omitted when `None`; presumably only the
/// one matching `schedule_type` is set — the type itself does not enforce this.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AutomationV2Schedule {
    #[serde(rename = "type")]
    pub schedule_type: AutomationV2ScheduleType,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cron_expression: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval_seconds: Option<u64>,
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
}
573
/// Tool allow/deny lists for an automation agent; both default to empty when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentToolPolicy {
    #[serde(default)]
    pub allowlist: Vec<String>,
    #[serde(default)]
    pub denylist: Vec<String>,
}
581
/// MCP access policy for an automation agent.
///
/// `allowed_servers` defaults to empty. `allowed_tools` is optional and omitted when
/// `None`, so "no list" stays distinguishable from an empty list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentMcpPolicy {
    #[serde(default)]
    pub allowed_servers: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed_tools: Option<Vec<String>>,
}
589
/// One agent participating in a v2 automation.
///
/// `tool_policy` and `mcp_policy` have no `#[serde(default)]`, so they must be present
/// when deserializing. Optional fields are omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentProfile {
    /// Matched by `AutomationFlowNode::agent_id`.
    pub agent_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub template_id: Option<String>,
    pub display_name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub avatar_url: Option<String>,
    /// Free-form JSON; the schema is not defined by this type.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    #[serde(default)]
    pub skills: Vec<String>,
    pub tool_policy: AutomationAgentToolPolicy,
    pub mcp_policy: AutomationAgentMcpPolicy,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_policy: Option<String>,
}
607
/// A node in an automation flow: an objective assigned to an agent.
///
/// `depends_on` and `input_refs` default to empty; the optional fields are omitted
/// when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowNode {
    pub node_id: String,
    pub agent_id: String,
    pub objective: String,
    /// IDs of nodes this node depends on — presumably other `node_id`s; confirm.
    #[serde(default)]
    pub depends_on: Vec<String>,
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
    /// Free-form JSON; the schema is not defined by this type.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retry_policy: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
624
/// Reference to another step's output, exposed under `alias`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowInputRef {
    pub from_step_id: String,
    pub alias: String,
}
630
/// Expected output kind of a flow node or plan step; `kind` is a free-form string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowOutputContract {
    pub kind: String,
}
635
/// The node list of an automation flow; `nodes` defaults to empty when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowSpec {
    #[serde(default)]
    pub nodes: Vec<AutomationFlowNode>,
}
641
/// Optional execution limits for an automation run.
///
/// Each limit is omitted from output when `None`; presumably `None` means "no limit" —
/// confirm where these are enforced.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationExecutionPolicy {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_parallel_agents: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_runtime_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_tool_calls: Option<u32>,
}
651
/// Persisted definition of a v2 automation: schedule, agents, flow graph and limits.
///
/// `agents` and `output_targets` default to empty; optional fields are omitted when
/// `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2Spec {
    pub automation_id: String,
    pub name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub status: AutomationV2Status,
    pub schedule: AutomationV2Schedule,
    #[serde(default)]
    pub agents: Vec<AutomationAgentProfile>,
    pub flow: AutomationFlowSpec,
    pub execution: AutomationExecutionPolicy,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    pub creator_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
678
/// A step in a workflow plan.
///
/// Shares the input/output reference types with `AutomationFlowNode`, but names the
/// executor by `agent_role` rather than `agent_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanStep {
    pub step_id: String,
    pub kind: String,
    pub objective: String,
    #[serde(default)]
    pub depends_on: Vec<String>,
    pub agent_role: String,
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
}
692
/// A workflow plan: the prompt it came from (original and normalized), schedule,
/// target, ordered steps and integration requirements.
///
/// `confidence`, `plan_source` and `execution_target` are free-form strings here.
/// `save_options` is untyped JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlan {
    pub plan_id: String,
    pub planner_version: String,
    pub plan_source: String,
    pub original_prompt: String,
    pub normalized_prompt: String,
    pub confidence: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub schedule: AutomationV2Schedule,
    pub execution_target: String,
    pub workspace_root: String,
    #[serde(default)]
    pub steps: Vec<WorkflowPlanStep>,
    #[serde(default)]
    pub requires_integrations: Vec<String>,
    #[serde(default)]
    pub allowed_mcp_servers: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operator_preferences: Option<Value>,
    pub save_options: Value,
}
717
/// One message in a plan-refinement conversation; `role` is a free-form string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanChatMessage {
    pub role: String,
    pub text: String,
    pub created_at_ms: u64,
}
724
/// Conversation attached to a plan (`plan_id`); `messages` defaults to empty.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanConversation {
    pub conversation_id: String,
    pub plan_id: String,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default)]
    pub messages: Vec<WorkflowPlanChatMessage>,
}
734
/// Draft state for a plan: the first plan produced, the current revision, the
/// conversation, and optional planner diagnostics (omitted when `None`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanDraftRecord {
    pub initial_plan: WorkflowPlan,
    pub current_plan: WorkflowPlan,
    pub conversation: WorkflowPlanConversation,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub planner_diagnostics: Option<Value>,
}
743
/// Output produced by one automation node: its contract kind, a summary string and
/// the JSON content.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationNodeOutput {
    pub contract_kind: String,
    pub summary: String,
    pub content: Value,
    pub created_at_ms: u64,
    pub node_id: String,
}
752
/// Status of a v2 automation run; serialized in snake_case (e.g. `"pausing"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationRunStatus {
    Queued,
    Running,
    Pausing,
    Paused,
    Completed,
    Failed,
    Cancelled,
}
764
/// Progress snapshot for an automation run.
///
/// Node lists, per-node outputs and per-node attempt counts all default to empty when
/// absent. The maps are `HashMap`s, so their serialized key order is unspecified.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationRunCheckpoint {
    #[serde(default)]
    pub completed_nodes: Vec<String>,
    #[serde(default)]
    pub pending_nodes: Vec<String>,
    #[serde(default)]
    pub node_outputs: std::collections::HashMap<String, Value>,
    #[serde(default)]
    pub node_attempts: std::collections::HashMap<String, u32>,
}
776
/// Persisted record of one v2 automation run.
///
/// Contents: status, timestamps, active sessions/instances, checkpoint, an optional
/// snapshot of the automation definition, pause/resume reasons and usage totals.
/// Optional fields are omitted when `None`; list, token and cost fields default to
/// empty / `0` when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2RunRecord {
    pub run_id: String,
    pub automation_id: String,
    pub trigger_type: String,
    pub status: AutomationRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default)]
    pub active_instance_ids: Vec<String>,
    pub checkpoint: AutomationRunCheckpoint,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_snapshot: Option<AutomationV2Spec>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pause_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resume_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
811
812#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
813#[serde(rename_all = "snake_case")]
814pub enum BugMonitorProviderPreference {
815    Auto,
816    OfficialGithub,
817    Composio,
818    Arcade,
819}
820
821#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
822#[serde(rename_all = "snake_case")]
823pub enum BugMonitorLabelMode {
824    ReporterOnly,
825}
826
827impl Default for BugMonitorLabelMode {
828    fn default() -> Self {
829        Self::ReporterOnly
830    }
831}
832
833impl Default for BugMonitorProviderPreference {
834    fn default() -> Self {
835        Self::Auto
836    }
837}
838
/// Bug-monitor configuration.
///
/// Every field has a serde default, so a partial document deserializes. The two
/// `default_true` fields and the hand-written `Default` impl both start as `true`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BugMonitorConfig {
    #[serde(default)]
    pub enabled: bool,
    #[serde(default)]
    pub paused: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mcp_server: Option<String>,
    #[serde(default)]
    pub provider_preference: BugMonitorProviderPreference,
    /// Free-form JSON; the schema is not defined by this type.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    #[serde(default = "default_true")]
    pub auto_create_new_issues: bool,
    #[serde(default)]
    pub require_approval_for_new_issues: bool,
    #[serde(default = "default_true")]
    pub auto_comment_on_matched_open_issues: bool,
    #[serde(default)]
    pub label_mode: BugMonitorLabelMode,
    #[serde(default)]
    pub updated_at_ms: u64,
}
866
867impl Default for BugMonitorConfig {
868    fn default() -> Self {
869        Self {
870            enabled: false,
871            paused: false,
872            workspace_root: None,
873            repo: None,
874            mcp_server: None,
875            provider_preference: BugMonitorProviderPreference::Auto,
876            model_policy: None,
877            auto_create_new_issues: true,
878            require_approval_for_new_issues: false,
879            auto_comment_on_matched_open_issues: true,
880            label_mode: BugMonitorLabelMode::ReporterOnly,
881            updated_at_ms: 0,
882        }
883    }
884}
885
/// Draft issue produced by the bug monitor for a `fingerprint` in `repo`.
///
/// Also records GitHub posting results and any matched existing issue. Optional fields
/// are omitted when `None`; the derived `Default` has empty strings and `None`
/// everywhere else.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorDraftRecord {
    pub draft_id: String,
    pub fingerprint: String,
    pub repo: String,
    pub status: String,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_status: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_posted_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_state: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_error: Option<String>,
}
918
/// Record of one bug-monitor posting `operation` for a draft, keyed for deduplication
/// by `idempotency_key`.
///
/// Optional fields are omitted when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorPostRecord {
    pub post_id: String,
    pub draft_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub incident_id: Option<String>,
    pub fingerprint: String,
    pub repo: String,
    pub operation: String,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    pub idempotency_key: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub response_excerpt: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
}
947
/// A persisted Bug Monitor incident.
///
/// Instances are keyed by `incident_id` in `AppState::bug_monitor_incidents`. Repeated
/// sightings are tracked through `occurrence_count` / `last_seen_at_ms`; presumably they
/// are grouped by `fingerprint` (TODO confirm with the ingest code). Optional fields are
/// omitted from the serialized JSON when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorIncidentRecord {
    pub incident_id: String,
    pub fingerprint: String,
    pub event_type: String,
    pub status: String,
    pub repo: String,
    pub workspace_root: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    /// Excerpt lines captured with the incident (defaults to empty when absent).
    #[serde(default)]
    pub excerpt: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    /// Number of times this incident was observed; 0 when missing from stored JSON.
    #[serde(default)]
    pub occurrence_count: u64,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_seen_at_ms: Option<u64>,
    /// Draft generated for this incident, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub draft_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// Free-form JSON describing duplicate detection; schema not visible here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_summary: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_matches: Option<Vec<Value>>,
    /// Raw payload of the originating event, kept as untyped JSON.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_payload: Option<Value>,
}
992
/// Live counters and last-seen values for the Bug Monitor runtime.
///
/// Held in `AppState::bug_monitor_runtime_status`. Every field is `#[serde(default)]` or
/// optional, so partially populated JSON deserializes cleanly.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorRuntimeStatus {
    #[serde(default)]
    pub monitoring_active: bool,
    #[serde(default)]
    pub paused: bool,
    #[serde(default)]
    pub pending_incidents: usize,
    #[serde(default)]
    pub total_incidents: usize,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_processed_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_incident_event_type: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_runtime_error: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_result: Option<String>,
    #[serde(default)]
    pub pending_posts: usize,
}
1014
/// Loosely typed description of a failure handed to the Bug Monitor.
///
/// Every field is optional (`excerpt` defaults to empty), so callers may supply as much
/// or as little context as they have. NOTE(review): how missing fields, such as `repo`
/// or `fingerprint`, get filled in is decided by the consumer, not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorSubmission {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub file_name: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub process: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    #[serde(default)]
    pub excerpt: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fingerprint: Option<String>,
}
1046
/// Availability of each GitHub capability the Bug Monitor relies on.
///
/// Each flag is `true` when the matching capability is available (assumed to mean
/// "resolved to a tool binding"; confirm with the capability resolver).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityReadiness {
    #[serde(default)]
    pub github_list_issues: bool,
    #[serde(default)]
    pub github_get_issue: bool,
    #[serde(default)]
    pub github_create_issue: bool,
    #[serde(default)]
    pub github_comment_on_issue: bool,
}
1058
/// A capability that was matched to a concrete provider tool.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityMatch {
    pub capability_id: String,
    pub provider: String,
    pub tool_name: String,
    /// NOTE(review): presumably the index of the matching entry in the binding list; confirm.
    pub binding_index: usize,
}
1066
/// A candidate tool binding for a capability on the selected MCP server.
///
/// Reported in `BugMonitorStatus::selected_server_binding_candidates`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorBindingCandidate {
    pub capability_id: String,
    pub binding_tool_name: String,
    /// Alternative tool names considered for this binding.
    #[serde(default)]
    pub aliases: Vec<String>,
    /// Whether this candidate matched a discovered tool (assumed; confirm with the resolver).
    #[serde(default)]
    pub matched: bool,
}
1076
/// Boolean readiness checklist for the Bug Monitor.
///
/// Each flag defaults to `false` when absent from JSON. How the aggregate flags
/// (`ingest_ready`, `publish_ready`, `runtime_ready`) are derived from the others is
/// decided outside this chunk.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorReadiness {
    #[serde(default)]
    pub config_valid: bool,
    #[serde(default)]
    pub repo_valid: bool,
    #[serde(default)]
    pub mcp_server_present: bool,
    #[serde(default)]
    pub mcp_connected: bool,
    #[serde(default)]
    pub github_read_ready: bool,
    #[serde(default)]
    pub github_write_ready: bool,
    #[serde(default)]
    pub selected_model_ready: bool,
    #[serde(default)]
    pub ingest_ready: bool,
    #[serde(default)]
    pub publish_ready: bool,
    #[serde(default)]
    pub runtime_ready: bool,
}
1100
/// Aggregated Bug Monitor status.
///
/// Combines the active configuration, the readiness checklist, runtime counters and
/// capability-resolution details in one serializable value.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorStatus {
    pub config: BugMonitorConfig,
    pub readiness: BugMonitorReadiness,
    #[serde(default)]
    pub runtime: BugMonitorRuntimeStatus,
    pub required_capabilities: BugMonitorCapabilityReadiness,
    /// Ids of required capabilities that are not available.
    #[serde(default)]
    pub missing_required_capabilities: Vec<String>,
    #[serde(default)]
    pub resolved_capabilities: Vec<BugMonitorCapabilityMatch>,
    /// Tool names discovered on the MCP side.
    #[serde(default)]
    pub discovered_mcp_tools: Vec<String>,
    #[serde(default)]
    pub selected_server_binding_candidates: Vec<BugMonitorBindingCandidate>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub binding_source_version: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub bindings_last_merged_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub selected_model: Option<ModelSpec>,
    #[serde(default)]
    pub pending_drafts: usize,
    #[serde(default)]
    pub pending_posts: usize,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_activity_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}
1131
/// Details of a failed optimistic-concurrency check on a shared resource.
///
/// `current_rev` is `None` when the resource does not exist.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    pub key: String,
    pub expected_rev: Option<u64>,
    pub current_rev: Option<u64>,
}
1138
/// Errors returned by the shared-resource store (`put_shared_resource` / `delete_shared_resource`).
///
/// Serialized as an internally tagged object. For example, `InvalidKey` becomes
/// `{"type": "invalid_key", "key": ...}`, and `RevisionConflict` inlines the
/// `ResourceConflict` fields next to `"type": "revision_conflict"`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    /// The key was rejected by `is_valid_resource_key`.
    InvalidKey { key: String },
    /// `if_match_rev` did not match the stored revision.
    RevisionConflict(ResourceConflict),
    /// Writing the store to disk failed; `message` carries the underlying error text.
    PersistFailed { message: String },
}
1146
/// Errors returned by the routine store.
///
/// Serialized as an internally tagged object with a snake_case `"type"` field.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    InvalidRoutineId { routine_id: String },
    InvalidSchedule { detail: String },
    PersistFailed { message: String },
}
1154
/// Coarse lifecycle state of server startup.
///
/// The enum is fieldless, so it is also `Copy` and `Eq`. Callers can compare statuses
/// with `==` and read them out of a shared `StartupState` without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartupStatus {
    /// Startup is in progress (the initial state set by `AppState::new_starting`).
    Starting,
    /// `AppState::mark_ready` completed successfully.
    Ready,
    /// `AppState::mark_failed` was called; details live in `StartupState::last_error`.
    Failed,
}
1161
/// Mutable startup bookkeeping held behind `AppState::startup`.
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Human-readable phase label ("boot" initially, "ready" once started).
    pub phase: String,
    /// Time startup began, as returned by `now_ms()`.
    pub started_at_ms: u64,
    /// Identifier of this startup attempt, supplied to `AppState::new_starting`.
    pub attempt_id: String,
    /// Set by `mark_failed`; cleared by `mark_ready`.
    pub last_error: Option<String>,
}
1170
/// Point-in-time copy of `StartupState` plus elapsed time, produced by `AppState::startup_snapshot`.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    /// `now_ms() - started_at_ms`, saturating at 0 if the clock went backwards.
    pub elapsed_ms: u64,
}
1180
/// Shared, cheaply clonable server state.
///
/// Mutable collections sit behind `Arc<tokio::sync::RwLock<..>>`, so clones share the
/// same data. Each persisted store pairs an in-memory map with a `*_path` field pointing
/// at its on-disk file.
#[derive(Clone)]
pub struct AppState {
    /// Engine runtime; empty until `mark_ready` installs it.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup progress, exposed through `startup_snapshot`.
    pub startup: Arc<RwLock<StartupState>>,
    /// `true` when running in-process rather than as a sidecar (see `mode_label`).
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// Key/value shared resources; persisted as JSON at `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    // Bug Monitor configuration, records and live status.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// Resolved once at construction by `resolve_token_cost_per_1k_usd()`.
    pub token_cost_per_1k_usd: f64,
    // On-disk locations of the persisted stores above.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI / base URL settings use `std::sync` locks so sync accessors can read them.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    /// Channel listeners and their last reported statuses.
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host context detected at construction; superseded by the runtime's copy once ready.
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
1238
/// A single key/value update to apply to a status index.
///
/// NOTE(review): its producers and consumers are outside this chunk; the meaning of
/// `key` and the shape of `value` should be confirmed there.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
1244
1245impl AppState {
1246    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
1247        Self {
1248            runtime: Arc::new(OnceLock::new()),
1249            startup: Arc::new(RwLock::new(StartupState {
1250                status: StartupStatus::Starting,
1251                phase: "boot".to_string(),
1252                started_at_ms: now_ms(),
1253                attempt_id,
1254                last_error: None,
1255            })),
1256            in_process_mode: Arc::new(AtomicBool::new(in_process)),
1257            api_token: Arc::new(RwLock::new(None)),
1258            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
1259            run_registry: RunRegistry::new(),
1260            run_stale_ms: resolve_run_stale_ms(),
1261            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
1262            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
1263            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
1264            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
1265            shared_resources_path: resolve_shared_resources_path(),
1266            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
1267            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
1268            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1269            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
1270            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1271            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
1272            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1273            bug_monitor_config: Arc::new(RwLock::new(resolve_bug_monitor_env_config())),
1274            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1275            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
1276            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1277            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
1278            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
1279            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1280            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
1281            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
1282            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
1283            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1284            routines_path: resolve_routines_path(),
1285            routine_history_path: resolve_routine_history_path(),
1286            routine_runs_path: resolve_routine_runs_path(),
1287            automations_v2_path: resolve_automations_v2_path(),
1288            automation_v2_runs_path: resolve_automation_v2_runs_path(),
1289            bug_monitor_config_path: resolve_bug_monitor_config_path(),
1290            bug_monitor_drafts_path: resolve_bug_monitor_drafts_path(),
1291            bug_monitor_incidents_path: resolve_bug_monitor_incidents_path(),
1292            bug_monitor_posts_path: resolve_bug_monitor_posts_path(),
1293            workflow_runs_path: resolve_workflow_runs_path(),
1294            workflow_hook_overrides_path: resolve_workflow_hook_overrides_path(),
1295            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
1296            web_ui_enabled: Arc::new(AtomicBool::new(false)),
1297            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
1298            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
1299            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
1300            host_runtime_context: detect_host_runtime_context(),
1301            token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
1302            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
1303            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
1304            preset_registry: Arc::new(PresetRegistry::new(
1305                PackManager::default_root(),
1306                resolve_shared_paths()
1307                    .map(|paths| paths.canonical_root)
1308                    .unwrap_or_else(|_| {
1309                        dirs::home_dir()
1310                            .unwrap_or_else(|| PathBuf::from("."))
1311                            .join(".tandem")
1312                    }),
1313            )),
1314        }
1315    }
1316
1317    pub fn is_ready(&self) -> bool {
1318        self.runtime.get().is_some()
1319    }
1320
1321    pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
1322        for _ in 0..attempts {
1323            if self.is_ready() {
1324                return true;
1325            }
1326            let startup = self.startup_snapshot().await;
1327            if matches!(startup.status, StartupStatus::Failed) {
1328                return false;
1329            }
1330            tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
1331        }
1332        self.is_ready()
1333    }
1334
1335    pub fn mode_label(&self) -> &'static str {
1336        if self.in_process_mode.load(Ordering::Relaxed) {
1337            "in-process"
1338        } else {
1339            "sidecar"
1340        }
1341    }
1342
1343    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
1344        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
1345        if let Ok(mut guard) = self.web_ui_prefix.write() {
1346            *guard = normalize_web_ui_prefix(&prefix);
1347        }
1348    }
1349
1350    pub fn web_ui_enabled(&self) -> bool {
1351        self.web_ui_enabled.load(Ordering::Relaxed)
1352    }
1353
1354    pub fn web_ui_prefix(&self) -> String {
1355        self.web_ui_prefix
1356            .read()
1357            .map(|v| v.clone())
1358            .unwrap_or_else(|_| "/admin".to_string())
1359    }
1360
1361    pub fn set_server_base_url(&self, base_url: String) {
1362        if let Ok(mut guard) = self.server_base_url.write() {
1363            *guard = base_url;
1364        }
1365    }
1366
1367    pub fn server_base_url(&self) -> String {
1368        self.server_base_url
1369            .read()
1370            .map(|v| v.clone())
1371            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
1372    }
1373
1374    pub async fn api_token(&self) -> Option<String> {
1375        self.api_token.read().await.clone()
1376    }
1377
1378    pub async fn set_api_token(&self, token: Option<String>) {
1379        *self.api_token.write().await = token;
1380    }
1381
1382    pub async fn startup_snapshot(&self) -> StartupSnapshot {
1383        let state = self.startup.read().await.clone();
1384        StartupSnapshot {
1385            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
1386            status: state.status,
1387            phase: state.phase,
1388            started_at_ms: state.started_at_ms,
1389            attempt_id: state.attempt_id,
1390            last_error: state.last_error,
1391        }
1392    }
1393
1394    pub fn host_runtime_context(&self) -> HostRuntimeContext {
1395        self.runtime
1396            .get()
1397            .map(|runtime| runtime.host_runtime_context.clone())
1398            .unwrap_or_else(|| self.host_runtime_context.clone())
1399    }
1400
1401    pub async fn set_phase(&self, phase: impl Into<String>) {
1402        let mut startup = self.startup.write().await;
1403        startup.phase = phase.into();
1404    }
1405
    /// Installs the engine runtime and completes server startup.
    ///
    /// Steps, in order:
    /// 1. store `runtime`, which can only happen once;
    /// 2. register the browser tools and the `pack_builder` tool;
    /// 3. install the server's spawn-agent, tool-policy and prompt-context hooks on the engine loop;
    /// 4. load the persisted stores from disk and reload workflows;
    /// 5. load agent-team state for the current workspace root;
    /// 6. mark startup `Ready` with phase `"ready"` and clear `last_error`.
    ///
    /// # Errors
    /// Fails if a runtime was already installed, if browser-tool registration fails, or if
    /// the routines or automations-v2 stores fail to load. Every other loader is
    /// best-effort, and its error is deliberately discarded.
    ///
    /// NOTE(review): the runtime is set in step 1, so `is_ready()` reports `true` before
    /// steps 2–6 finish, and stays `true` even if a later step returns an error. This
    /// function does not change the startup status on error; presumably the caller invokes
    /// `mark_failed`. Confirm.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        self.register_browser_tools().await?;
        // `tools`, `engine_loop` and `workspace_index` are not `AppState` fields; they are
        // presumably reached through a `Deref` to the runtime just installed. Confirm.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Persisted state: routines and automations-v2 are mandatory (`?`); the rest are best-effort.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
1456
1457    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
1458        let mut startup = self.startup.write().await;
1459        startup.status = StartupStatus::Failed;
1460        startup.phase = phase.into();
1461        startup.last_error = Some(error.into());
1462    }
1463
1464    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
1465        let runtime = self.channels_runtime.lock().await;
1466        runtime.statuses.clone()
1467    }
1468
1469    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
1470        let effective = self.config.get_effective_value().await;
1471        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
1472        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
1473
1474        let mut runtime = self.channels_runtime.lock().await;
1475        if let Some(listeners) = runtime.listeners.as_mut() {
1476            listeners.abort_all();
1477        }
1478        runtime.listeners = None;
1479        runtime.statuses.clear();
1480
1481        let mut status_map = std::collections::HashMap::new();
1482        status_map.insert(
1483            "telegram".to_string(),
1484            ChannelStatus {
1485                enabled: parsed.channels.telegram.is_some(),
1486                connected: false,
1487                last_error: None,
1488                active_sessions: 0,
1489                meta: serde_json::json!({}),
1490            },
1491        );
1492        status_map.insert(
1493            "discord".to_string(),
1494            ChannelStatus {
1495                enabled: parsed.channels.discord.is_some(),
1496                connected: false,
1497                last_error: None,
1498                active_sessions: 0,
1499                meta: serde_json::json!({}),
1500            },
1501        );
1502        status_map.insert(
1503            "slack".to_string(),
1504            ChannelStatus {
1505                enabled: parsed.channels.slack.is_some(),
1506                connected: false,
1507                last_error: None,
1508                active_sessions: 0,
1509                meta: serde_json::json!({}),
1510            },
1511        );
1512
1513        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
1514            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
1515            runtime.listeners = Some(listeners);
1516            for status in status_map.values_mut() {
1517                if status.enabled {
1518                    status.connected = true;
1519                }
1520            }
1521        }
1522
1523        runtime.statuses = status_map.clone();
1524        drop(runtime);
1525
1526        self.event_bus.publish(EngineEvent::new(
1527            "channel.status.changed",
1528            serde_json::json!({ "channels": status_map }),
1529        ));
1530        Ok(())
1531    }
1532
1533    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
1534        if !self.shared_resources_path.exists() {
1535            return Ok(());
1536        }
1537        let raw = fs::read_to_string(&self.shared_resources_path).await?;
1538        let parsed =
1539            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
1540                .unwrap_or_default();
1541        let mut guard = self.shared_resources.write().await;
1542        *guard = parsed;
1543        Ok(())
1544    }
1545
1546    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1547        if let Some(parent) = self.shared_resources_path.parent() {
1548            fs::create_dir_all(parent).await?;
1549        }
1550        let payload = {
1551            let guard = self.shared_resources.read().await;
1552            serde_json::to_string_pretty(&*guard)?
1553        };
1554        fs::write(&self.shared_resources_path, payload).await?;
1555        Ok(())
1556    }
1557
1558    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
1559        self.shared_resources.read().await.get(key).cloned()
1560    }
1561
1562    pub async fn list_shared_resources(
1563        &self,
1564        prefix: Option<&str>,
1565        limit: usize,
1566    ) -> Vec<SharedResourceRecord> {
1567        let limit = limit.clamp(1, 500);
1568        let mut rows = self
1569            .shared_resources
1570            .read()
1571            .await
1572            .values()
1573            .filter(|record| {
1574                if let Some(prefix) = prefix {
1575                    record.key.starts_with(prefix)
1576                } else {
1577                    true
1578                }
1579            })
1580            .cloned()
1581            .collect::<Vec<_>>();
1582        rows.sort_by(|a, b| a.key.cmp(&b.key));
1583        rows.truncate(limit);
1584        rows
1585    }
1586
1587    pub async fn put_shared_resource(
1588        &self,
1589        key: String,
1590        value: Value,
1591        if_match_rev: Option<u64>,
1592        updated_by: String,
1593        ttl_ms: Option<u64>,
1594    ) -> Result<SharedResourceRecord, ResourceStoreError> {
1595        if !is_valid_resource_key(&key) {
1596            return Err(ResourceStoreError::InvalidKey { key });
1597        }
1598
1599        let now = now_ms();
1600        let mut guard = self.shared_resources.write().await;
1601        let existing = guard.get(&key).cloned();
1602
1603        if let Some(expected) = if_match_rev {
1604            let current = existing.as_ref().map(|row| row.rev);
1605            if current != Some(expected) {
1606                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1607                    key,
1608                    expected_rev: Some(expected),
1609                    current_rev: current,
1610                }));
1611            }
1612        }
1613
1614        let next_rev = existing
1615            .as_ref()
1616            .map(|row| row.rev.saturating_add(1))
1617            .unwrap_or(1);
1618
1619        let record = SharedResourceRecord {
1620            key: key.clone(),
1621            value,
1622            rev: next_rev,
1623            updated_at_ms: now,
1624            updated_by,
1625            ttl_ms,
1626        };
1627
1628        let previous = guard.insert(key.clone(), record.clone());
1629        drop(guard);
1630
1631        if let Err(error) = self.persist_shared_resources().await {
1632            let mut rollback = self.shared_resources.write().await;
1633            if let Some(previous) = previous {
1634                rollback.insert(key, previous);
1635            } else {
1636                rollback.remove(&key);
1637            }
1638            return Err(ResourceStoreError::PersistFailed {
1639                message: error.to_string(),
1640            });
1641        }
1642
1643        Ok(record)
1644    }
1645
1646    pub async fn delete_shared_resource(
1647        &self,
1648        key: &str,
1649        if_match_rev: Option<u64>,
1650    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
1651        if !is_valid_resource_key(key) {
1652            return Err(ResourceStoreError::InvalidKey {
1653                key: key.to_string(),
1654            });
1655        }
1656
1657        let mut guard = self.shared_resources.write().await;
1658        let current = guard.get(key).cloned();
1659        if let Some(expected) = if_match_rev {
1660            let current_rev = current.as_ref().map(|row| row.rev);
1661            if current_rev != Some(expected) {
1662                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1663                    key: key.to_string(),
1664                    expected_rev: Some(expected),
1665                    current_rev,
1666                }));
1667            }
1668        }
1669
1670        let removed = guard.remove(key);
1671        drop(guard);
1672
1673        if let Err(error) = self.persist_shared_resources().await {
1674            if let Some(record) = removed.clone() {
1675                self.shared_resources
1676                    .write()
1677                    .await
1678                    .insert(record.key.clone(), record);
1679            }
1680            return Err(ResourceStoreError::PersistFailed {
1681                message: error.to_string(),
1682            });
1683        }
1684
1685        Ok(removed)
1686    }
1687
1688    pub async fn load_routines(&self) -> anyhow::Result<()> {
1689        if !self.routines_path.exists() {
1690            return Ok(());
1691        }
1692        let raw = fs::read_to_string(&self.routines_path).await?;
1693        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
1694            Ok(parsed) => {
1695                let mut guard = self.routines.write().await;
1696                *guard = parsed;
1697                Ok(())
1698            }
1699            Err(primary_err) => {
1700                let backup_path = sibling_backup_path(&self.routines_path);
1701                if backup_path.exists() {
1702                    let backup_raw = fs::read_to_string(&backup_path).await?;
1703                    if let Ok(parsed_backup) = serde_json::from_str::<
1704                        std::collections::HashMap<String, RoutineSpec>,
1705                    >(&backup_raw)
1706                    {
1707                        let mut guard = self.routines.write().await;
1708                        *guard = parsed_backup;
1709                        return Ok(());
1710                    }
1711                }
1712                Err(anyhow::anyhow!(
1713                    "failed to parse routines store {}: {primary_err}",
1714                    self.routines_path.display()
1715                ))
1716            }
1717        }
1718    }
1719
1720    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1721        if !self.routine_history_path.exists() {
1722            return Ok(());
1723        }
1724        let raw = fs::read_to_string(&self.routine_history_path).await?;
1725        let parsed = serde_json::from_str::<
1726            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1727        >(&raw)
1728        .unwrap_or_default();
1729        let mut guard = self.routine_history.write().await;
1730        *guard = parsed;
1731        Ok(())
1732    }
1733
1734    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1735        if !self.routine_runs_path.exists() {
1736            return Ok(());
1737        }
1738        let raw = fs::read_to_string(&self.routine_runs_path).await?;
1739        let parsed =
1740            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1741                .unwrap_or_default();
1742        let mut guard = self.routine_runs.write().await;
1743        *guard = parsed;
1744        Ok(())
1745    }
1746
1747    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
1748        if let Some(parent) = self.routines_path.parent() {
1749            fs::create_dir_all(parent).await?;
1750        }
1751        let (payload, is_empty) = {
1752            let guard = self.routines.read().await;
1753            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
1754        };
1755        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
1756            let existing_raw = fs::read_to_string(&self.routines_path)
1757                .await
1758                .unwrap_or_default();
1759            let existing_has_rows = serde_json::from_str::<
1760                std::collections::HashMap<String, RoutineSpec>,
1761            >(&existing_raw)
1762            .map(|rows| !rows.is_empty())
1763            .unwrap_or(true);
1764            if existing_has_rows {
1765                return Err(anyhow::anyhow!(
1766                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
1767                    self.routines_path.display()
1768                ));
1769            }
1770        }
1771        let backup_path = sibling_backup_path(&self.routines_path);
1772        if self.routines_path.exists() {
1773            let _ = fs::copy(&self.routines_path, &backup_path).await;
1774        }
1775        let tmp_path = sibling_tmp_path(&self.routines_path);
1776        fs::write(&tmp_path, payload).await?;
1777        fs::rename(&tmp_path, &self.routines_path).await?;
1778        Ok(())
1779    }
1780
1781    pub async fn persist_routines(&self) -> anyhow::Result<()> {
1782        self.persist_routines_inner(false).await
1783    }
1784
1785    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1786        if let Some(parent) = self.routine_history_path.parent() {
1787            fs::create_dir_all(parent).await?;
1788        }
1789        let payload = {
1790            let guard = self.routine_history.read().await;
1791            serde_json::to_string_pretty(&*guard)?
1792        };
1793        fs::write(&self.routine_history_path, payload).await?;
1794        Ok(())
1795    }
1796
1797    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1798        if let Some(parent) = self.routine_runs_path.parent() {
1799            fs::create_dir_all(parent).await?;
1800        }
1801        let payload = {
1802            let guard = self.routine_runs.read().await;
1803            serde_json::to_string_pretty(&*guard)?
1804        };
1805        fs::write(&self.routine_runs_path, payload).await?;
1806        Ok(())
1807    }
1808
1809    pub async fn put_routine(
1810        &self,
1811        mut routine: RoutineSpec,
1812    ) -> Result<RoutineSpec, RoutineStoreError> {
1813        if routine.routine_id.trim().is_empty() {
1814            return Err(RoutineStoreError::InvalidRoutineId {
1815                routine_id: routine.routine_id,
1816            });
1817        }
1818
1819        routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1820        routine.output_targets = normalize_non_empty_list(routine.output_targets);
1821
1822        let now = now_ms();
1823        let next_schedule_fire =
1824            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1825                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1826                    detail: "invalid schedule or timezone".to_string(),
1827                })?;
1828        match routine.schedule {
1829            RoutineSchedule::IntervalSeconds { seconds } => {
1830                if seconds == 0 {
1831                    return Err(RoutineStoreError::InvalidSchedule {
1832                        detail: "interval_seconds must be > 0".to_string(),
1833                    });
1834                }
1835                let _ = seconds;
1836            }
1837            RoutineSchedule::Cron { .. } => {}
1838        }
1839        if routine.next_fire_at_ms.is_none() {
1840            routine.next_fire_at_ms = Some(next_schedule_fire);
1841        }
1842
1843        let mut guard = self.routines.write().await;
1844        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1845        drop(guard);
1846
1847        if let Err(error) = self.persist_routines().await {
1848            let mut rollback = self.routines.write().await;
1849            if let Some(previous) = previous {
1850                rollback.insert(previous.routine_id.clone(), previous);
1851            } else {
1852                rollback.remove(&routine.routine_id);
1853            }
1854            return Err(RoutineStoreError::PersistFailed {
1855                message: error.to_string(),
1856            });
1857        }
1858
1859        Ok(routine)
1860    }
1861
1862    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1863        let mut rows = self
1864            .routines
1865            .read()
1866            .await
1867            .values()
1868            .cloned()
1869            .collect::<Vec<_>>();
1870        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1871        rows
1872    }
1873
1874    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1875        self.routines.read().await.get(routine_id).cloned()
1876    }
1877
1878    pub async fn delete_routine(
1879        &self,
1880        routine_id: &str,
1881    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1882        let mut guard = self.routines.write().await;
1883        let removed = guard.remove(routine_id);
1884        drop(guard);
1885
1886        let allow_empty_overwrite = self.routines.read().await.is_empty();
1887        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
1888            if let Some(removed) = removed.clone() {
1889                self.routines
1890                    .write()
1891                    .await
1892                    .insert(removed.routine_id.clone(), removed);
1893            }
1894            return Err(RoutineStoreError::PersistFailed {
1895                message: error.to_string(),
1896            });
1897        }
1898        Ok(removed)
1899    }
1900
1901    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1902        let mut plans = Vec::new();
1903        let mut guard = self.routines.write().await;
1904        for routine in guard.values_mut() {
1905            if routine.status != RoutineStatus::Active {
1906                continue;
1907            }
1908            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1909                continue;
1910            };
1911            if now_ms < next_fire_at_ms {
1912                continue;
1913            }
1914            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
1915                now_ms,
1916                next_fire_at_ms,
1917                &routine.schedule,
1918                &routine.timezone,
1919                &routine.misfire_policy,
1920            );
1921            routine.next_fire_at_ms = Some(next_fire_at_ms);
1922            if run_count == 0 {
1923                continue;
1924            }
1925            plans.push(RoutineTriggerPlan {
1926                routine_id: routine.routine_id.clone(),
1927                run_count,
1928                scheduled_at_ms: now_ms,
1929                next_fire_at_ms,
1930            });
1931        }
1932        drop(guard);
1933        let _ = self.persist_routines().await;
1934        plans
1935    }
1936
1937    pub async fn mark_routine_fired(
1938        &self,
1939        routine_id: &str,
1940        fired_at_ms: u64,
1941    ) -> Option<RoutineSpec> {
1942        let mut guard = self.routines.write().await;
1943        let routine = guard.get_mut(routine_id)?;
1944        routine.last_fired_at_ms = Some(fired_at_ms);
1945        let updated = routine.clone();
1946        drop(guard);
1947        let _ = self.persist_routines().await;
1948        Some(updated)
1949    }
1950
1951    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1952        let mut history = self.routine_history.write().await;
1953        history
1954            .entry(event.routine_id.clone())
1955            .or_default()
1956            .push(event);
1957        drop(history);
1958        let _ = self.persist_routine_history().await;
1959    }
1960
1961    pub async fn list_routine_history(
1962        &self,
1963        routine_id: &str,
1964        limit: usize,
1965    ) -> Vec<RoutineHistoryEvent> {
1966        let limit = limit.clamp(1, 500);
1967        let mut rows = self
1968            .routine_history
1969            .read()
1970            .await
1971            .get(routine_id)
1972            .cloned()
1973            .unwrap_or_default();
1974        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1975        rows.truncate(limit);
1976        rows
1977    }
1978
1979    pub async fn create_routine_run(
1980        &self,
1981        routine: &RoutineSpec,
1982        trigger_type: &str,
1983        run_count: u32,
1984        status: RoutineRunStatus,
1985        detail: Option<String>,
1986    ) -> RoutineRunRecord {
1987        let now = now_ms();
1988        let record = RoutineRunRecord {
1989            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1990            routine_id: routine.routine_id.clone(),
1991            trigger_type: trigger_type.to_string(),
1992            run_count,
1993            status,
1994            created_at_ms: now,
1995            updated_at_ms: now,
1996            fired_at_ms: Some(now),
1997            started_at_ms: None,
1998            finished_at_ms: None,
1999            requires_approval: routine.requires_approval,
2000            approval_reason: None,
2001            denial_reason: None,
2002            paused_reason: None,
2003            detail,
2004            entrypoint: routine.entrypoint.clone(),
2005            args: routine.args.clone(),
2006            allowed_tools: routine.allowed_tools.clone(),
2007            output_targets: routine.output_targets.clone(),
2008            artifacts: Vec::new(),
2009            active_session_ids: Vec::new(),
2010            latest_session_id: None,
2011            prompt_tokens: 0,
2012            completion_tokens: 0,
2013            total_tokens: 0,
2014            estimated_cost_usd: 0.0,
2015        };
2016        self.routine_runs
2017            .write()
2018            .await
2019            .insert(record.run_id.clone(), record.clone());
2020        let _ = self.persist_routine_runs().await;
2021        record
2022    }
2023
2024    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
2025        self.routine_runs.read().await.get(run_id).cloned()
2026    }
2027
2028    pub async fn list_routine_runs(
2029        &self,
2030        routine_id: Option<&str>,
2031        limit: usize,
2032    ) -> Vec<RoutineRunRecord> {
2033        let mut rows = self
2034            .routine_runs
2035            .read()
2036            .await
2037            .values()
2038            .filter(|row| {
2039                if let Some(id) = routine_id {
2040                    row.routine_id == id
2041                } else {
2042                    true
2043                }
2044            })
2045            .cloned()
2046            .collect::<Vec<_>>();
2047        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2048        rows.truncate(limit.clamp(1, 500));
2049        rows
2050    }
2051
2052    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
2053        let mut guard = self.routine_runs.write().await;
2054        let next_run_id = guard
2055            .values()
2056            .filter(|row| row.status == RoutineRunStatus::Queued)
2057            .min_by(|a, b| {
2058                a.created_at_ms
2059                    .cmp(&b.created_at_ms)
2060                    .then_with(|| a.run_id.cmp(&b.run_id))
2061            })
2062            .map(|row| row.run_id.clone())?;
2063        let now = now_ms();
2064        let row = guard.get_mut(&next_run_id)?;
2065        row.status = RoutineRunStatus::Running;
2066        row.updated_at_ms = now;
2067        row.started_at_ms = Some(now);
2068        let claimed = row.clone();
2069        drop(guard);
2070        let _ = self.persist_routine_runs().await;
2071        Some(claimed)
2072    }
2073
2074    pub async fn set_routine_session_policy(
2075        &self,
2076        session_id: String,
2077        run_id: String,
2078        routine_id: String,
2079        allowed_tools: Vec<String>,
2080    ) {
2081        let policy = RoutineSessionPolicy {
2082            session_id: session_id.clone(),
2083            run_id,
2084            routine_id,
2085            allowed_tools: normalize_allowed_tools(allowed_tools),
2086        };
2087        self.routine_session_policies
2088            .write()
2089            .await
2090            .insert(session_id, policy);
2091    }
2092
2093    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
2094        self.routine_session_policies
2095            .read()
2096            .await
2097            .get(session_id)
2098            .cloned()
2099    }
2100
2101    pub async fn clear_routine_session_policy(&self, session_id: &str) {
2102        self.routine_session_policies
2103            .write()
2104            .await
2105            .remove(session_id);
2106    }
2107
2108    pub async fn update_routine_run_status(
2109        &self,
2110        run_id: &str,
2111        status: RoutineRunStatus,
2112        reason: Option<String>,
2113    ) -> Option<RoutineRunRecord> {
2114        let mut guard = self.routine_runs.write().await;
2115        let row = guard.get_mut(run_id)?;
2116        row.status = status.clone();
2117        row.updated_at_ms = now_ms();
2118        match status {
2119            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
2120            RoutineRunStatus::Running => {
2121                row.started_at_ms.get_or_insert_with(now_ms);
2122                if let Some(detail) = reason {
2123                    row.detail = Some(detail);
2124                }
2125            }
2126            RoutineRunStatus::Denied => row.denial_reason = reason,
2127            RoutineRunStatus::Paused => row.paused_reason = reason,
2128            RoutineRunStatus::Completed
2129            | RoutineRunStatus::Failed
2130            | RoutineRunStatus::Cancelled => {
2131                row.finished_at_ms = Some(now_ms());
2132                if let Some(detail) = reason {
2133                    row.detail = Some(detail);
2134                }
2135            }
2136            _ => {
2137                if let Some(detail) = reason {
2138                    row.detail = Some(detail);
2139                }
2140            }
2141        }
2142        let updated = row.clone();
2143        drop(guard);
2144        let _ = self.persist_routine_runs().await;
2145        Some(updated)
2146    }
2147
2148    pub async fn append_routine_run_artifact(
2149        &self,
2150        run_id: &str,
2151        artifact: RoutineRunArtifact,
2152    ) -> Option<RoutineRunRecord> {
2153        let mut guard = self.routine_runs.write().await;
2154        let row = guard.get_mut(run_id)?;
2155        row.updated_at_ms = now_ms();
2156        row.artifacts.push(artifact);
2157        let updated = row.clone();
2158        drop(guard);
2159        let _ = self.persist_routine_runs().await;
2160        Some(updated)
2161    }
2162
2163    pub async fn add_active_session_id(
2164        &self,
2165        run_id: &str,
2166        session_id: String,
2167    ) -> Option<RoutineRunRecord> {
2168        let mut guard = self.routine_runs.write().await;
2169        let row = guard.get_mut(run_id)?;
2170        if !row.active_session_ids.iter().any(|id| id == &session_id) {
2171            row.active_session_ids.push(session_id);
2172        }
2173        row.latest_session_id = row.active_session_ids.last().cloned();
2174        row.updated_at_ms = now_ms();
2175        let updated = row.clone();
2176        drop(guard);
2177        let _ = self.persist_routine_runs().await;
2178        Some(updated)
2179    }
2180
2181    pub async fn clear_active_session_id(
2182        &self,
2183        run_id: &str,
2184        session_id: &str,
2185    ) -> Option<RoutineRunRecord> {
2186        let mut guard = self.routine_runs.write().await;
2187        let row = guard.get_mut(run_id)?;
2188        row.active_session_ids.retain(|id| id != session_id);
2189        row.updated_at_ms = now_ms();
2190        let updated = row.clone();
2191        drop(guard);
2192        let _ = self.persist_routine_runs().await;
2193        Some(updated)
2194    }
2195
    /// Loads automation v2 definitions from disk and replaces the in-memory map.
    ///
    /// The canonical file (`automations_v2_path`) is read first. A missing file, a
    /// blank file, or a literal `{}` counts as zero definitions. Only when the canonical
    /// file yields no definitions are the other paths from
    /// `candidate_automations_v2_paths` merged in. On an ID collision the entry with
    /// the larger `updated_at_ms` is kept; on a tie, the later-read path wins. When the
    /// canonical file does yield definitions, the alternate paths are read only to
    /// report their entry counts in the summary log line.
    ///
    /// If any definitions came from an alternate path, the merged map is written back
    /// to the canonical path via `persist_automations_v2`. That write is best effort;
    /// its error is ignored.
    ///
    /// # Errors
    /// Returns an error if any existing candidate file cannot be read. How
    /// `parse_automation_v2_file` handles malformed content is not visible here.
    /// NOTE(review): it presumably yields an empty or partial map rather than failing;
    /// confirm.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        // Set when any alternate (non-canonical) path contributed definitions.
        let mut loaded_from_alternate = false;
        // (path, entry count) pairs collected for the summary log line.
        let mut path_counts = Vec::new();
        // True only when the canonical file produced at least one definition.
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            // A blank file or a literal `{}` is treated as "no definitions".
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical store is empty: merge definitions from every alternate path.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    match merged.get(&automation_id) {
                        // Keep an existing entry only if it is strictly newer; on a
                        // tie the incoming (later-read) entry replaces it.
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical store is authoritative; alternates are read only so their
            // entry counts appear in the log.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate {
            // Migrate alternate-path definitions into the canonical file (best effort).
            let _ = self.persist_automations_v2().await;
        }
        Ok(())
    }
2278
2279    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
2280        let payload = {
2281            let guard = self.automations_v2.read().await;
2282            serde_json::to_string_pretty(&*guard)?
2283        };
2284        if let Some(parent) = self.automations_v2_path.parent() {
2285            fs::create_dir_all(parent).await?;
2286        }
2287        fs::write(&self.automations_v2_path, &payload).await?;
2288        Ok(())
2289    }
2290
    /// Loads automation v2 run records from every candidate runs path and replaces the
    /// in-memory run map.
    ///
    /// Every existing path returned by `candidate_automation_v2_runs_paths` is read and
    /// merged; blank files and a literal `{}` count as zero runs. On a run-ID collision
    /// the record with the larger `updated_at_ms` is kept, with ties going to the
    /// later-read path. Once the runs are installed, the method then:
    /// - rebuilds definitions from run snapshots via
    ///   `recover_automation_definitions_from_run_snapshots`;
    /// - logs a warning if there are runs but no definitions;
    /// - writes the runs back to `automation_v2_runs_path` (best effort) if any came
    ///   from a non-canonical path or any definition was recovered.
    ///
    /// # Errors
    /// Returns an error if an existing candidate file cannot be read, or if snapshot
    /// recovery fails to persist the recovered definitions.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        // Set when a non-canonical path contributed at least one run.
        let mut loaded_from_alternate = false;
        // (path, entry count) pairs collected for the summary log line.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            // Only runs found outside the canonical path trigger the write-back below.
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    // Keep an existing record only if strictly newer.
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Rebuild missing definitions from snapshots embedded in the runs.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            // Best effort: consolidate merged runs into the canonical runs file.
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
2351
2352    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
2353        let payload = {
2354            let guard = self.automation_v2_runs.read().await;
2355            serde_json::to_string_pretty(&*guard)?
2356        };
2357        if let Some(parent) = self.automation_v2_runs_path.parent() {
2358            fs::create_dir_all(parent).await?;
2359        }
2360        fs::write(&self.automation_v2_runs_path, &payload).await?;
2361        Ok(())
2362    }
2363
2364    async fn verify_automation_v2_persisted(
2365        &self,
2366        automation_id: &str,
2367        expected_present: bool,
2368    ) -> anyhow::Result<()> {
2369        let active_raw = if self.automations_v2_path.exists() {
2370            fs::read_to_string(&self.automations_v2_path).await?
2371        } else {
2372            String::new()
2373        };
2374        let active_parsed = parse_automation_v2_file(&active_raw);
2375        let active_present = active_parsed.contains_key(automation_id);
2376        if active_present != expected_present {
2377            let active_path = self.automations_v2_path.display().to_string();
2378            tracing::error!(
2379                automation_id,
2380                expected_present,
2381                actual_present = active_present,
2382                count = active_parsed.len(),
2383                active_path,
2384                "automation v2 persistence verification failed"
2385            );
2386            anyhow::bail!(
2387                "automation v2 persistence verification failed for `{}`",
2388                automation_id
2389            );
2390        }
2391        let mut alternate_mismatches = Vec::new();
2392        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
2393            if path == self.automations_v2_path {
2394                continue;
2395            }
2396            let raw = if path.exists() {
2397                fs::read_to_string(&path).await?
2398            } else {
2399                String::new()
2400            };
2401            let parsed = parse_automation_v2_file(&raw);
2402            let present = parsed.contains_key(automation_id);
2403            if present != expected_present {
2404                alternate_mismatches.push(format!(
2405                    "{} expected_present={} actual_present={} count={}",
2406                    path.display(),
2407                    expected_present,
2408                    present,
2409                    parsed.len()
2410                ));
2411            }
2412        }
2413        if !alternate_mismatches.is_empty() {
2414            let active_path = self.automations_v2_path.display().to_string();
2415            tracing::warn!(
2416                automation_id,
2417                expected_present,
2418                mismatches = ?alternate_mismatches,
2419                active_path,
2420                "automation v2 alternate persistence paths are stale"
2421            );
2422        }
2423        Ok(())
2424    }
2425
2426    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
2427        let runs = self
2428            .automation_v2_runs
2429            .read()
2430            .await
2431            .values()
2432            .cloned()
2433            .collect::<Vec<_>>();
2434        let mut guard = self.automations_v2.write().await;
2435        let mut recovered = 0usize;
2436        for run in runs {
2437            let Some(snapshot) = run.automation_snapshot.clone() else {
2438                continue;
2439            };
2440            let should_replace = match guard.get(&run.automation_id) {
2441                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
2442                None => true,
2443            };
2444            if should_replace {
2445                if !guard.contains_key(&run.automation_id) {
2446                    recovered += 1;
2447                }
2448                guard.insert(run.automation_id.clone(), snapshot);
2449            }
2450        }
2451        drop(guard);
2452        if recovered > 0 {
2453            let active_path = self.automations_v2_path.display().to_string();
2454            tracing::warn!(
2455                recovered,
2456                active_path,
2457                "recovered automation v2 definitions from run snapshots"
2458            );
2459            self.persist_automations_v2().await?;
2460        }
2461        Ok(recovered)
2462    }
2463
2464    pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
2465        let path = if self.bug_monitor_config_path.exists() {
2466            self.bug_monitor_config_path.clone()
2467        } else if legacy_failure_reporter_path("failure_reporter_config.json").exists() {
2468            legacy_failure_reporter_path("failure_reporter_config.json")
2469        } else {
2470            return Ok(());
2471        };
2472        let raw = fs::read_to_string(path).await?;
2473        let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
2474            .unwrap_or_else(|_| resolve_bug_monitor_env_config());
2475        *self.bug_monitor_config.write().await = parsed;
2476        Ok(())
2477    }
2478
2479    pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
2480        if let Some(parent) = self.bug_monitor_config_path.parent() {
2481            fs::create_dir_all(parent).await?;
2482        }
2483        let payload = {
2484            let guard = self.bug_monitor_config.read().await;
2485            serde_json::to_string_pretty(&*guard)?
2486        };
2487        fs::write(&self.bug_monitor_config_path, payload).await?;
2488        Ok(())
2489    }
2490
2491    pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
2492        self.bug_monitor_config.read().await.clone()
2493    }
2494
2495    pub async fn put_bug_monitor_config(
2496        &self,
2497        mut config: BugMonitorConfig,
2498    ) -> anyhow::Result<BugMonitorConfig> {
2499        config.workspace_root = config
2500            .workspace_root
2501            .as_ref()
2502            .map(|v| v.trim().to_string())
2503            .filter(|v| !v.is_empty());
2504        if let Some(repo) = config.repo.as_ref() {
2505            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
2506                anyhow::bail!("repo must be in owner/repo format");
2507            }
2508        }
2509        if let Some(server) = config.mcp_server.as_ref() {
2510            let servers = self.mcp.list().await;
2511            if !servers.contains_key(server) {
2512                anyhow::bail!("unknown mcp server `{server}`");
2513            }
2514        }
2515        if let Some(model_policy) = config.model_policy.as_ref() {
2516            crate::http::routines_automations::validate_model_policy(model_policy)
2517                .map_err(anyhow::Error::msg)?;
2518        }
2519        config.updated_at_ms = now_ms();
2520        *self.bug_monitor_config.write().await = config.clone();
2521        self.persist_bug_monitor_config().await?;
2522        Ok(config)
2523    }
2524
2525    pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2526        let path = if self.bug_monitor_drafts_path.exists() {
2527            self.bug_monitor_drafts_path.clone()
2528        } else if legacy_failure_reporter_path("failure_reporter_drafts.json").exists() {
2529            legacy_failure_reporter_path("failure_reporter_drafts.json")
2530        } else {
2531            return Ok(());
2532        };
2533        let raw = fs::read_to_string(path).await?;
2534        let parsed =
2535            serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
2536                .unwrap_or_default();
2537        *self.bug_monitor_drafts.write().await = parsed;
2538        Ok(())
2539    }
2540
2541    pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2542        if let Some(parent) = self.bug_monitor_drafts_path.parent() {
2543            fs::create_dir_all(parent).await?;
2544        }
2545        let payload = {
2546            let guard = self.bug_monitor_drafts.read().await;
2547            serde_json::to_string_pretty(&*guard)?
2548        };
2549        fs::write(&self.bug_monitor_drafts_path, payload).await?;
2550        Ok(())
2551    }
2552
2553    pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2554        let path = if self.bug_monitor_incidents_path.exists() {
2555            self.bug_monitor_incidents_path.clone()
2556        } else if legacy_failure_reporter_path("failure_reporter_incidents.json").exists() {
2557            legacy_failure_reporter_path("failure_reporter_incidents.json")
2558        } else {
2559            return Ok(());
2560        };
2561        let raw = fs::read_to_string(path).await?;
2562        let parsed = serde_json::from_str::<
2563            std::collections::HashMap<String, BugMonitorIncidentRecord>,
2564        >(&raw)
2565        .unwrap_or_default();
2566        *self.bug_monitor_incidents.write().await = parsed;
2567        Ok(())
2568    }
2569
2570    pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2571        if let Some(parent) = self.bug_monitor_incidents_path.parent() {
2572            fs::create_dir_all(parent).await?;
2573        }
2574        let payload = {
2575            let guard = self.bug_monitor_incidents.read().await;
2576            serde_json::to_string_pretty(&*guard)?
2577        };
2578        fs::write(&self.bug_monitor_incidents_path, payload).await?;
2579        Ok(())
2580    }
2581
2582    pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
2583        let path = if self.bug_monitor_posts_path.exists() {
2584            self.bug_monitor_posts_path.clone()
2585        } else if legacy_failure_reporter_path("failure_reporter_posts.json").exists() {
2586            legacy_failure_reporter_path("failure_reporter_posts.json")
2587        } else {
2588            return Ok(());
2589        };
2590        let raw = fs::read_to_string(path).await?;
2591        let parsed =
2592            serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
2593                .unwrap_or_default();
2594        *self.bug_monitor_posts.write().await = parsed;
2595        Ok(())
2596    }
2597
2598    pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
2599        if let Some(parent) = self.bug_monitor_posts_path.parent() {
2600            fs::create_dir_all(parent).await?;
2601        }
2602        let payload = {
2603            let guard = self.bug_monitor_posts.read().await;
2604            serde_json::to_string_pretty(&*guard)?
2605        };
2606        fs::write(&self.bug_monitor_posts_path, payload).await?;
2607        Ok(())
2608    }
2609
2610    pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
2611        let mut rows = self
2612            .bug_monitor_incidents
2613            .read()
2614            .await
2615            .values()
2616            .cloned()
2617            .collect::<Vec<_>>();
2618        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2619        rows.truncate(limit.clamp(1, 200));
2620        rows
2621    }
2622
2623    pub async fn get_bug_monitor_incident(
2624        &self,
2625        incident_id: &str,
2626    ) -> Option<BugMonitorIncidentRecord> {
2627        self.bug_monitor_incidents
2628            .read()
2629            .await
2630            .get(incident_id)
2631            .cloned()
2632    }
2633
2634    pub async fn put_bug_monitor_incident(
2635        &self,
2636        incident: BugMonitorIncidentRecord,
2637    ) -> anyhow::Result<BugMonitorIncidentRecord> {
2638        self.bug_monitor_incidents
2639            .write()
2640            .await
2641            .insert(incident.incident_id.clone(), incident.clone());
2642        self.persist_bug_monitor_incidents().await?;
2643        Ok(incident)
2644    }
2645
2646    pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
2647        let mut rows = self
2648            .bug_monitor_posts
2649            .read()
2650            .await
2651            .values()
2652            .cloned()
2653            .collect::<Vec<_>>();
2654        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2655        rows.truncate(limit.clamp(1, 200));
2656        rows
2657    }
2658
2659    pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
2660        self.bug_monitor_posts.read().await.get(post_id).cloned()
2661    }
2662
2663    pub async fn put_bug_monitor_post(
2664        &self,
2665        post: BugMonitorPostRecord,
2666    ) -> anyhow::Result<BugMonitorPostRecord> {
2667        self.bug_monitor_posts
2668            .write()
2669            .await
2670            .insert(post.post_id.clone(), post.clone());
2671        self.persist_bug_monitor_posts().await?;
2672        Ok(post)
2673    }
2674
2675    pub async fn update_bug_monitor_runtime_status(
2676        &self,
2677        update: impl FnOnce(&mut BugMonitorRuntimeStatus),
2678    ) -> BugMonitorRuntimeStatus {
2679        let mut guard = self.bug_monitor_runtime_status.write().await;
2680        update(&mut guard);
2681        guard.clone()
2682    }
2683
2684    pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
2685        let mut rows = self
2686            .bug_monitor_drafts
2687            .read()
2688            .await
2689            .values()
2690            .cloned()
2691            .collect::<Vec<_>>();
2692        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2693        rows.truncate(limit.clamp(1, 200));
2694        rows
2695    }
2696
2697    pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
2698        self.bug_monitor_drafts.read().await.get(draft_id).cloned()
2699    }
2700
2701    pub async fn put_bug_monitor_draft(
2702        &self,
2703        draft: BugMonitorDraftRecord,
2704    ) -> anyhow::Result<BugMonitorDraftRecord> {
2705        self.bug_monitor_drafts
2706            .write()
2707            .await
2708            .insert(draft.draft_id.clone(), draft.clone());
2709        self.persist_bug_monitor_drafts().await?;
2710        Ok(draft)
2711    }
2712
2713    pub async fn submit_bug_monitor_draft(
2714        &self,
2715        mut submission: BugMonitorSubmission,
2716    ) -> anyhow::Result<BugMonitorDraftRecord> {
2717        fn normalize_optional(value: Option<String>) -> Option<String> {
2718            value
2719                .map(|v| v.trim().to_string())
2720                .filter(|v| !v.is_empty())
2721        }
2722
2723        fn compute_fingerprint(parts: &[&str]) -> String {
2724            use std::hash::{Hash, Hasher};
2725
2726            let mut hasher = std::collections::hash_map::DefaultHasher::new();
2727            for part in parts {
2728                part.hash(&mut hasher);
2729            }
2730            format!("{:016x}", hasher.finish())
2731        }
2732
2733        submission.repo = normalize_optional(submission.repo);
2734        submission.title = normalize_optional(submission.title);
2735        submission.detail = normalize_optional(submission.detail);
2736        submission.source = normalize_optional(submission.source);
2737        submission.run_id = normalize_optional(submission.run_id);
2738        submission.session_id = normalize_optional(submission.session_id);
2739        submission.correlation_id = normalize_optional(submission.correlation_id);
2740        submission.file_name = normalize_optional(submission.file_name);
2741        submission.process = normalize_optional(submission.process);
2742        submission.component = normalize_optional(submission.component);
2743        submission.event = normalize_optional(submission.event);
2744        submission.level = normalize_optional(submission.level);
2745        submission.fingerprint = normalize_optional(submission.fingerprint);
2746        submission.excerpt = submission
2747            .excerpt
2748            .into_iter()
2749            .map(|line| line.trim_end().to_string())
2750            .filter(|line| !line.is_empty())
2751            .take(50)
2752            .collect();
2753
2754        let config = self.bug_monitor_config().await;
2755        let repo = submission
2756            .repo
2757            .clone()
2758            .or(config.repo.clone())
2759            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
2760        if !is_valid_owner_repo_slug(&repo) {
2761            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
2762        }
2763
2764        let title = submission.title.clone().unwrap_or_else(|| {
2765            if let Some(event) = submission.event.as_ref() {
2766                format!("Failure detected in {event}")
2767            } else if let Some(component) = submission.component.as_ref() {
2768                format!("Failure detected in {component}")
2769            } else if let Some(process) = submission.process.as_ref() {
2770                format!("Failure detected in {process}")
2771            } else if let Some(source) = submission.source.as_ref() {
2772                format!("Failure report from {source}")
2773            } else {
2774                "Failure report".to_string()
2775            }
2776        });
2777
2778        let mut detail_lines = Vec::new();
2779        if let Some(source) = submission.source.as_ref() {
2780            detail_lines.push(format!("source: {source}"));
2781        }
2782        if let Some(file_name) = submission.file_name.as_ref() {
2783            detail_lines.push(format!("file: {file_name}"));
2784        }
2785        if let Some(level) = submission.level.as_ref() {
2786            detail_lines.push(format!("level: {level}"));
2787        }
2788        if let Some(process) = submission.process.as_ref() {
2789            detail_lines.push(format!("process: {process}"));
2790        }
2791        if let Some(component) = submission.component.as_ref() {
2792            detail_lines.push(format!("component: {component}"));
2793        }
2794        if let Some(event) = submission.event.as_ref() {
2795            detail_lines.push(format!("event: {event}"));
2796        }
2797        if let Some(run_id) = submission.run_id.as_ref() {
2798            detail_lines.push(format!("run_id: {run_id}"));
2799        }
2800        if let Some(session_id) = submission.session_id.as_ref() {
2801            detail_lines.push(format!("session_id: {session_id}"));
2802        }
2803        if let Some(correlation_id) = submission.correlation_id.as_ref() {
2804            detail_lines.push(format!("correlation_id: {correlation_id}"));
2805        }
2806        if let Some(detail) = submission.detail.as_ref() {
2807            detail_lines.push(String::new());
2808            detail_lines.push(detail.clone());
2809        }
2810        if !submission.excerpt.is_empty() {
2811            if !detail_lines.is_empty() {
2812                detail_lines.push(String::new());
2813            }
2814            detail_lines.push("excerpt:".to_string());
2815            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
2816        }
2817        let detail = if detail_lines.is_empty() {
2818            None
2819        } else {
2820            Some(detail_lines.join("\n"))
2821        };
2822
2823        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
2824            compute_fingerprint(&[
2825                repo.as_str(),
2826                title.as_str(),
2827                detail.as_deref().unwrap_or(""),
2828                submission.source.as_deref().unwrap_or(""),
2829                submission.run_id.as_deref().unwrap_or(""),
2830                submission.session_id.as_deref().unwrap_or(""),
2831                submission.correlation_id.as_deref().unwrap_or(""),
2832            ])
2833        });
2834
2835        let mut drafts = self.bug_monitor_drafts.write().await;
2836        if let Some(existing) = drafts
2837            .values()
2838            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
2839            .cloned()
2840        {
2841            return Ok(existing);
2842        }
2843
2844        let draft = BugMonitorDraftRecord {
2845            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
2846            fingerprint,
2847            repo,
2848            status: if config.require_approval_for_new_issues {
2849                "approval_required".to_string()
2850            } else {
2851                "draft_ready".to_string()
2852            },
2853            created_at_ms: now_ms(),
2854            triage_run_id: None,
2855            issue_number: None,
2856            title: Some(title),
2857            detail,
2858            github_status: None,
2859            github_issue_url: None,
2860            github_comment_url: None,
2861            github_posted_at_ms: None,
2862            matched_issue_number: None,
2863            matched_issue_state: None,
2864            evidence_digest: None,
2865            last_post_error: None,
2866        };
2867        drafts.insert(draft.draft_id.clone(), draft.clone());
2868        drop(drafts);
2869        self.persist_bug_monitor_drafts().await?;
2870        Ok(draft)
2871    }
2872
2873    pub async fn update_bug_monitor_draft_status(
2874        &self,
2875        draft_id: &str,
2876        next_status: &str,
2877        reason: Option<&str>,
2878    ) -> anyhow::Result<BugMonitorDraftRecord> {
2879        let normalized_status = next_status.trim().to_ascii_lowercase();
2880        if normalized_status != "draft_ready" && normalized_status != "denied" {
2881            anyhow::bail!("unsupported Bug Monitor draft status");
2882        }
2883
2884        let mut drafts = self.bug_monitor_drafts.write().await;
2885        let Some(draft) = drafts.get_mut(draft_id) else {
2886            anyhow::bail!("Bug Monitor draft not found");
2887        };
2888        if !draft.status.eq_ignore_ascii_case("approval_required") {
2889            anyhow::bail!("Bug Monitor draft is not waiting for approval");
2890        }
2891        draft.status = normalized_status.clone();
2892        if let Some(reason) = reason
2893            .map(|value| value.trim())
2894            .filter(|value| !value.is_empty())
2895        {
2896            let next_detail = if let Some(detail) = draft.detail.as_ref() {
2897                format!("{detail}\n\noperator_note: {reason}")
2898            } else {
2899                format!("operator_note: {reason}")
2900            };
2901            draft.detail = Some(next_detail);
2902        }
2903        let updated = draft.clone();
2904        drop(drafts);
2905        self.persist_bug_monitor_drafts().await?;
2906
2907        let event_name = if normalized_status == "draft_ready" {
2908            "bug_monitor.draft.approved"
2909        } else {
2910            "bug_monitor.draft.denied"
2911        };
2912        self.event_bus.publish(EngineEvent::new(
2913            event_name,
2914            serde_json::json!({
2915                "draft_id": updated.draft_id,
2916                "repo": updated.repo,
2917                "status": updated.status,
2918                "reason": reason,
2919            }),
2920        ));
2921        Ok(updated)
2922    }
2923
2924    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
2925        let required_capabilities = vec![
2926            "github.list_issues".to_string(),
2927            "github.get_issue".to_string(),
2928            "github.create_issue".to_string(),
2929            "github.comment_on_issue".to_string(),
2930        ];
2931        let config = self.bug_monitor_config().await;
2932        let drafts = self.bug_monitor_drafts.read().await;
2933        let incidents = self.bug_monitor_incidents.read().await;
2934        let posts = self.bug_monitor_posts.read().await;
2935        let total_incidents = incidents.len();
2936        let pending_incidents = incidents
2937            .values()
2938            .filter(|row| {
2939                matches!(
2940                    row.status.as_str(),
2941                    "queued"
2942                        | "draft_created"
2943                        | "triage_queued"
2944                        | "analysis_queued"
2945                        | "triage_pending"
2946                        | "issue_draft_pending"
2947                )
2948            })
2949            .count();
2950        let pending_drafts = drafts
2951            .values()
2952            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
2953            .count();
2954        let pending_posts = posts
2955            .values()
2956            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
2957            .count();
2958        let last_activity_at_ms = drafts
2959            .values()
2960            .map(|row| row.created_at_ms)
2961            .chain(posts.values().map(|row| row.updated_at_ms))
2962            .max();
2963        drop(drafts);
2964        drop(incidents);
2965        drop(posts);
2966        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
2967        runtime.paused = config.paused;
2968        runtime.total_incidents = total_incidents;
2969        runtime.pending_incidents = pending_incidents;
2970        runtime.pending_posts = pending_posts;
2971
2972        let mut status = BugMonitorStatus {
2973            config: config.clone(),
2974            runtime,
2975            pending_drafts,
2976            pending_posts,
2977            last_activity_at_ms,
2978            ..BugMonitorStatus::default()
2979        };
2980        let repo_valid = config
2981            .repo
2982            .as_ref()
2983            .map(|repo| is_valid_owner_repo_slug(repo))
2984            .unwrap_or(false);
2985        let servers = self.mcp.list().await;
2986        let selected_server = config
2987            .mcp_server
2988            .as_ref()
2989            .and_then(|name| servers.get(name))
2990            .cloned();
2991        let provider_catalog = self.providers.list().await;
2992        let selected_model = config
2993            .model_policy
2994            .as_ref()
2995            .and_then(|policy| policy.get("default_model"))
2996            .and_then(parse_model_spec);
2997        let selected_model_ready = selected_model
2998            .as_ref()
2999            .map(|spec| provider_catalog_has_model(&provider_catalog, spec))
3000            .unwrap_or(false);
3001        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
3002            self.mcp.server_tools(server_name).await
3003        } else {
3004            Vec::new()
3005        };
3006        let discovered_tools = self
3007            .capability_resolver
3008            .discover_from_runtime(selected_server_tools, Vec::new())
3009            .await;
3010        status.discovered_mcp_tools = discovered_tools
3011            .iter()
3012            .map(|row| row.tool_name.clone())
3013            .collect();
3014        let discovered_providers = discovered_tools
3015            .iter()
3016            .map(|row| row.provider.to_ascii_lowercase())
3017            .collect::<std::collections::HashSet<_>>();
3018        let provider_preference = match config.provider_preference {
3019            BugMonitorProviderPreference::OfficialGithub => {
3020                vec![
3021                    "mcp".to_string(),
3022                    "composio".to_string(),
3023                    "arcade".to_string(),
3024                ]
3025            }
3026            BugMonitorProviderPreference::Composio => {
3027                vec![
3028                    "composio".to_string(),
3029                    "mcp".to_string(),
3030                    "arcade".to_string(),
3031                ]
3032            }
3033            BugMonitorProviderPreference::Arcade => {
3034                vec![
3035                    "arcade".to_string(),
3036                    "mcp".to_string(),
3037                    "composio".to_string(),
3038                ]
3039            }
3040            BugMonitorProviderPreference::Auto => {
3041                vec![
3042                    "mcp".to_string(),
3043                    "composio".to_string(),
3044                    "arcade".to_string(),
3045                ]
3046            }
3047        };
3048        let capability_resolution = self
3049            .capability_resolver
3050            .resolve(
3051                crate::capability_resolver::CapabilityResolveInput {
3052                    workflow_id: Some("bug_monitor".to_string()),
3053                    required_capabilities: required_capabilities.clone(),
3054                    optional_capabilities: Vec::new(),
3055                    provider_preference,
3056                    available_tools: discovered_tools,
3057                },
3058                Vec::new(),
3059            )
3060            .await
3061            .ok();
3062        let bindings_file = self.capability_resolver.list_bindings().await.ok();
3063        if let Some(bindings) = bindings_file.as_ref() {
3064            status.binding_source_version = bindings.builtin_version.clone();
3065            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
3066            status.selected_server_binding_candidates = bindings
3067                .bindings
3068                .iter()
3069                .filter(|binding| required_capabilities.contains(&binding.capability_id))
3070                .filter(|binding| {
3071                    discovered_providers.is_empty()
3072                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
3073                })
3074                .map(|binding| {
3075                    let binding_key = format!(
3076                        "{}::{}",
3077                        binding.capability_id,
3078                        binding.tool_name.to_ascii_lowercase()
3079                    );
3080                    let matched = capability_resolution
3081                        .as_ref()
3082                        .map(|resolution| {
3083                            resolution.resolved.iter().any(|row| {
3084                                row.capability_id == binding.capability_id
3085                                    && format!(
3086                                        "{}::{}",
3087                                        row.capability_id,
3088                                        row.tool_name.to_ascii_lowercase()
3089                                    ) == binding_key
3090                            })
3091                        })
3092                        .unwrap_or(false);
3093                    BugMonitorBindingCandidate {
3094                        capability_id: binding.capability_id.clone(),
3095                        binding_tool_name: binding.tool_name.clone(),
3096                        aliases: binding.tool_name_aliases.clone(),
3097                        matched,
3098                    }
3099                })
3100                .collect();
3101            status.selected_server_binding_candidates.sort_by(|a, b| {
3102                a.capability_id
3103                    .cmp(&b.capability_id)
3104                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
3105            });
3106        }
3107        let capability_ready = |capability_id: &str| -> bool {
3108            capability_resolution
3109                .as_ref()
3110                .map(|resolved| {
3111                    resolved
3112                        .resolved
3113                        .iter()
3114                        .any(|row| row.capability_id == capability_id)
3115                })
3116                .unwrap_or(false)
3117        };
3118        if let Some(resolution) = capability_resolution.as_ref() {
3119            status.missing_required_capabilities = resolution.missing_required.clone();
3120            status.resolved_capabilities = resolution
3121                .resolved
3122                .iter()
3123                .map(|row| BugMonitorCapabilityMatch {
3124                    capability_id: row.capability_id.clone(),
3125                    provider: row.provider.clone(),
3126                    tool_name: row.tool_name.clone(),
3127                    binding_index: row.binding_index,
3128                })
3129                .collect();
3130        } else {
3131            status.missing_required_capabilities = required_capabilities.clone();
3132        }
3133        status.required_capabilities = BugMonitorCapabilityReadiness {
3134            github_list_issues: capability_ready("github.list_issues"),
3135            github_get_issue: capability_ready("github.get_issue"),
3136            github_create_issue: capability_ready("github.create_issue"),
3137            github_comment_on_issue: capability_ready("github.comment_on_issue"),
3138        };
3139        status.selected_model = selected_model;
3140        status.readiness = BugMonitorReadiness {
3141            config_valid: repo_valid
3142                && selected_server.is_some()
3143                && status.required_capabilities.github_list_issues
3144                && status.required_capabilities.github_get_issue
3145                && status.required_capabilities.github_create_issue
3146                && status.required_capabilities.github_comment_on_issue
3147                && selected_model_ready,
3148            repo_valid,
3149            mcp_server_present: selected_server.is_some(),
3150            mcp_connected: selected_server
3151                .as_ref()
3152                .map(|row| row.connected)
3153                .unwrap_or(false),
3154            github_read_ready: status.required_capabilities.github_list_issues
3155                && status.required_capabilities.github_get_issue,
3156            github_write_ready: status.required_capabilities.github_create_issue
3157                && status.required_capabilities.github_comment_on_issue,
3158            selected_model_ready,
3159            ingest_ready: config.enabled && !config.paused && repo_valid,
3160            publish_ready: config.enabled
3161                && !config.paused
3162                && repo_valid
3163                && selected_server
3164                    .as_ref()
3165                    .map(|row| row.connected)
3166                    .unwrap_or(false)
3167                && status.required_capabilities.github_list_issues
3168                && status.required_capabilities.github_get_issue
3169                && status.required_capabilities.github_create_issue
3170                && status.required_capabilities.github_comment_on_issue
3171                && selected_model_ready,
3172            runtime_ready: config.enabled
3173                && !config.paused
3174                && repo_valid
3175                && selected_server
3176                    .as_ref()
3177                    .map(|row| row.connected)
3178                    .unwrap_or(false)
3179                && status.required_capabilities.github_list_issues
3180                && status.required_capabilities.github_get_issue
3181                && status.required_capabilities.github_create_issue
3182                && status.required_capabilities.github_comment_on_issue
3183                && selected_model_ready,
3184        };
3185        if config.enabled {
3186            if config.paused {
3187                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
3188            } else if !repo_valid {
3189                status.last_error = Some("Target repo is missing or invalid.".to_string());
3190            } else if selected_server.is_none() {
3191                status.last_error = Some("Selected MCP server is missing.".to_string());
3192            } else if !status.readiness.mcp_connected {
3193                status.last_error = Some("Selected MCP server is disconnected.".to_string());
3194            } else if !selected_model_ready {
3195                status.last_error = Some(
3196                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
3197                        .to_string(),
3198                );
3199            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
3200                let missing = if status.missing_required_capabilities.is_empty() {
3201                    "unknown".to_string()
3202                } else {
3203                    status.missing_required_capabilities.join(", ")
3204                };
3205                status.last_error = Some(format!(
3206                    "Selected MCP server is missing required GitHub capabilities: {missing}"
3207                ));
3208            }
3209        }
3210        status.runtime.monitoring_active = status.readiness.ingest_ready;
3211        status
3212    }
3213
3214    pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
3215        if !self.workflow_runs_path.exists() {
3216            return Ok(());
3217        }
3218        let raw = fs::read_to_string(&self.workflow_runs_path).await?;
3219        let parsed =
3220            serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
3221                .unwrap_or_default();
3222        *self.workflow_runs.write().await = parsed;
3223        Ok(())
3224    }
3225
3226    pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
3227        if let Some(parent) = self.workflow_runs_path.parent() {
3228            fs::create_dir_all(parent).await?;
3229        }
3230        let payload = {
3231            let guard = self.workflow_runs.read().await;
3232            serde_json::to_string_pretty(&*guard)?
3233        };
3234        fs::write(&self.workflow_runs_path, payload).await?;
3235        Ok(())
3236    }
3237
3238    pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3239        if !self.workflow_hook_overrides_path.exists() {
3240            return Ok(());
3241        }
3242        let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
3243        let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
3244            .unwrap_or_default();
3245        *self.workflow_hook_overrides.write().await = parsed;
3246        Ok(())
3247    }
3248
3249    pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3250        if let Some(parent) = self.workflow_hook_overrides_path.parent() {
3251            fs::create_dir_all(parent).await?;
3252        }
3253        let payload = {
3254            let guard = self.workflow_hook_overrides.read().await;
3255            serde_json::to_string_pretty(&*guard)?
3256        };
3257        fs::write(&self.workflow_hook_overrides_path, payload).await?;
3258        Ok(())
3259    }
3260
3261    pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
3262        let mut sources = Vec::new();
3263        sources.push(WorkflowLoadSource {
3264            root: resolve_builtin_workflows_dir(),
3265            kind: WorkflowSourceKind::BuiltIn,
3266            pack_id: None,
3267        });
3268
3269        let workspace_root = self.workspace_index.snapshot().await.root;
3270        sources.push(WorkflowLoadSource {
3271            root: PathBuf::from(workspace_root).join(".tandem"),
3272            kind: WorkflowSourceKind::Workspace,
3273            pack_id: None,
3274        });
3275
3276        if let Ok(packs) = self.pack_manager.list().await {
3277            for pack in packs {
3278                sources.push(WorkflowLoadSource {
3279                    root: PathBuf::from(pack.install_path),
3280                    kind: WorkflowSourceKind::Pack,
3281                    pack_id: Some(pack.pack_id),
3282                });
3283            }
3284        }
3285
3286        let mut registry = load_workflow_registry(&sources)?;
3287        let overrides = self.workflow_hook_overrides.read().await.clone();
3288        for hook in &mut registry.hooks {
3289            if let Some(enabled) = overrides.get(&hook.binding_id) {
3290                hook.enabled = *enabled;
3291            }
3292        }
3293        for workflow in registry.workflows.values_mut() {
3294            workflow.hooks = registry
3295                .hooks
3296                .iter()
3297                .filter(|hook| hook.workflow_id == workflow.workflow_id)
3298                .cloned()
3299                .collect();
3300        }
3301        let messages = validate_workflow_registry(&registry);
3302        *self.workflows.write().await = registry;
3303        Ok(messages)
3304    }
3305
3306    pub async fn workflow_registry(&self) -> WorkflowRegistry {
3307        self.workflows.read().await.clone()
3308    }
3309
3310    pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
3311        let mut rows = self
3312            .workflows
3313            .read()
3314            .await
3315            .workflows
3316            .values()
3317            .cloned()
3318            .collect::<Vec<_>>();
3319        rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
3320        rows
3321    }
3322
3323    pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
3324        self.workflows
3325            .read()
3326            .await
3327            .workflows
3328            .get(workflow_id)
3329            .cloned()
3330    }
3331
3332    pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
3333        let mut rows = self
3334            .workflows
3335            .read()
3336            .await
3337            .hooks
3338            .iter()
3339            .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
3340            .cloned()
3341            .collect::<Vec<_>>();
3342        rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
3343        rows
3344    }
3345
3346    pub async fn set_workflow_hook_enabled(
3347        &self,
3348        binding_id: &str,
3349        enabled: bool,
3350    ) -> anyhow::Result<Option<WorkflowHookBinding>> {
3351        self.workflow_hook_overrides
3352            .write()
3353            .await
3354            .insert(binding_id.to_string(), enabled);
3355        self.persist_workflow_hook_overrides().await?;
3356        let _ = self.reload_workflows().await?;
3357        Ok(self
3358            .workflows
3359            .read()
3360            .await
3361            .hooks
3362            .iter()
3363            .find(|hook| hook.binding_id == binding_id)
3364            .cloned())
3365    }
3366
3367    pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
3368        self.workflow_runs
3369            .write()
3370            .await
3371            .insert(run.run_id.clone(), run);
3372        self.persist_workflow_runs().await
3373    }
3374
3375    pub async fn update_workflow_run(
3376        &self,
3377        run_id: &str,
3378        update: impl FnOnce(&mut WorkflowRunRecord),
3379    ) -> Option<WorkflowRunRecord> {
3380        let mut guard = self.workflow_runs.write().await;
3381        let row = guard.get_mut(run_id)?;
3382        update(row);
3383        row.updated_at_ms = now_ms();
3384        if matches!(
3385            row.status,
3386            WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
3387        ) {
3388            row.finished_at_ms.get_or_insert_with(now_ms);
3389        }
3390        let out = row.clone();
3391        drop(guard);
3392        let _ = self.persist_workflow_runs().await;
3393        Some(out)
3394    }
3395
3396    pub async fn list_workflow_runs(
3397        &self,
3398        workflow_id: Option<&str>,
3399        limit: usize,
3400    ) -> Vec<WorkflowRunRecord> {
3401        let mut rows = self
3402            .workflow_runs
3403            .read()
3404            .await
3405            .values()
3406            .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
3407            .cloned()
3408            .collect::<Vec<_>>();
3409        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3410        rows.truncate(limit.clamp(1, 500));
3411        rows
3412    }
3413
3414    pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
3415        self.workflow_runs.read().await.get(run_id).cloned()
3416    }
3417
3418    pub async fn put_automation_v2(
3419        &self,
3420        mut automation: AutomationV2Spec,
3421    ) -> anyhow::Result<AutomationV2Spec> {
3422        if automation.automation_id.trim().is_empty() {
3423            anyhow::bail!("automation_id is required");
3424        }
3425        for agent in &mut automation.agents {
3426            if agent.display_name.trim().is_empty() {
3427                agent.display_name = auto_generated_agent_name(&agent.agent_id);
3428            }
3429            agent.tool_policy.allowlist =
3430                normalize_allowed_tools(agent.tool_policy.allowlist.clone());
3431            agent.tool_policy.denylist =
3432                normalize_allowed_tools(agent.tool_policy.denylist.clone());
3433            agent.mcp_policy.allowed_servers =
3434                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
3435            agent.mcp_policy.allowed_tools = agent
3436                .mcp_policy
3437                .allowed_tools
3438                .take()
3439                .map(normalize_allowed_tools);
3440        }
3441        let now = now_ms();
3442        if automation.created_at_ms == 0 {
3443            automation.created_at_ms = now;
3444        }
3445        automation.updated_at_ms = now;
3446        if automation.next_fire_at_ms.is_none() {
3447            automation.next_fire_at_ms =
3448                automation_schedule_next_fire_at_ms(&automation.schedule, now);
3449        }
3450        self.automations_v2
3451            .write()
3452            .await
3453            .insert(automation.automation_id.clone(), automation.clone());
3454        self.persist_automations_v2().await?;
3455        self.verify_automation_v2_persisted(&automation.automation_id, true)
3456            .await?;
3457        Ok(automation)
3458    }
3459
3460    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
3461        self.automations_v2.read().await.get(automation_id).cloned()
3462    }
3463
3464    pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
3465        self.workflow_plans
3466            .write()
3467            .await
3468            .insert(plan.plan_id.clone(), plan);
3469    }
3470
3471    pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
3472        self.workflow_plans.read().await.get(plan_id).cloned()
3473    }
3474
3475    pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
3476        self.workflow_plan_drafts
3477            .write()
3478            .await
3479            .insert(draft.current_plan.plan_id.clone(), draft.clone());
3480        self.put_workflow_plan(draft.current_plan).await;
3481    }
3482
3483    pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
3484        self.workflow_plan_drafts.read().await.get(plan_id).cloned()
3485    }
3486
3487    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3488        let mut rows = self
3489            .automations_v2
3490            .read()
3491            .await
3492            .values()
3493            .cloned()
3494            .collect::<Vec<_>>();
3495        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3496        rows
3497    }
3498
3499    pub async fn delete_automation_v2(
3500        &self,
3501        automation_id: &str,
3502    ) -> anyhow::Result<Option<AutomationV2Spec>> {
3503        let removed = self.automations_v2.write().await.remove(automation_id);
3504        self.persist_automations_v2().await?;
3505        self.verify_automation_v2_persisted(automation_id, false)
3506            .await?;
3507        Ok(removed)
3508    }
3509
3510    pub async fn create_automation_v2_run(
3511        &self,
3512        automation: &AutomationV2Spec,
3513        trigger_type: &str,
3514    ) -> anyhow::Result<AutomationV2RunRecord> {
3515        let now = now_ms();
3516        let pending_nodes = automation
3517            .flow
3518            .nodes
3519            .iter()
3520            .map(|n| n.node_id.clone())
3521            .collect::<Vec<_>>();
3522        let run = AutomationV2RunRecord {
3523            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
3524            automation_id: automation.automation_id.clone(),
3525            trigger_type: trigger_type.to_string(),
3526            status: AutomationRunStatus::Queued,
3527            created_at_ms: now,
3528            updated_at_ms: now,
3529            started_at_ms: None,
3530            finished_at_ms: None,
3531            active_session_ids: Vec::new(),
3532            active_instance_ids: Vec::new(),
3533            checkpoint: AutomationRunCheckpoint {
3534                completed_nodes: Vec::new(),
3535                pending_nodes,
3536                node_outputs: std::collections::HashMap::new(),
3537                node_attempts: std::collections::HashMap::new(),
3538            },
3539            automation_snapshot: Some(automation.clone()),
3540            pause_reason: None,
3541            resume_reason: None,
3542            detail: None,
3543            prompt_tokens: 0,
3544            completion_tokens: 0,
3545            total_tokens: 0,
3546            estimated_cost_usd: 0.0,
3547        };
3548        self.automation_v2_runs
3549            .write()
3550            .await
3551            .insert(run.run_id.clone(), run.clone());
3552        self.persist_automation_v2_runs().await?;
3553        Ok(run)
3554    }
3555
3556    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
3557        self.automation_v2_runs.read().await.get(run_id).cloned()
3558    }
3559
3560    pub async fn list_automation_v2_runs(
3561        &self,
3562        automation_id: Option<&str>,
3563        limit: usize,
3564    ) -> Vec<AutomationV2RunRecord> {
3565        let mut rows = self
3566            .automation_v2_runs
3567            .read()
3568            .await
3569            .values()
3570            .filter(|row| {
3571                if let Some(id) = automation_id {
3572                    row.automation_id == id
3573                } else {
3574                    true
3575                }
3576            })
3577            .cloned()
3578            .collect::<Vec<_>>();
3579        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3580        rows.truncate(limit.clamp(1, 500));
3581        rows
3582    }
3583
3584    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
3585        let mut guard = self.automation_v2_runs.write().await;
3586        let run_id = guard
3587            .values()
3588            .filter(|row| row.status == AutomationRunStatus::Queued)
3589            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
3590            .map(|row| row.run_id.clone())?;
3591        let now = now_ms();
3592        let run = guard.get_mut(&run_id)?;
3593        run.status = AutomationRunStatus::Running;
3594        run.updated_at_ms = now;
3595        run.started_at_ms.get_or_insert(now);
3596        let claimed = run.clone();
3597        drop(guard);
3598        let _ = self.persist_automation_v2_runs().await;
3599        Some(claimed)
3600    }
3601
3602    pub async fn update_automation_v2_run(
3603        &self,
3604        run_id: &str,
3605        update: impl FnOnce(&mut AutomationV2RunRecord),
3606    ) -> Option<AutomationV2RunRecord> {
3607        let mut guard = self.automation_v2_runs.write().await;
3608        let run = guard.get_mut(run_id)?;
3609        update(run);
3610        run.updated_at_ms = now_ms();
3611        if matches!(
3612            run.status,
3613            AutomationRunStatus::Completed
3614                | AutomationRunStatus::Failed
3615                | AutomationRunStatus::Cancelled
3616        ) {
3617            run.finished_at_ms.get_or_insert_with(now_ms);
3618        }
3619        let out = run.clone();
3620        drop(guard);
3621        let _ = self.persist_automation_v2_runs().await;
3622        Some(out)
3623    }
3624
3625    pub async fn add_automation_v2_session(
3626        &self,
3627        run_id: &str,
3628        session_id: &str,
3629    ) -> Option<AutomationV2RunRecord> {
3630        let updated = self
3631            .update_automation_v2_run(run_id, |row| {
3632                if !row.active_session_ids.iter().any(|id| id == session_id) {
3633                    row.active_session_ids.push(session_id.to_string());
3634                }
3635            })
3636            .await;
3637        self.automation_v2_session_runs
3638            .write()
3639            .await
3640            .insert(session_id.to_string(), run_id.to_string());
3641        updated
3642    }
3643
3644    pub async fn clear_automation_v2_session(
3645        &self,
3646        run_id: &str,
3647        session_id: &str,
3648    ) -> Option<AutomationV2RunRecord> {
3649        self.automation_v2_session_runs
3650            .write()
3651            .await
3652            .remove(session_id);
3653        self.update_automation_v2_run(run_id, |row| {
3654            row.active_session_ids.retain(|id| id != session_id);
3655        })
3656        .await
3657    }
3658
3659    pub async fn apply_provider_usage_to_runs(
3660        &self,
3661        session_id: &str,
3662        prompt_tokens: u64,
3663        completion_tokens: u64,
3664        total_tokens: u64,
3665    ) {
3666        if let Some(policy) = self.routine_session_policy(session_id).await {
3667            let rate = self.token_cost_per_1k_usd.max(0.0);
3668            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3669            let mut guard = self.routine_runs.write().await;
3670            if let Some(run) = guard.get_mut(&policy.run_id) {
3671                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3672                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3673                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3674                run.estimated_cost_usd += delta_cost;
3675                run.updated_at_ms = now_ms();
3676            }
3677            drop(guard);
3678            let _ = self.persist_routine_runs().await;
3679        }
3680
3681        let maybe_v2_run_id = self
3682            .automation_v2_session_runs
3683            .read()
3684            .await
3685            .get(session_id)
3686            .cloned();
3687        if let Some(run_id) = maybe_v2_run_id {
3688            let rate = self.token_cost_per_1k_usd.max(0.0);
3689            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3690            let mut guard = self.automation_v2_runs.write().await;
3691            if let Some(run) = guard.get_mut(&run_id) {
3692                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3693                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3694                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3695                run.estimated_cost_usd += delta_cost;
3696                run.updated_at_ms = now_ms();
3697            }
3698            drop(guard);
3699            let _ = self.persist_automation_v2_runs().await;
3700        }
3701    }
3702
3703    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
3704        let mut fired = Vec::new();
3705        let mut guard = self.automations_v2.write().await;
3706        for automation in guard.values_mut() {
3707            if automation.status != AutomationV2Status::Active {
3708                continue;
3709            }
3710            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
3711                automation.next_fire_at_ms =
3712                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3713                continue;
3714            };
3715            if now_ms < next_fire_at_ms {
3716                continue;
3717            }
3718            let run_count =
3719                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
3720            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3721            automation.next_fire_at_ms = next;
3722            automation.last_fired_at_ms = Some(now_ms);
3723            for _ in 0..run_count {
3724                fired.push(automation.automation_id.clone());
3725            }
3726        }
3727        drop(guard);
3728        let _ = self.persist_automations_v2().await;
3729        fired
3730    }
3731}
3732
3733async fn build_channels_config(
3734    state: &AppState,
3735    channels: &ChannelsConfigFile,
3736) -> Option<ChannelsConfig> {
3737    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
3738        return None;
3739    }
3740    Some(ChannelsConfig {
3741        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
3742            bot_token: cfg.bot_token,
3743            allowed_users: cfg.allowed_users,
3744            mention_only: cfg.mention_only,
3745            style_profile: cfg.style_profile,
3746        }),
3747        discord: channels.discord.clone().map(|cfg| DiscordConfig {
3748            bot_token: cfg.bot_token,
3749            guild_id: cfg.guild_id,
3750            allowed_users: cfg.allowed_users,
3751            mention_only: cfg.mention_only,
3752        }),
3753        slack: channels.slack.clone().map(|cfg| SlackConfig {
3754            bot_token: cfg.bot_token,
3755            channel_id: cfg.channel_id,
3756            allowed_users: cfg.allowed_users,
3757            mention_only: cfg.mention_only,
3758        }),
3759        server_base_url: state.server_base_url(),
3760        api_token: state.api_token().await.unwrap_or_default(),
3761        tool_policy: channels.tool_policy.clone(),
3762    })
3763}
3764
/// Normalizes the configured web UI mount prefix.
///
/// Steps:
/// 1. Surrounding whitespace and trailing slashes are removed.
/// 2. An empty result falls back to `/admin`. This covers `""`, `"/"`, and any
///    all-slash input such as `"//"`, which previously collapsed to an empty prefix.
/// 3. A leading `/` is added when missing.
///
/// Leading slashes that are already present are kept as-is.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let trimmed = prefix.trim().trim_end_matches('/');
    if trimmed.is_empty() {
        return "/admin".to_string();
    }
    if trimmed.starts_with('/') {
        trimmed.to_string()
    } else {
        format!("/{trimmed}")
    }
}
3777
/// Default mount prefix for the web UI: `/admin`.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
3781
/// Default allow-list: the single wildcard entry `"*"`.
fn default_allow_all() -> Vec<String> {
    std::iter::once("*".to_owned()).collect()
}
3785
/// Default value for the Discord `mention_only` setting: `true`.
///
/// NOTE(review): presumably referenced from a `#[serde(default = "...")]` attribute on
/// the Discord channel config; confirm at the use site.
fn default_discord_mention_only() -> bool {
    true
}
3789
/// Normalizes a list of tool names.
///
/// Currently identical to `normalize_non_empty_list`: each entry is trimmed, blank
/// entries are dropped, and duplicates are removed while keeping the first occurrence
/// and the original order.
fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
    normalize_non_empty_list(raw)
}
3793
/// Trims every entry, drops blank ones, and removes duplicates.
///
/// The first occurrence of each trimmed value is kept, in its original order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty() && seen.insert(entry.clone()))
        .collect()
}
3808
/// Reads the run staleness threshold, in milliseconds, from `TANDEM_RUN_STALE_MS`.
///
/// An unset or unparseable value (after trimming) falls back to 120 000 ms. The result
/// is clamped to the range 30 000..=600 000 ms.
fn resolve_run_stale_ms() -> u64 {
    const DEFAULT_MS: u64 = 120_000;
    const MIN_MS: u64 = 30_000;
    const MAX_MS: u64 = 600_000;
    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_MS),
        Err(_) => DEFAULT_MS,
    };
    configured.clamp(MIN_MS, MAX_MS)
}
3816
/// Reads the estimated USD cost per 1 000 tokens from `TANDEM_TOKEN_COST_PER_1K_USD`.
///
/// Values that are unset, unparseable, or non-finite (`NaN`, `inf`) fall back to 0.0.
/// Negative values are clamped to 0.0.
fn resolve_token_cost_per_1k_usd() -> f64 {
    std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok())
        // `f64::from_str` accepts "inf"/"infinity"/"NaN". A non-finite rate would make
        // every run's `estimated_cost_usd` non-finite, which serde_json serializes as
        // `null` and cannot deserialize back into an `f64`.
        .filter(|rate| rate.is_finite())
        .unwrap_or(0.0)
        .max(0.0)
}
3824
/// Always returns `true`.
///
/// NOTE(review): presumably used as a `#[serde(default = "default_true")]` helper;
/// confirm at the use sites.
fn default_true() -> bool {
    true
}
3828
/// Reads a boolean flag from the environment variable `key`.
///
/// Matching is case-insensitive and ignores surrounding whitespace:
/// * `1`, `true`, `yes`, `on` → `true`;
/// * `0`, `false`, `no`, `off` → `false`;
/// * unset, non-Unicode, blank, or unrecognized values → `default`.
///
/// This matches the `parse_bool_like` rules used for the bug monitor settings.
/// Previously any unrecognized value (including an empty one) silently meant `false`.
fn parse_bool_env(key: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(key) else {
        return default;
    };
    match raw.trim().to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
3840
3841fn resolve_bug_monitor_env_config() -> BugMonitorConfig {
3842    fn env_value(new_name: &str, legacy_name: &str) -> Option<String> {
3843        std::env::var(new_name)
3844            .ok()
3845            .or_else(|| std::env::var(legacy_name).ok())
3846            .map(|v| v.trim().to_string())
3847            .filter(|v| !v.is_empty())
3848    }
3849
3850    fn env_bool(new_name: &str, legacy_name: &str, default: bool) -> bool {
3851        env_value(new_name, legacy_name)
3852            .map(|value| parse_bool_like(&value, default))
3853            .unwrap_or(default)
3854    }
3855
3856    fn parse_bool_like(value: &str, default: bool) -> bool {
3857        match value.trim().to_ascii_lowercase().as_str() {
3858            "1" | "true" | "yes" | "on" => true,
3859            "0" | "false" | "no" | "off" => false,
3860            _ => default,
3861        }
3862    }
3863
3864    let provider_preference = match env_value(
3865        "TANDEM_BUG_MONITOR_PROVIDER_PREFERENCE",
3866        "TANDEM_FAILURE_REPORTER_PROVIDER_PREFERENCE",
3867    )
3868    .unwrap_or_default()
3869    .trim()
3870    .to_ascii_lowercase()
3871    .as_str()
3872    {
3873        "official_github" | "official-github" | "github" => {
3874            BugMonitorProviderPreference::OfficialGithub
3875        }
3876        "composio" => BugMonitorProviderPreference::Composio,
3877        "arcade" => BugMonitorProviderPreference::Arcade,
3878        _ => BugMonitorProviderPreference::Auto,
3879    };
3880    let provider_id = env_value(
3881        "TANDEM_BUG_MONITOR_PROVIDER_ID",
3882        "TANDEM_FAILURE_REPORTER_PROVIDER_ID",
3883    );
3884    let model_id = env_value(
3885        "TANDEM_BUG_MONITOR_MODEL_ID",
3886        "TANDEM_FAILURE_REPORTER_MODEL_ID",
3887    );
3888    let model_policy = match (provider_id, model_id) {
3889        (Some(provider_id), Some(model_id)) => Some(json!({
3890            "default_model": {
3891                "provider_id": provider_id,
3892                "model_id": model_id,
3893            }
3894        })),
3895        _ => None,
3896    };
3897    BugMonitorConfig {
3898        enabled: env_bool(
3899            "TANDEM_BUG_MONITOR_ENABLED",
3900            "TANDEM_FAILURE_REPORTER_ENABLED",
3901            false,
3902        ),
3903        paused: env_bool(
3904            "TANDEM_BUG_MONITOR_PAUSED",
3905            "TANDEM_FAILURE_REPORTER_PAUSED",
3906            false,
3907        ),
3908        workspace_root: env_value(
3909            "TANDEM_BUG_MONITOR_WORKSPACE_ROOT",
3910            "TANDEM_FAILURE_REPORTER_WORKSPACE_ROOT",
3911        ),
3912        repo: env_value("TANDEM_BUG_MONITOR_REPO", "TANDEM_FAILURE_REPORTER_REPO"),
3913        mcp_server: env_value(
3914            "TANDEM_BUG_MONITOR_MCP_SERVER",
3915            "TANDEM_FAILURE_REPORTER_MCP_SERVER",
3916        ),
3917        provider_preference,
3918        model_policy,
3919        auto_create_new_issues: env_bool(
3920            "TANDEM_BUG_MONITOR_AUTO_CREATE_NEW_ISSUES",
3921            "TANDEM_FAILURE_REPORTER_AUTO_CREATE_NEW_ISSUES",
3922            true,
3923        ),
3924        require_approval_for_new_issues: env_bool(
3925            "TANDEM_BUG_MONITOR_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
3926            "TANDEM_FAILURE_REPORTER_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
3927            false,
3928        ),
3929        auto_comment_on_matched_open_issues: env_bool(
3930            "TANDEM_BUG_MONITOR_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
3931            "TANDEM_FAILURE_REPORTER_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
3932            true,
3933        ),
3934        label_mode: BugMonitorLabelMode::ReporterOnly,
3935        updated_at_ms: 0,
3936    }
3937}
3938
/// Returns `true` when `value`, ignoring surrounding whitespace, has the exact shape
/// `owner/repo`.
///
/// That means exactly one `/`, separating an owner and a repo that are both non-blank.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    match value.trim().split_once('/') {
        Some((owner, repo)) => {
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
3953
3954fn resolve_shared_resources_path() -> PathBuf {
3955    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3956        let trimmed = dir.trim();
3957        if !trimmed.is_empty() {
3958            return PathBuf::from(trimmed).join("shared_resources.json");
3959        }
3960    }
3961    default_state_dir().join("shared_resources.json")
3962}
3963
3964fn resolve_routines_path() -> PathBuf {
3965    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3966        let trimmed = dir.trim();
3967        if !trimmed.is_empty() {
3968            return PathBuf::from(trimmed).join("routines.json");
3969        }
3970    }
3971    default_state_dir().join("routines.json")
3972}
3973
3974fn resolve_routine_history_path() -> PathBuf {
3975    if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
3976        let trimmed = root.trim();
3977        if !trimmed.is_empty() {
3978            return PathBuf::from(trimmed).join("routine_history.json");
3979        }
3980    }
3981    default_state_dir().join("routine_history.json")
3982}
3983
3984fn resolve_routine_runs_path() -> PathBuf {
3985    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
3986        let trimmed = root.trim();
3987        if !trimmed.is_empty() {
3988            return PathBuf::from(trimmed).join("routine_runs.json");
3989        }
3990    }
3991    default_state_dir().join("routine_runs.json")
3992}
3993
/// Canonical location of the persisted automations file (`automations_v2.json`), as
/// resolved by `resolve_canonical_data_file_path`.
fn resolve_automations_v2_path() -> PathBuf {
    resolve_canonical_data_file_path("automations_v2.json")
}
3997
3998fn legacy_automations_v2_path() -> Option<PathBuf> {
3999    resolve_legacy_root_file_path("automations_v2.json")
4000        .filter(|path| path != &resolve_automations_v2_path())
4001}
4002
4003fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4004    let mut candidates = vec![active_path.clone()];
4005    if let Some(legacy_path) = legacy_automations_v2_path() {
4006        if !candidates.contains(&legacy_path) {
4007            candidates.push(legacy_path);
4008        }
4009    }
4010    let default_path = default_state_dir().join("automations_v2.json");
4011    if !candidates.contains(&default_path) {
4012        candidates.push(default_path);
4013    }
4014    candidates
4015}
4016
/// Canonical location of the persisted automation run records
/// (`automation_v2_runs.json`), as resolved by `resolve_canonical_data_file_path`.
fn resolve_automation_v2_runs_path() -> PathBuf {
    resolve_canonical_data_file_path("automation_v2_runs.json")
}
4020
4021fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
4022    resolve_legacy_root_file_path("automation_v2_runs.json")
4023        .filter(|path| path != &resolve_automation_v2_runs_path())
4024}
4025
4026fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4027    let mut candidates = vec![active_path.clone()];
4028    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
4029        if !candidates.contains(&legacy_path) {
4030            candidates.push(legacy_path);
4031        }
4032    }
4033    let default_path = default_state_dir().join("automation_v2_runs.json");
4034    if !candidates.contains(&default_path) {
4035        candidates.push(default_path);
4036    }
4037    candidates
4038}
4039
4040fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4041    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4042        .unwrap_or_default()
4043}
4044
4045fn parse_automation_v2_runs_file(
4046    raw: &str,
4047) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4048    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4049        .unwrap_or_default()
4050}
4051
4052fn resolve_canonical_data_file_path(file_name: &str) -> PathBuf {
4053    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4054        let trimmed = root.trim();
4055        if !trimmed.is_empty() {
4056            let base = PathBuf::from(trimmed);
4057            return if path_is_data_dir(&base) {
4058                base.join(file_name)
4059            } else {
4060                base.join("data").join(file_name)
4061            };
4062        }
4063    }
4064    default_state_dir().join(file_name)
4065}
4066
4067fn resolve_legacy_root_file_path(file_name: &str) -> Option<PathBuf> {
4068    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4069        let trimmed = root.trim();
4070        if !trimmed.is_empty() {
4071            let base = PathBuf::from(trimmed);
4072            if !path_is_data_dir(&base) {
4073                return Some(base.join(file_name));
4074            }
4075        }
4076    }
4077    resolve_shared_paths()
4078        .ok()
4079        .map(|paths| paths.canonical_root.join(file_name))
4080}
4081
/// Returns `true` when the last component of `path` is `data`, compared
/// ASCII-case-insensitively.
///
/// Paths with no final component (such as `/` or an empty path) and non-UTF-8 names
/// return `false`.
fn path_is_data_dir(path: &std::path::Path) -> bool {
    match path.file_name().and_then(std::ffi::OsStr::to_str) {
        Some(name) => name.eq_ignore_ascii_case("data"),
        None => false,
    }
}
4088
4089fn resolve_workflow_runs_path() -> PathBuf {
4090    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4091        let trimmed = root.trim();
4092        if !trimmed.is_empty() {
4093            return PathBuf::from(trimmed).join("workflow_runs.json");
4094        }
4095    }
4096    default_state_dir().join("workflow_runs.json")
4097}
4098
4099fn resolve_bug_monitor_config_path() -> PathBuf {
4100    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4101        let trimmed = root.trim();
4102        if !trimmed.is_empty() {
4103            return PathBuf::from(trimmed).join("bug_monitor_config.json");
4104        }
4105    }
4106    default_state_dir().join("bug_monitor_config.json")
4107}
4108
4109fn resolve_bug_monitor_drafts_path() -> PathBuf {
4110    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4111        let trimmed = root.trim();
4112        if !trimmed.is_empty() {
4113            return PathBuf::from(trimmed).join("bug_monitor_drafts.json");
4114        }
4115    }
4116    default_state_dir().join("bug_monitor_drafts.json")
4117}
4118
4119fn resolve_bug_monitor_incidents_path() -> PathBuf {
4120    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4121        let trimmed = root.trim();
4122        if !trimmed.is_empty() {
4123            return PathBuf::from(trimmed).join("bug_monitor_incidents.json");
4124        }
4125    }
4126    default_state_dir().join("bug_monitor_incidents.json")
4127}
4128
4129fn resolve_bug_monitor_posts_path() -> PathBuf {
4130    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4131        let trimmed = root.trim();
4132        if !trimmed.is_empty() {
4133            return PathBuf::from(trimmed).join("bug_monitor_posts.json");
4134        }
4135    }
4136    default_state_dir().join("bug_monitor_posts.json")
4137}
4138
4139fn legacy_failure_reporter_path(file_name: &str) -> PathBuf {
4140    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4141        let trimmed = root.trim();
4142        if !trimmed.is_empty() {
4143            return PathBuf::from(trimmed).join(file_name);
4144        }
4145    }
4146    default_state_dir().join(file_name)
4147}
4148
4149fn resolve_workflow_hook_overrides_path() -> PathBuf {
4150    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4151        let trimmed = root.trim();
4152        if !trimmed.is_empty() {
4153            return PathBuf::from(trimmed).join("workflow_hook_overrides.json");
4154        }
4155    }
4156    default_state_dir().join("workflow_hook_overrides.json")
4157}
4158
4159fn resolve_builtin_workflows_dir() -> PathBuf {
4160    if let Ok(root) = std::env::var("TANDEM_BUILTIN_WORKFLOW_DIR") {
4161        let trimmed = root.trim();
4162        if !trimmed.is_empty() {
4163            return PathBuf::from(trimmed);
4164        }
4165    }
4166    default_state_dir().join("builtin_workflows")
4167}
4168
4169fn resolve_agent_team_audit_path() -> PathBuf {
4170    if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
4171        let trimmed = base.trim();
4172        if !trimmed.is_empty() {
4173            return PathBuf::from(trimmed)
4174                .join("agent-team")
4175                .join("audit.log.jsonl");
4176        }
4177    }
4178    default_state_dir()
4179        .join("agent-team")
4180        .join("audit.log.jsonl")
4181}
4182
4183fn default_state_dir() -> PathBuf {
4184    if let Ok(paths) = resolve_shared_paths() {
4185        return paths.engine_state_dir;
4186    }
4187    if let Some(data_dir) = dirs::data_dir() {
4188        return data_dir.join("tandem").join("data");
4189    }
4190    dirs::home_dir()
4191        .map(|home| home.join(".tandem").join("data"))
4192        .unwrap_or_else(|| PathBuf::from(".tandem"))
4193}
4194
/// Returns the `.bak` sibling of `path`: same directory, file name `<name>.bak`.
///
/// When `path` has no final file-name component (e.g. `/` or a path ending in `..`), the
/// fallback name `state.json.bak` is used. The file name is handled as an `OsStr`, so
/// non-UTF-8 names keep their own backup file. Previously they all collapsed onto
/// `state.json.bak` and could overwrite each other's backups.
fn sibling_backup_path(path: &std::path::Path) -> PathBuf {
    let mut backup_name = path
        .file_name()
        .map(std::ffi::OsStr::to_os_string)
        .unwrap_or_else(|| std::ffi::OsString::from("state.json"));
    backup_name.push(".bak");
    path.with_file_name(backup_name)
}
4203
/// Returns the `.tmp` sibling of `path`: same directory, file name `<name>.tmp`.
///
/// When `path` has no final file-name component, the fallback name `state.json.tmp` is
/// used. The name is handled as an `OsStr`, so non-UTF-8 file names get their own temp
/// file. Previously all such writes shared `state.json.tmp` in the same directory.
fn sibling_tmp_path(path: &std::path::Path) -> PathBuf {
    let mut tmp_name = path
        .file_name()
        .map(std::ffi::OsStr::to_os_string)
        .unwrap_or_else(|| std::ffi::OsString::from("state.json"));
    tmp_name.push(".tmp");
    path.with_file_name(tmp_name)
}
4212
4213fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4214    match schedule {
4215        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4216        RoutineSchedule::Cron { .. } => None,
4217    }
4218}
4219
4220fn parse_timezone(timezone: &str) -> Option<Tz> {
4221    timezone.trim().parse::<Tz>().ok()
4222}
4223
4224fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
4225    let tz = parse_timezone(timezone)?;
4226    let schedule = Schedule::from_str(expression).ok()?;
4227    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
4228    let local_from = from_dt.with_timezone(&tz);
4229    let next = schedule.after(&local_from).next()?;
4230    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
4231}
4232
4233fn compute_next_schedule_fire_at_ms(
4234    schedule: &RoutineSchedule,
4235    timezone: &str,
4236    from_ms: u64,
4237) -> Option<u64> {
4238    let _ = parse_timezone(timezone)?;
4239    match schedule {
4240        RoutineSchedule::IntervalSeconds { seconds } => {
4241            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4242        }
4243        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4244    }
4245}
4246
4247fn compute_misfire_plan_for_schedule(
4248    now_ms: u64,
4249    next_fire_at_ms: u64,
4250    schedule: &RoutineSchedule,
4251    timezone: &str,
4252    policy: &RoutineMisfirePolicy,
4253) -> (u32, u64) {
4254    match schedule {
4255        RoutineSchedule::IntervalSeconds { .. } => {
4256            let Some(interval_ms) = routine_interval_ms(schedule) else {
4257                return (0, next_fire_at_ms);
4258            };
4259            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
4260        }
4261        RoutineSchedule::Cron { expression } => {
4262            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
4263                .unwrap_or_else(|| now_ms.saturating_add(60_000));
4264            match policy {
4265                RoutineMisfirePolicy::Skip => (0, aligned_next),
4266                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4267                RoutineMisfirePolicy::CatchUp { max_runs } => {
4268                    let mut count = 0u32;
4269                    let mut cursor = next_fire_at_ms;
4270                    while cursor <= now_ms && count < *max_runs {
4271                        count = count.saturating_add(1);
4272                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
4273                            break;
4274                        };
4275                        if next <= cursor {
4276                            break;
4277                        }
4278                        cursor = next;
4279                    }
4280                    (count, aligned_next)
4281                }
4282            }
4283        }
4284    }
4285}
4286
4287fn compute_misfire_plan(
4288    now_ms: u64,
4289    next_fire_at_ms: u64,
4290    interval_ms: u64,
4291    policy: &RoutineMisfirePolicy,
4292) -> (u32, u64) {
4293    if now_ms < next_fire_at_ms || interval_ms == 0 {
4294        return (0, next_fire_at_ms);
4295    }
4296    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4297    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4298    match policy {
4299        RoutineMisfirePolicy::Skip => (0, aligned_next),
4300        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4301        RoutineMisfirePolicy::CatchUp { max_runs } => {
4302            let count = missed.min(u64::from(*max_runs)) as u32;
4303            (count, aligned_next)
4304        }
4305    }
4306}
4307
4308fn auto_generated_agent_name(agent_id: &str) -> String {
4309    let names = [
4310        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4311    ];
4312    let digest = Sha256::digest(agent_id.as_bytes());
4313    let idx = usize::from(digest[0]) % names.len();
4314    format!("{}-{:02x}", names[idx], digest[1])
4315}
4316
4317fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4318    match schedule.schedule_type {
4319        AutomationV2ScheduleType::Manual => None,
4320        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4321            seconds: schedule.interval_seconds.unwrap_or(60),
4322        }),
4323        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4324            expression: schedule.cron_expression.clone().unwrap_or_default(),
4325        }),
4326    }
4327}
4328
4329fn automation_schedule_next_fire_at_ms(
4330    schedule: &AutomationV2Schedule,
4331    from_ms: u64,
4332) -> Option<u64> {
4333    let routine_schedule = schedule_from_automation_v2(schedule)?;
4334    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4335}
4336
4337fn automation_schedule_due_count(
4338    schedule: &AutomationV2Schedule,
4339    now_ms: u64,
4340    next_fire_at_ms: u64,
4341) -> u32 {
4342    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
4343        return 0;
4344    };
4345    let (count, _) = compute_misfire_plan_for_schedule(
4346        now_ms,
4347        next_fire_at_ms,
4348        &routine_schedule,
4349        &schedule.timezone,
4350        &schedule.misfire_policy,
4351    );
4352    count.max(1)
4353}
4354
/// Outcome of checking a routine against its execution policy.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run only after a manual approval.
    RequiresApproval {
        /// Human-readable explanation of why approval is needed.
        reason: String,
    },
    /// The routine must not run.
    Blocked {
        /// Human-readable explanation of why it was blocked.
        reason: String,
    },
}
4361
4362pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4363    let entrypoint = routine.entrypoint.to_ascii_lowercase();
4364    if entrypoint.starts_with("connector.")
4365        || entrypoint.starts_with("integration.")
4366        || entrypoint.contains("external")
4367    {
4368        return true;
4369    }
4370    routine
4371        .args
4372        .get("uses_external_integrations")
4373        .and_then(|v| v.as_bool())
4374        .unwrap_or(false)
4375        || routine
4376            .args
4377            .get("connector_id")
4378            .and_then(|v| v.as_str())
4379            .is_some()
4380}
4381
4382pub fn evaluate_routine_execution_policy(
4383    routine: &RoutineSpec,
4384    trigger_type: &str,
4385) -> RoutineExecutionDecision {
4386    if !routine_uses_external_integrations(routine) {
4387        return RoutineExecutionDecision::Allowed;
4388    }
4389    if !routine.external_integrations_allowed {
4390        return RoutineExecutionDecision::Blocked {
4391            reason: "external integrations are disabled by policy".to_string(),
4392        };
4393    }
4394    if routine.requires_approval {
4395        return RoutineExecutionDecision::RequiresApproval {
4396            reason: format!(
4397                "manual approval required before external side effects ({})",
4398                trigger_type
4399            ),
4400        };
4401    }
4402    RoutineExecutionDecision::Allowed
4403}
4404
/// Validates a shared-resource key (surrounding whitespace ignored).
///
/// Accepts the special key `swarm.active_tasks`, or any key that starts with `run/`,
/// `mission/`, `project/`, or `team/` and contains no empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        candidate => {
            let has_allowed_prefix = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| candidate.starts_with(prefix));
            has_allowed_prefix && !candidate.contains("//")
        }
    }
}
4422
4423impl Deref for AppState {
4424    type Target = RuntimeState;
4425
4426    fn deref(&self) -> &Self::Target {
4427        self.runtime
4428            .get()
4429            .expect("runtime accessed before startup completion")
4430    }
4431}
4432
4433#[derive(Clone)]
4434struct ServerPromptContextHook {
4435    state: AppState,
4436}
4437
4438impl ServerPromptContextHook {
4439    fn new(state: AppState) -> Self {
4440        Self { state }
4441    }
4442
4443    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
4444        let paths = resolve_shared_paths().ok()?;
4445        MemoryDatabase::new(&paths.memory_db_path).await.ok()
4446    }
4447
4448    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
4449        let paths = resolve_shared_paths().ok()?;
4450        tandem_memory::MemoryManager::new(&paths.memory_db_path)
4451            .await
4452            .ok()
4453    }
4454
4455    fn hash_query(input: &str) -> String {
4456        let mut hasher = Sha256::new();
4457        hasher.update(input.as_bytes());
4458        format!("{:x}", hasher.finalize())
4459    }
4460
4461    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
4462        let mut out = vec!["<memory_context>".to_string()];
4463        let mut used = 0usize;
4464        for hit in hits {
4465            let text = hit
4466                .record
4467                .content
4468                .split_whitespace()
4469                .take(60)
4470                .collect::<Vec<_>>()
4471                .join(" ");
4472            let line = format!(
4473                "- [{:.3}] {} (source={}, run={})",
4474                hit.score, text, hit.record.source_type, hit.record.run_id
4475            );
4476            used = used.saturating_add(line.len());
4477            if used > 2200 {
4478                break;
4479            }
4480            out.push(line);
4481        }
4482        out.push("</memory_context>".to_string());
4483        out.join("\n")
4484    }
4485
4486    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
4487        chunk
4488            .metadata
4489            .as_ref()
4490            .and_then(|meta| meta.get("source_url"))
4491            .and_then(Value::as_str)
4492            .map(str::trim)
4493            .filter(|v| !v.is_empty())
4494            .map(ToString::to_string)
4495    }
4496
4497    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
4498        if let Some(path) = chunk
4499            .metadata
4500            .as_ref()
4501            .and_then(|meta| meta.get("relative_path"))
4502            .and_then(Value::as_str)
4503            .map(str::trim)
4504            .filter(|v| !v.is_empty())
4505        {
4506            return path.to_string();
4507        }
4508        chunk
4509            .source
4510            .strip_prefix("guide_docs:")
4511            .unwrap_or(chunk.source.as_str())
4512            .to_string()
4513    }
4514
4515    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
4516        let mut out = vec!["<docs_context>".to_string()];
4517        let mut used = 0usize;
4518        for hit in hits {
4519            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
4520            let path = Self::extract_docs_relative_path(&hit.chunk);
4521            let text = hit
4522                .chunk
4523                .content
4524                .split_whitespace()
4525                .take(70)
4526                .collect::<Vec<_>>()
4527                .join(" ");
4528            let line = format!(
4529                "- [{:.3}] {} (doc_path={}, source_url={})",
4530                hit.similarity, text, path, url
4531            );
4532            used = used.saturating_add(line.len());
4533            if used > 2800 {
4534                break;
4535            }
4536            out.push(line);
4537        }
4538        out.push("</docs_context>".to_string());
4539        out.join("\n")
4540    }
4541
4542    async fn search_embedded_docs(
4543        &self,
4544        query: &str,
4545        limit: usize,
4546    ) -> Vec<tandem_memory::types::MemorySearchResult> {
4547        let Some(manager) = self.open_memory_manager().await else {
4548            return Vec::new();
4549        };
4550        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
4551        manager
4552            .search(
4553                query,
4554                Some(MemoryTier::Global),
4555                None,
4556                None,
4557                Some(search_limit),
4558            )
4559            .await
4560            .unwrap_or_default()
4561            .into_iter()
4562            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
4563            .take(limit)
4564            .collect()
4565    }
4566
4567    fn should_skip_memory_injection(query: &str) -> bool {
4568        let trimmed = query.trim();
4569        if trimmed.is_empty() {
4570            return true;
4571        }
4572        let lower = trimmed.to_ascii_lowercase();
4573        let social = [
4574            "hi",
4575            "hello",
4576            "hey",
4577            "thanks",
4578            "thank you",
4579            "ok",
4580            "okay",
4581            "cool",
4582            "nice",
4583            "yo",
4584            "good morning",
4585            "good afternoon",
4586            "good evening",
4587        ];
4588        lower.len() <= 32 && social.contains(&lower.as_str())
4589    }
4590
4591    fn personality_preset_text(preset: &str) -> &'static str {
4592        match preset {
4593            "concise" => {
4594                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
4595            }
4596            "friendly" => {
4597                "Default style: friendly and supportive while staying technically rigorous and concrete."
4598            }
4599            "mentor" => {
4600                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
4601            }
4602            "critical" => {
4603                "Default style: critical and risk-first. Surface failure modes and assumptions early."
4604            }
4605            _ => {
4606                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
4607            }
4608        }
4609    }
4610
4611    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
4612        let allow_agent_override = agent_name
4613            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
4614            .unwrap_or(false);
4615        let legacy_bot_name = config
4616            .get("bot_name")
4617            .and_then(Value::as_str)
4618            .map(str::trim)
4619            .filter(|v| !v.is_empty());
4620        let bot_name = config
4621            .get("identity")
4622            .and_then(|identity| identity.get("bot"))
4623            .and_then(|bot| bot.get("canonical_name"))
4624            .and_then(Value::as_str)
4625            .map(str::trim)
4626            .filter(|v| !v.is_empty())
4627            .or(legacy_bot_name)
4628            .unwrap_or("Tandem");
4629
4630        let default_profile = config
4631            .get("identity")
4632            .and_then(|identity| identity.get("personality"))
4633            .and_then(|personality| personality.get("default"));
4634        let default_preset = default_profile
4635            .and_then(|profile| profile.get("preset"))
4636            .and_then(Value::as_str)
4637            .map(str::trim)
4638            .filter(|v| !v.is_empty())
4639            .unwrap_or("balanced");
4640        let default_custom = default_profile
4641            .and_then(|profile| profile.get("custom_instructions"))
4642            .and_then(Value::as_str)
4643            .map(str::trim)
4644            .filter(|v| !v.is_empty())
4645            .map(ToString::to_string);
4646        let legacy_persona = config
4647            .get("persona")
4648            .and_then(Value::as_str)
4649            .map(str::trim)
4650            .filter(|v| !v.is_empty())
4651            .map(ToString::to_string);
4652
4653        let per_agent_profile = if allow_agent_override {
4654            agent_name.and_then(|name| {
4655                config
4656                    .get("identity")
4657                    .and_then(|identity| identity.get("personality"))
4658                    .and_then(|personality| personality.get("per_agent"))
4659                    .and_then(|per_agent| per_agent.get(name))
4660            })
4661        } else {
4662            None
4663        };
4664        let preset = per_agent_profile
4665            .and_then(|profile| profile.get("preset"))
4666            .and_then(Value::as_str)
4667            .map(str::trim)
4668            .filter(|v| !v.is_empty())
4669            .unwrap_or(default_preset);
4670        let custom = per_agent_profile
4671            .and_then(|profile| profile.get("custom_instructions"))
4672            .and_then(Value::as_str)
4673            .map(str::trim)
4674            .filter(|v| !v.is_empty())
4675            .map(ToString::to_string)
4676            .or(default_custom)
4677            .or(legacy_persona);
4678
4679        let mut lines = vec![
4680            format!("You are {bot_name}, an AI assistant."),
4681            Self::personality_preset_text(preset).to_string(),
4682        ];
4683        if let Some(custom) = custom {
4684            lines.push(format!("Additional personality instructions: {custom}"));
4685        }
4686        Some(lines.join("\n"))
4687    }
4688
4689    fn build_memory_scope_block(
4690        session_id: &str,
4691        project_id: Option<&str>,
4692        workspace_root: Option<&str>,
4693    ) -> String {
4694        let mut lines = vec![
4695            "<memory_scope>".to_string(),
4696            format!("- current_session_id: {}", session_id),
4697        ];
4698        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
4699            lines.push(format!("- current_project_id: {}", project_id));
4700        }
4701        if let Some(workspace_root) = workspace_root
4702            .map(str::trim)
4703            .filter(|value| !value.is_empty())
4704        {
4705            lines.push(format!("- workspace_root: {}", workspace_root));
4706        }
4707        lines.push(
4708            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
4709                .to_string(),
4710        );
4711        lines.push(
4712            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
4713                .to_string(),
4714        );
4715        lines.push(
4716            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
4717                .to_string(),
4718        );
4719        lines.push("</memory_scope>".to_string());
4720        lines.join("\n")
4721    }
4722}
4723
4724impl PromptContextHook for ServerPromptContextHook {
4725    fn augment_provider_messages(
4726        &self,
4727        ctx: PromptContextHookContext,
4728        mut messages: Vec<ChatMessage>,
4729    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
4730        let this = self.clone();
4731        Box::pin(async move {
4732            // Startup can invoke prompt plumbing before RuntimeState is installed.
4733            // Never panic from context hooks; fail-open and continue without augmentation.
4734            if !this.state.is_ready() {
4735                return Ok(messages);
4736            }
4737            let run = this.state.run_registry.get(&ctx.session_id).await;
4738            let Some(run) = run else {
4739                return Ok(messages);
4740            };
4741            let config = this.state.config.get_effective_value().await;
4742            if let Some(identity_block) =
4743                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
4744            {
4745                messages.push(ChatMessage {
4746                    role: "system".to_string(),
4747                    content: identity_block,
4748                    attachments: Vec::new(),
4749                });
4750            }
4751            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
4752                messages.push(ChatMessage {
4753                    role: "system".to_string(),
4754                    content: Self::build_memory_scope_block(
4755                        &ctx.session_id,
4756                        session.project_id.as_deref(),
4757                        session.workspace_root.as_deref(),
4758                    ),
4759                    attachments: Vec::new(),
4760                });
4761            }
4762            let run_id = run.run_id;
4763            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
4764            let query = messages
4765                .iter()
4766                .rev()
4767                .find(|m| m.role == "user")
4768                .map(|m| m.content.clone())
4769                .unwrap_or_default();
4770            if query.trim().is_empty() {
4771                return Ok(messages);
4772            }
4773            if Self::should_skip_memory_injection(&query) {
4774                return Ok(messages);
4775            }
4776
4777            let docs_hits = this.search_embedded_docs(&query, 6).await;
4778            if !docs_hits.is_empty() {
4779                let docs_block = Self::build_docs_memory_block(&docs_hits);
4780                messages.push(ChatMessage {
4781                    role: "system".to_string(),
4782                    content: docs_block.clone(),
4783                    attachments: Vec::new(),
4784                });
4785                this.state.event_bus.publish(EngineEvent::new(
4786                    "memory.docs.context.injected",
4787                    json!({
4788                        "runID": run_id,
4789                        "sessionID": ctx.session_id,
4790                        "messageID": ctx.message_id,
4791                        "iteration": ctx.iteration,
4792                        "count": docs_hits.len(),
4793                        "tokenSizeApprox": docs_block.split_whitespace().count(),
4794                        "sourcePrefix": "guide_docs:"
4795                    }),
4796                ));
4797                return Ok(messages);
4798            }
4799
4800            let Some(db) = this.open_memory_db().await else {
4801                return Ok(messages);
4802            };
4803            let started = now_ms();
4804            let hits = db
4805                .search_global_memory(&user_id, &query, 8, None, None, None)
4806                .await
4807                .unwrap_or_default();
4808            let latency_ms = now_ms().saturating_sub(started);
4809            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
4810            this.state.event_bus.publish(EngineEvent::new(
4811                "memory.search.performed",
4812                json!({
4813                    "runID": run_id,
4814                    "sessionID": ctx.session_id,
4815                    "messageID": ctx.message_id,
4816                    "providerID": ctx.provider_id,
4817                    "modelID": ctx.model_id,
4818                    "iteration": ctx.iteration,
4819                    "queryHash": Self::hash_query(&query),
4820                    "resultCount": hits.len(),
4821                    "scoreMin": scores.iter().copied().reduce(f64::min),
4822                    "scoreMax": scores.iter().copied().reduce(f64::max),
4823                    "scores": scores,
4824                    "latencyMs": latency_ms,
4825                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
4826                }),
4827            ));
4828
4829            if hits.is_empty() {
4830                return Ok(messages);
4831            }
4832
4833            let memory_block = Self::build_memory_block(&hits);
4834            messages.push(ChatMessage {
4835                role: "system".to_string(),
4836                content: memory_block.clone(),
4837                attachments: Vec::new(),
4838            });
4839            this.state.event_bus.publish(EngineEvent::new(
4840                "memory.context.injected",
4841                json!({
4842                    "runID": run_id,
4843                    "sessionID": ctx.session_id,
4844                    "messageID": ctx.message_id,
4845                    "iteration": ctx.iteration,
4846                    "count": hits.len(),
4847                    "tokenSizeApprox": memory_block.split_whitespace().count(),
4848                }),
4849            ));
4850            Ok(messages)
4851        })
4852    }
4853}
4854
4855fn extract_event_session_id(properties: &Value) -> Option<String> {
4856    properties
4857        .get("sessionID")
4858        .or_else(|| properties.get("sessionId"))
4859        .or_else(|| properties.get("id"))
4860        .or_else(|| {
4861            properties
4862                .get("part")
4863                .and_then(|part| part.get("sessionID"))
4864        })
4865        .or_else(|| {
4866            properties
4867                .get("part")
4868                .and_then(|part| part.get("sessionId"))
4869        })
4870        .and_then(|v| v.as_str())
4871        .map(|s| s.to_string())
4872}
4873
4874fn extract_event_run_id(properties: &Value) -> Option<String> {
4875    properties
4876        .get("runID")
4877        .or_else(|| properties.get("run_id"))
4878        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
4879        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
4880        .and_then(|v| v.as_str())
4881        .map(|s| s.to_string())
4882}
4883
4884fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
4885    let part = properties.get("part")?;
4886    let part_type = part
4887        .get("type")
4888        .and_then(|v| v.as_str())
4889        .unwrap_or_default()
4890        .to_ascii_lowercase();
4891    if part_type != "tool" && part_type != "tool-invocation" && part_type != "tool-result" {
4892        return None;
4893    }
4894    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
4895    let message_id = part
4896        .get("messageID")
4897        .or_else(|| part.get("message_id"))
4898        .and_then(|v| v.as_str())?
4899        .to_string();
4900    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
4901    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
4902        if let Some(preview) = properties
4903            .get("toolCallDelta")
4904            .and_then(|delta| delta.get("parsedArgsPreview"))
4905            .cloned()
4906        {
4907            let preview_nonempty = !preview.is_null()
4908                && !preview.as_object().is_some_and(|value| value.is_empty())
4909                && !preview
4910                    .as_str()
4911                    .map(|value| value.trim().is_empty())
4912                    .unwrap_or(false);
4913            if preview_nonempty {
4914                args = preview;
4915            }
4916        }
4917    }
4918    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
4919    {
4920        tracing::info!(
4921            message_id = %message_id,
4922            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
4923            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
4924            has_result = part.get("result").is_some(),
4925            has_error = part.get("error").is_some(),
4926            "persistable write tool part still has empty args"
4927        );
4928    }
4929    let result = part.get("result").cloned().filter(|value| !value.is_null());
4930    let error = part
4931        .get("error")
4932        .and_then(|v| v.as_str())
4933        .map(|value| value.to_string());
4934    Some((
4935        message_id,
4936        MessagePart::ToolInvocation {
4937            tool,
4938            args,
4939            result,
4940            error,
4941        },
4942    ))
4943}
4944
4945fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
4946    let session_id = extract_event_session_id(&event.properties)?;
4947    let run_id = extract_event_run_id(&event.properties);
4948    let key = format!("run/{session_id}/status");
4949
4950    let mut base = serde_json::Map::new();
4951    base.insert("sessionID".to_string(), Value::String(session_id));
4952    if let Some(run_id) = run_id {
4953        base.insert("runID".to_string(), Value::String(run_id));
4954    }
4955
4956    match event.event_type.as_str() {
4957        "session.run.started" => {
4958            base.insert("state".to_string(), Value::String("running".to_string()));
4959            base.insert("phase".to_string(), Value::String("run".to_string()));
4960            base.insert(
4961                "eventType".to_string(),
4962                Value::String("session.run.started".to_string()),
4963            );
4964            Some(StatusIndexUpdate {
4965                key,
4966                value: Value::Object(base),
4967            })
4968        }
4969        "session.run.finished" => {
4970            base.insert("state".to_string(), Value::String("finished".to_string()));
4971            base.insert("phase".to_string(), Value::String("run".to_string()));
4972            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
4973                base.insert("result".to_string(), Value::String(status.to_string()));
4974            }
4975            base.insert(
4976                "eventType".to_string(),
4977                Value::String("session.run.finished".to_string()),
4978            );
4979            Some(StatusIndexUpdate {
4980                key,
4981                value: Value::Object(base),
4982            })
4983        }
4984        "message.part.updated" => {
4985            let part_type = event
4986                .properties
4987                .get("part")
4988                .and_then(|v| v.get("type"))
4989                .and_then(|v| v.as_str())?;
4990            let part_state = event
4991                .properties
4992                .get("part")
4993                .and_then(|v| v.get("state"))
4994                .and_then(|v| v.as_str())
4995                .unwrap_or("");
4996            let (phase, tool_active) = match (part_type, part_state) {
4997                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
4998                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
4999                _ => return None,
5000            };
5001            base.insert("state".to_string(), Value::String("running".to_string()));
5002            base.insert("phase".to_string(), Value::String(phase.to_string()));
5003            base.insert("toolActive".to_string(), Value::Bool(tool_active));
5004            if let Some(tool_name) = event
5005                .properties
5006                .get("part")
5007                .and_then(|v| v.get("tool"))
5008                .and_then(|v| v.as_str())
5009            {
5010                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
5011            }
5012            base.insert(
5013                "eventType".to_string(),
5014                Value::String("message.part.updated".to_string()),
5015            );
5016            Some(StatusIndexUpdate {
5017                key,
5018                value: Value::Object(base),
5019            })
5020        }
5021        _ => None,
5022    }
5023}
5024
5025pub async fn run_session_part_persister(state: AppState) {
5026    if !state.wait_until_ready_or_failed(120, 250).await {
5027        tracing::warn!("session part persister: skipped because runtime did not become ready");
5028        return;
5029    }
5030    let Some(mut rx) = state.event_bus.take_session_part_receiver() else {
5031        tracing::warn!("session part persister: skipped because receiver was already taken");
5032        return;
5033    };
5034    while let Some(event) = rx.recv().await {
5035        if event.event_type != "message.part.updated" {
5036            continue;
5037        }
5038        // Streaming tool-call previews are useful for the live UI, but persistence
5039        // should store the finalized invocation/result events to avoid duplicating
5040        // one tool part per streamed args delta.
5041        if event.properties.get("toolCallDelta").is_some() {
5042            continue;
5043        }
5044        let Some(session_id) = extract_event_session_id(&event.properties) else {
5045            continue;
5046        };
5047        let Some((message_id, part)) = extract_persistable_tool_part(&event.properties) else {
5048            continue;
5049        };
5050        if let Err(error) = state
5051            .storage
5052            .append_message_part(&session_id, &message_id, part)
5053            .await
5054        {
5055            tracing::warn!(
5056                "session part persister failed for session={} message={}: {error:#}",
5057                session_id,
5058                message_id
5059            );
5060        }
5061    }
5062}
5063
5064pub async fn run_status_indexer(state: AppState) {
5065    if !state.wait_until_ready_or_failed(120, 250).await {
5066        tracing::warn!("status indexer: skipped because runtime did not become ready");
5067        return;
5068    }
5069    let mut rx = state.event_bus.subscribe();
5070    loop {
5071        match rx.recv().await {
5072            Ok(event) => {
5073                if let Some(update) = derive_status_index_update(&event) {
5074                    if let Err(error) = state
5075                        .put_shared_resource(
5076                            update.key,
5077                            update.value,
5078                            None,
5079                            "system.status_indexer".to_string(),
5080                            None,
5081                        )
5082                        .await
5083                    {
5084                        tracing::warn!("status indexer failed to persist update: {error:?}");
5085                    }
5086                }
5087            }
5088            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5089            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5090        }
5091    }
5092}
5093
5094pub async fn run_agent_team_supervisor(state: AppState) {
5095    if !state.wait_until_ready_or_failed(120, 250).await {
5096        tracing::warn!("agent team supervisor: skipped because runtime did not become ready");
5097        return;
5098    }
5099    let mut rx = state.event_bus.subscribe();
5100    loop {
5101        match rx.recv().await {
5102            Ok(event) => {
5103                state.agent_teams.handle_engine_event(&state, &event).await;
5104            }
5105            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5106            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5107        }
5108    }
5109}
5110
5111pub async fn run_bug_monitor(state: AppState) {
5112    if !state.wait_until_ready_or_failed(120, 250).await {
5113        tracing::warn!("bug monitor: skipped because runtime did not become ready");
5114        return;
5115    }
5116    state
5117        .update_bug_monitor_runtime_status(|runtime| {
5118            runtime.monitoring_active = false;
5119            runtime.last_runtime_error = None;
5120        })
5121        .await;
5122    let mut rx = state.event_bus.subscribe();
5123    loop {
5124        match rx.recv().await {
5125            Ok(event) => {
5126                if !is_bug_monitor_candidate_event(&event) {
5127                    continue;
5128                }
5129                let status = state.bug_monitor_status().await;
5130                if !status.config.enabled || status.config.paused || !status.readiness.repo_valid {
5131                    state
5132                        .update_bug_monitor_runtime_status(|runtime| {
5133                            runtime.monitoring_active = status.config.enabled
5134                                && !status.config.paused
5135                                && status.readiness.repo_valid;
5136                            runtime.paused = status.config.paused;
5137                            runtime.last_runtime_error = status.last_error.clone();
5138                        })
5139                        .await;
5140                    continue;
5141                }
5142                match process_bug_monitor_event(&state, &event, &status.config).await {
5143                    Ok(incident) => {
5144                        state
5145                            .update_bug_monitor_runtime_status(|runtime| {
5146                                runtime.monitoring_active = true;
5147                                runtime.paused = status.config.paused;
5148                                runtime.last_processed_at_ms = Some(now_ms());
5149                                runtime.last_incident_event_type =
5150                                    Some(incident.event_type.clone());
5151                                runtime.last_runtime_error = None;
5152                            })
5153                            .await;
5154                    }
5155                    Err(error) => {
5156                        let detail = truncate_text(&error.to_string(), 500);
5157                        state
5158                            .update_bug_monitor_runtime_status(|runtime| {
5159                                runtime.monitoring_active = true;
5160                                runtime.paused = status.config.paused;
5161                                runtime.last_processed_at_ms = Some(now_ms());
5162                                runtime.last_incident_event_type = Some(event.event_type.clone());
5163                                runtime.last_runtime_error = Some(detail.clone());
5164                            })
5165                            .await;
5166                        state.event_bus.publish(EngineEvent::new(
5167                            "bug_monitor.error",
5168                            serde_json::json!({
5169                                "eventType": event.event_type,
5170                                "detail": detail,
5171                            }),
5172                        ));
5173                    }
5174                }
5175            }
5176            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5177            Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => {
5178                state
5179                    .update_bug_monitor_runtime_status(|runtime| {
5180                        runtime.last_runtime_error =
5181                            Some(format!("Bug monitor lagged and dropped {count} events."));
5182                    })
5183                    .await;
5184            }
5185        }
5186    }
5187}
5188
5189pub async fn run_usage_aggregator(state: AppState) {
5190    if !state.wait_until_ready_or_failed(120, 250).await {
5191        tracing::warn!("usage aggregator: skipped because runtime did not become ready");
5192        return;
5193    }
5194    let mut rx = state.event_bus.subscribe();
5195    loop {
5196        match rx.recv().await {
5197            Ok(event) => {
5198                if event.event_type != "provider.usage" {
5199                    continue;
5200                }
5201                let session_id = event
5202                    .properties
5203                    .get("sessionID")
5204                    .and_then(|v| v.as_str())
5205                    .unwrap_or("");
5206                if session_id.is_empty() {
5207                    continue;
5208                }
5209                let prompt_tokens = event
5210                    .properties
5211                    .get("promptTokens")
5212                    .and_then(|v| v.as_u64())
5213                    .unwrap_or(0);
5214                let completion_tokens = event
5215                    .properties
5216                    .get("completionTokens")
5217                    .and_then(|v| v.as_u64())
5218                    .unwrap_or(0);
5219                let total_tokens = event
5220                    .properties
5221                    .get("totalTokens")
5222                    .and_then(|v| v.as_u64())
5223                    .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
5224                state
5225                    .apply_provider_usage_to_runs(
5226                        session_id,
5227                        prompt_tokens,
5228                        completion_tokens,
5229                        total_tokens,
5230                    )
5231                    .await;
5232            }
5233            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5234            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5235        }
5236    }
5237}
5238
5239fn is_bug_monitor_candidate_event(event: &EngineEvent) -> bool {
5240    if event.event_type.starts_with("bug_monitor.") {
5241        return false;
5242    }
5243    matches!(
5244        event.event_type.as_str(),
5245        "context.task.failed" | "workflow.run.failed" | "routine.run.failed" | "session.error"
5246    )
5247}
5248
5249async fn process_bug_monitor_event(
5250    state: &AppState,
5251    event: &EngineEvent,
5252    config: &BugMonitorConfig,
5253) -> anyhow::Result<BugMonitorIncidentRecord> {
5254    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
5255    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
5256        state,
5257        submission.repo.as_deref().unwrap_or_default(),
5258        submission.fingerprint.as_deref().unwrap_or_default(),
5259        submission.title.as_deref(),
5260        submission.detail.as_deref(),
5261        &submission.excerpt,
5262        3,
5263    )
5264    .await;
5265    let fingerprint = submission
5266        .fingerprint
5267        .clone()
5268        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
5269    let default_workspace_root = state.workspace_index.snapshot().await.root;
5270    let workspace_root = config
5271        .workspace_root
5272        .clone()
5273        .unwrap_or(default_workspace_root);
5274    let now = now_ms();
5275
5276    let existing = state
5277        .bug_monitor_incidents
5278        .read()
5279        .await
5280        .values()
5281        .find(|row| row.fingerprint == fingerprint)
5282        .cloned();
5283
5284    let mut incident = if let Some(mut row) = existing {
5285        row.occurrence_count = row.occurrence_count.saturating_add(1);
5286        row.updated_at_ms = now;
5287        row.last_seen_at_ms = Some(now);
5288        if row.excerpt.is_empty() {
5289            row.excerpt = submission.excerpt.clone();
5290        }
5291        row
5292    } else {
5293        BugMonitorIncidentRecord {
5294            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
5295            fingerprint: fingerprint.clone(),
5296            event_type: event.event_type.clone(),
5297            status: "queued".to_string(),
5298            repo: submission.repo.clone().unwrap_or_default(),
5299            workspace_root,
5300            title: submission
5301                .title
5302                .clone()
5303                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
5304            detail: submission.detail.clone(),
5305            excerpt: submission.excerpt.clone(),
5306            source: submission.source.clone(),
5307            run_id: submission.run_id.clone(),
5308            session_id: submission.session_id.clone(),
5309            correlation_id: submission.correlation_id.clone(),
5310            component: submission.component.clone(),
5311            level: submission.level.clone(),
5312            occurrence_count: 1,
5313            created_at_ms: now,
5314            updated_at_ms: now,
5315            last_seen_at_ms: Some(now),
5316            draft_id: None,
5317            triage_run_id: None,
5318            last_error: None,
5319            duplicate_summary: None,
5320            duplicate_matches: None,
5321            event_payload: Some(event.properties.clone()),
5322        }
5323    };
5324    state.put_bug_monitor_incident(incident.clone()).await?;
5325
5326    if !duplicate_matches.is_empty() {
5327        incident.status = "duplicate_suppressed".to_string();
5328        let duplicate_summary =
5329            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
5330        incident.duplicate_summary = Some(duplicate_summary.clone());
5331        incident.duplicate_matches = Some(duplicate_matches.clone());
5332        incident.updated_at_ms = now_ms();
5333        state.put_bug_monitor_incident(incident.clone()).await?;
5334        state.event_bus.publish(EngineEvent::new(
5335            "bug_monitor.incident.duplicate_suppressed",
5336            serde_json::json!({
5337                "incident_id": incident.incident_id,
5338                "fingerprint": incident.fingerprint,
5339                "eventType": incident.event_type,
5340                "status": incident.status,
5341                "duplicate_summary": duplicate_summary,
5342                "duplicate_matches": duplicate_matches,
5343            }),
5344        ));
5345        return Ok(incident);
5346    }
5347
5348    let draft = match state.submit_bug_monitor_draft(submission).await {
5349        Ok(draft) => draft,
5350        Err(error) => {
5351            incident.status = "draft_failed".to_string();
5352            incident.last_error = Some(truncate_text(&error.to_string(), 500));
5353            incident.updated_at_ms = now_ms();
5354            state.put_bug_monitor_incident(incident.clone()).await?;
5355            state.event_bus.publish(EngineEvent::new(
5356                "bug_monitor.incident.detected",
5357                serde_json::json!({
5358                    "incident_id": incident.incident_id,
5359                    "fingerprint": incident.fingerprint,
5360                    "eventType": incident.event_type,
5361                    "draft_id": incident.draft_id,
5362                    "triage_run_id": incident.triage_run_id,
5363                    "status": incident.status,
5364                    "detail": incident.last_error,
5365                }),
5366            ));
5367            return Ok(incident);
5368        }
5369    };
5370    incident.draft_id = Some(draft.draft_id.clone());
5371    incident.status = "draft_created".to_string();
5372    state.put_bug_monitor_incident(incident.clone()).await?;
5373
5374    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
5375        state.clone(),
5376        &draft.draft_id,
5377        true,
5378    )
5379    .await
5380    {
5381        Ok((updated_draft, _run_id, _deduped)) => {
5382            incident.triage_run_id = updated_draft.triage_run_id.clone();
5383            if incident.triage_run_id.is_some() {
5384                incident.status = "triage_queued".to_string();
5385            }
5386            incident.last_error = None;
5387        }
5388        Err(error) => {
5389            incident.status = "draft_created".to_string();
5390            incident.last_error = Some(truncate_text(&error.to_string(), 500));
5391        }
5392    }
5393
5394    if let Some(draft_id) = incident.draft_id.clone() {
5395        let latest_draft = state
5396            .get_bug_monitor_draft(&draft_id)
5397            .await
5398            .unwrap_or(draft.clone());
5399        match crate::bug_monitor_github::publish_draft(
5400            state,
5401            &draft_id,
5402            Some(&incident.incident_id),
5403            crate::bug_monitor_github::PublishMode::Auto,
5404        )
5405        .await
5406        {
5407            Ok(outcome) => {
5408                incident.status = outcome.action;
5409                incident.last_error = None;
5410            }
5411            Err(error) => {
5412                let detail = truncate_text(&error.to_string(), 500);
5413                incident.last_error = Some(detail.clone());
5414                let mut failed_draft = latest_draft;
5415                failed_draft.status = "github_post_failed".to_string();
5416                failed_draft.github_status = Some("github_post_failed".to_string());
5417                failed_draft.last_post_error = Some(detail.clone());
5418                let evidence_digest = failed_draft.evidence_digest.clone();
5419                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
5420                let _ = crate::bug_monitor_github::record_post_failure(
5421                    state,
5422                    &failed_draft,
5423                    Some(&incident.incident_id),
5424                    "auto_post",
5425                    evidence_digest.as_deref(),
5426                    &detail,
5427                )
5428                .await;
5429            }
5430        }
5431    }
5432
5433    incident.updated_at_ms = now_ms();
5434    state.put_bug_monitor_incident(incident.clone()).await?;
5435    state.event_bus.publish(EngineEvent::new(
5436        "bug_monitor.incident.detected",
5437        serde_json::json!({
5438            "incident_id": incident.incident_id,
5439            "fingerprint": incident.fingerprint,
5440            "eventType": incident.event_type,
5441            "draft_id": incident.draft_id,
5442            "triage_run_id": incident.triage_run_id,
5443            "status": incident.status,
5444        }),
5445    ));
5446    Ok(incident)
5447}
5448
5449async fn build_bug_monitor_submission_from_event(
5450    state: &AppState,
5451    config: &BugMonitorConfig,
5452    event: &EngineEvent,
5453) -> anyhow::Result<BugMonitorSubmission> {
5454    let repo = config
5455        .repo
5456        .clone()
5457        .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
5458    let default_workspace_root = state.workspace_index.snapshot().await.root;
5459    let workspace_root = config
5460        .workspace_root
5461        .clone()
5462        .unwrap_or(default_workspace_root);
5463    let reason = first_string(
5464        &event.properties,
5465        &["reason", "error", "detail", "message", "summary"],
5466    );
5467    let run_id = first_string(&event.properties, &["runID", "run_id"]);
5468    let session_id = first_string(&event.properties, &["sessionID", "session_id"]);
5469    let correlation_id = first_string(
5470        &event.properties,
5471        &["correlationID", "correlation_id", "commandID", "command_id"],
5472    );
5473    let component = first_string(
5474        &event.properties,
5475        &[
5476            "component",
5477            "routineID",
5478            "routine_id",
5479            "workflowID",
5480            "workflow_id",
5481            "task",
5482            "title",
5483        ],
5484    );
5485    let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
5486    if excerpt.is_empty() {
5487        if let Some(reason) = reason.as_ref() {
5488            excerpt.push(reason.clone());
5489        }
5490    }
5491    let serialized = serde_json::to_string(&event.properties).unwrap_or_default();
5492    let fingerprint = sha256_hex(&[
5493        repo.as_str(),
5494        workspace_root.as_str(),
5495        event.event_type.as_str(),
5496        reason.as_deref().unwrap_or(""),
5497        run_id.as_deref().unwrap_or(""),
5498        session_id.as_deref().unwrap_or(""),
5499        correlation_id.as_deref().unwrap_or(""),
5500        component.as_deref().unwrap_or(""),
5501        serialized.as_str(),
5502    ]);
5503    let title = if let Some(component) = component.as_ref() {
5504        format!("{} failure in {}", event.event_type, component)
5505    } else {
5506        format!("{} detected", event.event_type)
5507    };
5508    let mut detail_lines = vec![
5509        format!("event_type: {}", event.event_type),
5510        format!("workspace_root: {}", workspace_root),
5511    ];
5512    if let Some(reason) = reason.as_ref() {
5513        detail_lines.push(format!("reason: {reason}"));
5514    }
5515    if let Some(run_id) = run_id.as_ref() {
5516        detail_lines.push(format!("run_id: {run_id}"));
5517    }
5518    if let Some(session_id) = session_id.as_ref() {
5519        detail_lines.push(format!("session_id: {session_id}"));
5520    }
5521    if let Some(correlation_id) = correlation_id.as_ref() {
5522        detail_lines.push(format!("correlation_id: {correlation_id}"));
5523    }
5524    if let Some(component) = component.as_ref() {
5525        detail_lines.push(format!("component: {component}"));
5526    }
5527    if !serialized.trim().is_empty() {
5528        detail_lines.push(String::new());
5529        detail_lines.push("payload:".to_string());
5530        detail_lines.push(truncate_text(&serialized, 2_000));
5531    }
5532
5533    Ok(BugMonitorSubmission {
5534        repo: Some(repo),
5535        title: Some(title),
5536        detail: Some(detail_lines.join("\n")),
5537        source: Some("tandem_events".to_string()),
5538        run_id,
5539        session_id,
5540        correlation_id,
5541        file_name: None,
5542        process: Some("tandem-engine".to_string()),
5543        component,
5544        event: Some(event.event_type.clone()),
5545        level: Some("error".to_string()),
5546        excerpt,
5547        fingerprint: Some(fingerprint),
5548    })
5549}
5550
5551async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
5552    let mut excerpt = Vec::new();
5553    if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
5554        excerpt.push(reason);
5555    }
5556    if let Some(title) = first_string(properties, &["title", "task"]) {
5557        if !excerpt.iter().any(|row| row == &title) {
5558            excerpt.push(title);
5559        }
5560    }
5561    let logs = state.logs.read().await;
5562    for entry in logs.iter().rev().take(3) {
5563        if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
5564            excerpt.push(truncate_text(message, 240));
5565        }
5566    }
5567    excerpt.truncate(8);
5568    excerpt
5569}
5570
5571fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
5572    for key in keys {
5573        if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
5574            let trimmed = value.trim();
5575            if !trimmed.is_empty() {
5576                return Some(trimmed.to_string());
5577            }
5578        }
5579    }
5580    None
5581}
5582
5583fn sha256_hex(parts: &[&str]) -> String {
5584    let mut hasher = Sha256::new();
5585    for part in parts {
5586        hasher.update(part.as_bytes());
5587        hasher.update([0u8]);
5588    }
5589    format!("{:x}", hasher.finalize())
5590}
5591
5592pub async fn run_routine_scheduler(state: AppState) {
5593    loop {
5594        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5595        let now = now_ms();
5596        let plans = state.evaluate_routine_misfires(now).await;
5597        for plan in plans {
5598            let Some(routine) = state.get_routine(&plan.routine_id).await else {
5599                continue;
5600            };
5601            match evaluate_routine_execution_policy(&routine, "scheduled") {
5602                RoutineExecutionDecision::Allowed => {
5603                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
5604                    let run = state
5605                        .create_routine_run(
5606                            &routine,
5607                            "scheduled",
5608                            plan.run_count,
5609                            RoutineRunStatus::Queued,
5610                            None,
5611                        )
5612                        .await;
5613                    state
5614                        .append_routine_history(RoutineHistoryEvent {
5615                            routine_id: plan.routine_id.clone(),
5616                            trigger_type: "scheduled".to_string(),
5617                            run_count: plan.run_count,
5618                            fired_at_ms: now,
5619                            status: "queued".to_string(),
5620                            detail: None,
5621                        })
5622                        .await;
5623                    state.event_bus.publish(EngineEvent::new(
5624                        "routine.fired",
5625                        serde_json::json!({
5626                            "routineID": plan.routine_id,
5627                            "runID": run.run_id,
5628                            "runCount": plan.run_count,
5629                            "scheduledAtMs": plan.scheduled_at_ms,
5630                            "nextFireAtMs": plan.next_fire_at_ms,
5631                        }),
5632                    ));
5633                    state.event_bus.publish(EngineEvent::new(
5634                        "routine.run.created",
5635                        serde_json::json!({
5636                            "run": run,
5637                        }),
5638                    ));
5639                }
5640                RoutineExecutionDecision::RequiresApproval { reason } => {
5641                    let run = state
5642                        .create_routine_run(
5643                            &routine,
5644                            "scheduled",
5645                            plan.run_count,
5646                            RoutineRunStatus::PendingApproval,
5647                            Some(reason.clone()),
5648                        )
5649                        .await;
5650                    state
5651                        .append_routine_history(RoutineHistoryEvent {
5652                            routine_id: plan.routine_id.clone(),
5653                            trigger_type: "scheduled".to_string(),
5654                            run_count: plan.run_count,
5655                            fired_at_ms: now,
5656                            status: "pending_approval".to_string(),
5657                            detail: Some(reason.clone()),
5658                        })
5659                        .await;
5660                    state.event_bus.publish(EngineEvent::new(
5661                        "routine.approval_required",
5662                        serde_json::json!({
5663                            "routineID": plan.routine_id,
5664                            "runID": run.run_id,
5665                            "runCount": plan.run_count,
5666                            "triggerType": "scheduled",
5667                            "reason": reason,
5668                        }),
5669                    ));
5670                    state.event_bus.publish(EngineEvent::new(
5671                        "routine.run.created",
5672                        serde_json::json!({
5673                            "run": run,
5674                        }),
5675                    ));
5676                }
5677                RoutineExecutionDecision::Blocked { reason } => {
5678                    let run = state
5679                        .create_routine_run(
5680                            &routine,
5681                            "scheduled",
5682                            plan.run_count,
5683                            RoutineRunStatus::BlockedPolicy,
5684                            Some(reason.clone()),
5685                        )
5686                        .await;
5687                    state
5688                        .append_routine_history(RoutineHistoryEvent {
5689                            routine_id: plan.routine_id.clone(),
5690                            trigger_type: "scheduled".to_string(),
5691                            run_count: plan.run_count,
5692                            fired_at_ms: now,
5693                            status: "blocked_policy".to_string(),
5694                            detail: Some(reason.clone()),
5695                        })
5696                        .await;
5697                    state.event_bus.publish(EngineEvent::new(
5698                        "routine.blocked",
5699                        serde_json::json!({
5700                            "routineID": plan.routine_id,
5701                            "runID": run.run_id,
5702                            "runCount": plan.run_count,
5703                            "triggerType": "scheduled",
5704                            "reason": reason,
5705                        }),
5706                    ));
5707                    state.event_bus.publish(EngineEvent::new(
5708                        "routine.run.created",
5709                        serde_json::json!({
5710                            "run": run,
5711                        }),
5712                    ));
5713                }
5714            }
5715        }
5716    }
5717}
5718
5719pub async fn run_routine_executor(state: AppState) {
5720    loop {
5721        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5722        let Some(run) = state.claim_next_queued_routine_run().await else {
5723            continue;
5724        };
5725
5726        state.event_bus.publish(EngineEvent::new(
5727            "routine.run.started",
5728            serde_json::json!({
5729                "runID": run.run_id,
5730                "routineID": run.routine_id,
5731                "triggerType": run.trigger_type,
5732                "startedAtMs": now_ms(),
5733            }),
5734        ));
5735
5736        let workspace_root = state.workspace_index.snapshot().await.root;
5737        let mut session = Session::new(
5738            Some(format!("Routine {}", run.routine_id)),
5739            Some(workspace_root.clone()),
5740        );
5741        let session_id = session.id.clone();
5742        session.workspace_root = Some(workspace_root);
5743
5744        if let Err(error) = state.storage.save_session(session).await {
5745            let detail = format!("failed to create routine session: {error}");
5746            let _ = state
5747                .update_routine_run_status(
5748                    &run.run_id,
5749                    RoutineRunStatus::Failed,
5750                    Some(detail.clone()),
5751                )
5752                .await;
5753            state.event_bus.publish(EngineEvent::new(
5754                "routine.run.failed",
5755                serde_json::json!({
5756                    "runID": run.run_id,
5757                    "routineID": run.routine_id,
5758                    "reason": detail,
5759                }),
5760            ));
5761            continue;
5762        }
5763
5764        state
5765            .set_routine_session_policy(
5766                session_id.clone(),
5767                run.run_id.clone(),
5768                run.routine_id.clone(),
5769                run.allowed_tools.clone(),
5770            )
5771            .await;
5772        state
5773            .add_active_session_id(&run.run_id, session_id.clone())
5774            .await;
5775        state
5776            .engine_loop
5777            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
5778            .await;
5779        state
5780            .engine_loop
5781            .set_session_auto_approve_permissions(&session_id, true)
5782            .await;
5783
5784        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
5785        if let Some(spec) = selected_model.as_ref() {
5786            state.event_bus.publish(EngineEvent::new(
5787                "routine.run.model_selected",
5788                serde_json::json!({
5789                    "runID": run.run_id,
5790                    "routineID": run.routine_id,
5791                    "providerID": spec.provider_id,
5792                    "modelID": spec.model_id,
5793                    "source": model_source,
5794                }),
5795            ));
5796        }
5797
5798        let request = SendMessageRequest {
5799            parts: vec![MessagePartInput::Text {
5800                text: build_routine_prompt(&state, &run).await,
5801            }],
5802            model: selected_model,
5803            agent: None,
5804            tool_mode: None,
5805            tool_allowlist: None,
5806            context_mode: None,
5807            write_required: None,
5808        };
5809
5810        let run_result = state
5811            .engine_loop
5812            .run_prompt_async_with_context(
5813                session_id.clone(),
5814                request,
5815                Some(format!("routine:{}", run.run_id)),
5816            )
5817            .await;
5818
5819        state.clear_routine_session_policy(&session_id).await;
5820        state
5821            .clear_active_session_id(&run.run_id, &session_id)
5822            .await;
5823        state
5824            .engine_loop
5825            .clear_session_allowed_tools(&session_id)
5826            .await;
5827        state
5828            .engine_loop
5829            .clear_session_auto_approve_permissions(&session_id)
5830            .await;
5831
5832        match run_result {
5833            Ok(()) => {
5834                append_configured_output_artifacts(&state, &run).await;
5835                let _ = state
5836                    .update_routine_run_status(
5837                        &run.run_id,
5838                        RoutineRunStatus::Completed,
5839                        Some("routine run completed".to_string()),
5840                    )
5841                    .await;
5842                state.event_bus.publish(EngineEvent::new(
5843                    "routine.run.completed",
5844                    serde_json::json!({
5845                        "runID": run.run_id,
5846                        "routineID": run.routine_id,
5847                        "sessionID": session_id,
5848                        "finishedAtMs": now_ms(),
5849                    }),
5850                ));
5851            }
5852            Err(error) => {
5853                if let Some(latest) = state.get_routine_run(&run.run_id).await {
5854                    if latest.status == RoutineRunStatus::Paused {
5855                        state.event_bus.publish(EngineEvent::new(
5856                            "routine.run.paused",
5857                            serde_json::json!({
5858                                "runID": run.run_id,
5859                                "routineID": run.routine_id,
5860                                "sessionID": session_id,
5861                                "finishedAtMs": now_ms(),
5862                            }),
5863                        ));
5864                        continue;
5865                    }
5866                }
5867                let detail = truncate_text(&error.to_string(), 500);
5868                let _ = state
5869                    .update_routine_run_status(
5870                        &run.run_id,
5871                        RoutineRunStatus::Failed,
5872                        Some(detail.clone()),
5873                    )
5874                    .await;
5875                state.event_bus.publish(EngineEvent::new(
5876                    "routine.run.failed",
5877                    serde_json::json!({
5878                        "runID": run.run_id,
5879                        "routineID": run.routine_id,
5880                        "sessionID": session_id,
5881                        "reason": detail,
5882                        "finishedAtMs": now_ms(),
5883                    }),
5884                ));
5885            }
5886        }
5887    }
5888}
5889
5890pub async fn run_automation_v2_scheduler(state: AppState) {
5891    loop {
5892        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5893        let startup = state.startup_snapshot().await;
5894        if !matches!(startup.status, StartupStatus::Ready) {
5895            continue;
5896        }
5897        let now = now_ms();
5898        let due = state.evaluate_automation_v2_misfires(now).await;
5899        for automation_id in due {
5900            let Some(automation) = state.get_automation_v2(&automation_id).await else {
5901                continue;
5902            };
5903            if let Ok(run) = state
5904                .create_automation_v2_run(&automation, "scheduled")
5905                .await
5906            {
5907                state.event_bus.publish(EngineEvent::new(
5908                    "automation.v2.run.created",
5909                    serde_json::json!({
5910                        "automationID": automation_id,
5911                        "run": run,
5912                        "triggerType": "scheduled",
5913                    }),
5914                ));
5915            }
5916        }
5917    }
5918}
5919
5920fn build_automation_v2_upstream_inputs(
5921    run: &AutomationV2RunRecord,
5922    node: &AutomationFlowNode,
5923) -> anyhow::Result<Vec<Value>> {
5924    let mut inputs = Vec::new();
5925    for input_ref in &node.input_refs {
5926        let Some(output) = run.checkpoint.node_outputs.get(&input_ref.from_step_id) else {
5927            anyhow::bail!(
5928                "missing upstream output for `{}` referenced by node `{}`",
5929                input_ref.from_step_id,
5930                node.node_id
5931            );
5932        };
5933        inputs.push(json!({
5934            "alias": input_ref.alias,
5935            "from_step_id": input_ref.from_step_id,
5936            "output": output,
5937        }));
5938    }
5939    Ok(inputs)
5940}
5941
5942fn render_automation_v2_prompt(
5943    automation: &AutomationV2Spec,
5944    run_id: &str,
5945    node: &AutomationFlowNode,
5946    agent: &AutomationAgentProfile,
5947    upstream_inputs: &[Value],
5948    template_system_prompt: Option<&str>,
5949    standup_report_path: Option<&str>,
5950    memory_project_id: Option<&str>,
5951) -> String {
5952    let contract_kind = node
5953        .output_contract
5954        .as_ref()
5955        .map(|contract| contract.kind.as_str())
5956        .unwrap_or("structured_json");
5957    let mut sections = Vec::new();
5958    if let Some(system_prompt) = template_system_prompt
5959        .map(str::trim)
5960        .filter(|value| !value.is_empty())
5961    {
5962        sections.push(format!("Template system prompt:\n{}", system_prompt));
5963    }
5964    sections.push(format!(
5965        "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}\nOutput contract kind: {}",
5966        automation.automation_id, run_id, node.node_id, agent.display_name, node.objective, contract_kind
5967    ));
5968    let mut prompt = sections.join("\n\n");
5969    if !upstream_inputs.is_empty() {
5970        prompt.push_str("\n\nUpstream Inputs:");
5971        for input in upstream_inputs {
5972            let alias = input
5973                .get("alias")
5974                .and_then(Value::as_str)
5975                .unwrap_or("input");
5976            let from_step_id = input
5977                .get("from_step_id")
5978                .and_then(Value::as_str)
5979                .unwrap_or("unknown");
5980            let output = input.get("output").cloned().unwrap_or(Value::Null);
5981            let rendered =
5982                serde_json::to_string_pretty(&output).unwrap_or_else(|_| output.to_string());
5983            prompt.push_str(&format!(
5984                "\n- {}\n  from_step_id: {}\n  output:\n{}",
5985                alias,
5986                from_step_id,
5987                rendered
5988                    .lines()
5989                    .map(|line| format!("    {}", line))
5990                    .collect::<Vec<_>>()
5991                    .join("\n")
5992            ));
5993        }
5994    }
5995    if node.node_id == "notify_user" || node.objective.to_ascii_lowercase().contains("email") {
5996        prompt.push_str(
5997            "\n\nDelivery rules:\n- Prefer inline email body delivery by default.\n- Only include an email attachment when upstream inputs contain a concrete attachment artifact with a non-empty s3key or upload result.\n- Never send an attachment parameter with an empty or null s3key.\n- If no attachment artifact exists, omit the attachment parameter entirely.",
5998        );
5999    }
6000    if let Some(report_path) = standup_report_path
6001        .map(str::trim)
6002        .filter(|value| !value.is_empty())
6003    {
6004        prompt.push_str(&format!(
6005            "\n\nStandup report path:\n- Write the final markdown report to `{}` relative to the workspace root.\n- Use the `write` tool for the report.\n- The report must remain inside the workspace.",
6006            report_path
6007        ));
6008    }
6009    if let Some(project_id) = memory_project_id
6010        .map(str::trim)
6011        .filter(|value| !value.is_empty())
6012    {
6013        prompt.push_str(&format!(
6014            "\n\nMemory search scope:\n- `memory_search` defaults to the current session, current project, and global memory.\n- Current project_id: `{}`.\n- Use `tier: \"project\"` when you need recall limited to this workspace.\n- Use workspace files via `glob`, `grep`, and `read` when memory is sparse or stale.",
6015            project_id
6016        ));
6017    }
6018    prompt.push_str(
6019        "\n\nReturn a concise completion. If you produce structured content, keep it valid JSON inside the response body.",
6020    );
6021    prompt
6022}
6023
6024fn is_agent_standup_automation(automation: &AutomationV2Spec) -> bool {
6025    automation
6026        .metadata
6027        .as_ref()
6028        .and_then(|value| value.get("feature"))
6029        .and_then(Value::as_str)
6030        .map(|value| value == "agent_standup")
6031        .unwrap_or(false)
6032}
6033
6034fn resolve_standup_report_path_template(automation: &AutomationV2Spec) -> Option<String> {
6035    automation
6036        .metadata
6037        .as_ref()
6038        .and_then(|value| value.get("standup"))
6039        .and_then(|value| value.get("report_path_template"))
6040        .and_then(Value::as_str)
6041        .map(|value| value.trim().to_string())
6042        .filter(|value| !value.is_empty())
6043}
6044
6045fn resolve_standup_report_path_for_run(
6046    automation: &AutomationV2Spec,
6047    started_at_ms: u64,
6048) -> Option<String> {
6049    let template = resolve_standup_report_path_template(automation)?;
6050    if !template.contains("{{date}}") {
6051        return Some(template);
6052    }
6053    let date = chrono::DateTime::<chrono::Utc>::from_timestamp_millis(started_at_ms as i64)
6054        .unwrap_or_else(chrono::Utc::now)
6055        .format("%Y-%m-%d")
6056        .to_string();
6057    Some(template.replace("{{date}}", &date))
6058}
6059
6060fn automation_workspace_project_id(workspace_root: &str) -> String {
6061    tandem_core::workspace_project_id(workspace_root)
6062        .unwrap_or_else(|| "workspace-unknown".to_string())
6063}
6064
6065fn merge_automation_agent_allowlist(
6066    agent: &AutomationAgentProfile,
6067    template: Option<&tandem_orchestrator::AgentTemplate>,
6068) -> Vec<String> {
6069    let mut allowlist = if agent.tool_policy.allowlist.is_empty() {
6070        template
6071            .map(|value| value.capabilities.tool_allowlist.clone())
6072            .unwrap_or_default()
6073    } else {
6074        agent.tool_policy.allowlist.clone()
6075    };
6076    allowlist.sort();
6077    allowlist.dedup();
6078    allowlist
6079}
6080
6081fn resolve_automation_agent_model(
6082    agent: &AutomationAgentProfile,
6083    template: Option<&tandem_orchestrator::AgentTemplate>,
6084) -> Option<ModelSpec> {
6085    if let Some(model) = agent
6086        .model_policy
6087        .as_ref()
6088        .and_then(|policy| policy.get("default_model"))
6089        .and_then(parse_model_spec)
6090    {
6091        return Some(model);
6092    }
6093    template
6094        .and_then(|value| value.default_model.as_ref())
6095        .and_then(parse_model_spec)
6096}
6097
6098fn extract_session_text_output(session: &Session) -> String {
6099    session
6100        .messages
6101        .iter()
6102        .rev()
6103        .find(|message| matches!(message.role, MessageRole::Assistant))
6104        .map(|message| {
6105            message
6106                .parts
6107                .iter()
6108                .filter_map(|part| match part {
6109                    MessagePart::Text { text } | MessagePart::Reasoning { text } => {
6110                        Some(text.as_str())
6111                    }
6112                    MessagePart::ToolInvocation { .. } => None,
6113                })
6114                .collect::<Vec<_>>()
6115                .join("\n")
6116        })
6117        .unwrap_or_default()
6118}
6119
6120fn wrap_automation_node_output(
6121    node: &AutomationFlowNode,
6122    session_id: &str,
6123    session_text: &str,
6124) -> Value {
6125    let contract_kind = node
6126        .output_contract
6127        .as_ref()
6128        .map(|contract| contract.kind.clone())
6129        .unwrap_or_else(|| "structured_json".to_string());
6130    let summary = if session_text.trim().is_empty() {
6131        format!("Node `{}` completed successfully.", node.node_id)
6132    } else {
6133        truncate_text(session_text.trim(), 240)
6134    };
6135    let content = match contract_kind.as_str() {
6136        "report_markdown" | "text_summary" => {
6137            json!({ "text": session_text.trim(), "session_id": session_id })
6138        }
6139        "urls" => json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id }),
6140        "citations" => {
6141            json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id })
6142        }
6143        _ => json!({ "text": session_text.trim(), "session_id": session_id }),
6144    };
6145    json!(AutomationNodeOutput {
6146        contract_kind,
6147        summary,
6148        content,
6149        created_at_ms: now_ms(),
6150        node_id: node.node_id.clone(),
6151    })
6152}
6153
6154fn automation_node_max_attempts(node: &AutomationFlowNode) -> u32 {
6155    node.retry_policy
6156        .as_ref()
6157        .and_then(|value| value.get("max_attempts"))
6158        .and_then(Value::as_u64)
6159        .map(|value| value.clamp(1, 10) as u32)
6160        .unwrap_or(3)
6161}
6162
6163async fn resolve_automation_v2_workspace_root(
6164    state: &AppState,
6165    automation: &AutomationV2Spec,
6166) -> String {
6167    if let Some(workspace_root) = automation
6168        .workspace_root
6169        .as_deref()
6170        .map(str::trim)
6171        .filter(|value| !value.is_empty())
6172        .map(str::to_string)
6173    {
6174        return workspace_root;
6175    }
6176    if let Some(workspace_root) = automation
6177        .metadata
6178        .as_ref()
6179        .and_then(|row| row.get("workspace_root"))
6180        .and_then(Value::as_str)
6181        .map(str::trim)
6182        .filter(|value| !value.is_empty())
6183        .map(str::to_string)
6184    {
6185        return workspace_root;
6186    }
6187    state.workspace_index.snapshot().await.root
6188}
6189
6190async fn execute_automation_v2_node(
6191    state: &AppState,
6192    run_id: &str,
6193    automation: &AutomationV2Spec,
6194    node: &AutomationFlowNode,
6195    agent: &AutomationAgentProfile,
6196) -> anyhow::Result<Value> {
6197    let run = state
6198        .get_automation_v2_run(run_id)
6199        .await
6200        .ok_or_else(|| anyhow::anyhow!("automation run `{}` not found", run_id))?;
6201    let upstream_inputs = build_automation_v2_upstream_inputs(&run, node)?;
6202    let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
6203    let workspace_path = PathBuf::from(&workspace_root);
6204    if !workspace_path.exists() {
6205        anyhow::bail!(
6206            "workspace_root `{}` for automation `{}` does not exist",
6207            workspace_root,
6208            automation.automation_id
6209        );
6210    }
6211    if !workspace_path.is_dir() {
6212        anyhow::bail!(
6213            "workspace_root `{}` for automation `{}` is not a directory",
6214            workspace_root,
6215            automation.automation_id
6216        );
6217    }
6218    let template = if let Some(template_id) = agent.template_id.as_deref().map(str::trim) {
6219        if template_id.is_empty() {
6220            None
6221        } else {
6222            state
6223                .agent_teams
6224                .get_template_for_workspace(&workspace_root, template_id)
6225                .await?
6226                .ok_or_else(|| anyhow::anyhow!("agent template `{}` not found", template_id))
6227                .map(Some)?
6228        }
6229    } else {
6230        None
6231    };
6232    let mut session = Session::new(
6233        Some(format!(
6234            "Automation {} / {}",
6235            automation.automation_id, node.node_id
6236        )),
6237        Some(workspace_root.clone()),
6238    );
6239    let session_id = session.id.clone();
6240    let project_id = automation_workspace_project_id(&workspace_root);
6241    session.project_id = Some(project_id.clone());
6242    session.workspace_root = Some(workspace_root);
6243    state.storage.save_session(session).await?;
6244
6245    state.add_automation_v2_session(run_id, &session_id).await;
6246
6247    let mut allowlist = merge_automation_agent_allowlist(agent, template.as_ref());
6248    if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
6249        allowlist.extend(mcp_tools.clone());
6250    }
6251    state
6252        .engine_loop
6253        .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
6254        .await;
6255    state
6256        .engine_loop
6257        .set_session_auto_approve_permissions(&session_id, true)
6258        .await;
6259
6260    let model = resolve_automation_agent_model(agent, template.as_ref());
6261    let standup_report_path = if is_agent_standup_automation(automation)
6262        && node.node_id == "standup_synthesis"
6263    {
6264        resolve_standup_report_path_for_run(automation, run.started_at_ms.unwrap_or_else(now_ms))
6265    } else {
6266        None
6267    };
6268    let prompt = render_automation_v2_prompt(
6269        automation,
6270        run_id,
6271        node,
6272        agent,
6273        &upstream_inputs,
6274        template
6275            .as_ref()
6276            .and_then(|value| value.system_prompt.as_deref()),
6277        standup_report_path.as_deref(),
6278        if is_agent_standup_automation(automation) {
6279            Some(project_id.as_str())
6280        } else {
6281            None
6282        },
6283    );
6284    let req = SendMessageRequest {
6285        parts: vec![MessagePartInput::Text { text: prompt }],
6286        model,
6287        agent: None,
6288        tool_mode: None,
6289        tool_allowlist: None,
6290        context_mode: None,
6291        write_required: None,
6292    };
6293    let result = state
6294        .engine_loop
6295        .run_prompt_async_with_context(
6296            session_id.clone(),
6297            req,
6298            Some(format!("automation-v2:{run_id}")),
6299        )
6300        .await;
6301
6302    state
6303        .engine_loop
6304        .clear_session_allowed_tools(&session_id)
6305        .await;
6306    state
6307        .engine_loop
6308        .clear_session_auto_approve_permissions(&session_id)
6309        .await;
6310    state.clear_automation_v2_session(run_id, &session_id).await;
6311
6312    result?;
6313    let session = state
6314        .storage
6315        .get_session(&session_id)
6316        .await
6317        .ok_or_else(|| anyhow::anyhow!("automation session `{}` missing after run", session_id))?;
6318    let session_text = extract_session_text_output(&session);
6319    Ok(wrap_automation_node_output(
6320        node,
6321        &session_id,
6322        &session_text,
6323    ))
6324}
6325
6326pub async fn run_automation_v2_executor(state: AppState) {
6327    loop {
6328        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
6329        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
6330            continue;
6331        };
6332        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
6333            let _ = state
6334                .update_automation_v2_run(&run.run_id, |row| {
6335                    row.status = AutomationRunStatus::Failed;
6336                    row.detail = Some("automation not found".to_string());
6337                })
6338                .await;
6339            continue;
6340        };
6341        let max_parallel = automation
6342            .execution
6343            .max_parallel_agents
6344            .unwrap_or(1)
6345            .clamp(1, 16) as usize;
6346
6347        loop {
6348            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
6349                break;
6350            };
6351            if matches!(
6352                latest.status,
6353                AutomationRunStatus::Paused
6354                    | AutomationRunStatus::Pausing
6355                    | AutomationRunStatus::Cancelled
6356                    | AutomationRunStatus::Failed
6357                    | AutomationRunStatus::Completed
6358            ) {
6359                break;
6360            }
6361            if latest.checkpoint.pending_nodes.is_empty() {
6362                let _ = state
6363                    .update_automation_v2_run(&run.run_id, |row| {
6364                        row.status = AutomationRunStatus::Completed;
6365                        row.detail = Some("automation run completed".to_string());
6366                    })
6367                    .await;
6368                break;
6369            }
6370
6371            let completed = latest
6372                .checkpoint
6373                .completed_nodes
6374                .iter()
6375                .cloned()
6376                .collect::<std::collections::HashSet<_>>();
6377            let pending = latest.checkpoint.pending_nodes.clone();
6378            let runnable = pending
6379                .iter()
6380                .filter_map(|node_id| {
6381                    let node = automation
6382                        .flow
6383                        .nodes
6384                        .iter()
6385                        .find(|n| n.node_id == *node_id)?;
6386                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
6387                        Some(node.clone())
6388                    } else {
6389                        None
6390                    }
6391                })
6392                .take(max_parallel)
6393                .collect::<Vec<_>>();
6394
6395            if runnable.is_empty() {
6396                let _ = state
6397                    .update_automation_v2_run(&run.run_id, |row| {
6398                        row.status = AutomationRunStatus::Failed;
6399                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
6400                    })
6401                    .await;
6402                break;
6403            }
6404
6405            let runnable_node_ids = runnable
6406                .iter()
6407                .map(|node| node.node_id.clone())
6408                .collect::<Vec<_>>();
6409            let _ = state
6410                .update_automation_v2_run(&run.run_id, |row| {
6411                    for node_id in &runnable_node_ids {
6412                        let attempts = row
6413                            .checkpoint
6414                            .node_attempts
6415                            .entry(node_id.clone())
6416                            .or_insert(0);
6417                        *attempts += 1;
6418                    }
6419                })
6420                .await;
6421
6422            let tasks = runnable
6423                .iter()
6424                .map(|node| {
6425                    let Some(agent) = automation
6426                        .agents
6427                        .iter()
6428                        .find(|a| a.agent_id == node.agent_id)
6429                        .cloned()
6430                    else {
6431                        return futures::future::ready((
6432                            node.node_id.clone(),
6433                            Err(anyhow::anyhow!("agent not found")),
6434                        ))
6435                        .boxed();
6436                    };
6437                    let state = state.clone();
6438                    let run_id = run.run_id.clone();
6439                    let automation = automation.clone();
6440                    let node = node.clone();
6441                    async move {
6442                        let result =
6443                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
6444                                .await;
6445                        (node.node_id, result)
6446                    }
6447                    .boxed()
6448                })
6449                .collect::<Vec<_>>();
6450            let outcomes = join_all(tasks).await;
6451
6452            let mut terminal_failure = None::<String>;
6453            let latest_attempts = state
6454                .get_automation_v2_run(&run.run_id)
6455                .await
6456                .map(|row| row.checkpoint.node_attempts)
6457                .unwrap_or_default();
6458            for (node_id, result) in outcomes {
6459                match result {
6460                    Ok(output) => {
6461                        let _ = state
6462                            .update_automation_v2_run(&run.run_id, |row| {
6463                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
6464                                if !row
6465                                    .checkpoint
6466                                    .completed_nodes
6467                                    .iter()
6468                                    .any(|id| id == &node_id)
6469                                {
6470                                    row.checkpoint.completed_nodes.push(node_id.clone());
6471                                }
6472                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
6473                            })
6474                            .await;
6475                    }
6476                    Err(error) => {
6477                        let is_paused = state
6478                            .get_automation_v2_run(&run.run_id)
6479                            .await
6480                            .map(|row| row.status == AutomationRunStatus::Paused)
6481                            .unwrap_or(false);
6482                        if is_paused {
6483                            break;
6484                        }
6485                        let detail = truncate_text(&error.to_string(), 500);
6486                        let attempts = latest_attempts.get(&node_id).copied().unwrap_or(1);
6487                        let max_attempts = automation
6488                            .flow
6489                            .nodes
6490                            .iter()
6491                            .find(|row| row.node_id == node_id)
6492                            .map(automation_node_max_attempts)
6493                            .unwrap_or(1);
6494                        if attempts >= max_attempts {
6495                            terminal_failure = Some(format!(
6496                                "node `{}` failed after {}/{} attempts: {}",
6497                                node_id, attempts, max_attempts, detail
6498                            ));
6499                            break;
6500                        }
6501                        let _ = state
6502                            .update_automation_v2_run(&run.run_id, |row| {
6503                                row.detail = Some(format!(
6504                                    "retrying node `{}` after attempt {}/{} failed: {}",
6505                                    node_id, attempts, max_attempts, detail
6506                                ));
6507                            })
6508                            .await;
6509                    }
6510                }
6511            }
6512            if let Some(detail) = terminal_failure {
6513                let _ = state
6514                    .update_automation_v2_run(&run.run_id, |row| {
6515                        row.status = AutomationRunStatus::Failed;
6516                        row.detail = Some(detail);
6517                    })
6518                    .await;
6519                break;
6520            }
6521        }
6522    }
6523}
6524
6525async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
6526    let normalized_entrypoint = run.entrypoint.trim();
6527    let known_tool = state
6528        .tools
6529        .list()
6530        .await
6531        .into_iter()
6532        .any(|schema| schema.name == normalized_entrypoint);
6533    if known_tool {
6534        let args = if run.args.is_object() {
6535            run.args.clone()
6536        } else {
6537            serde_json::json!({})
6538        };
6539        return format!("/tool {} {}", normalized_entrypoint, args);
6540    }
6541
6542    if let Some(objective) = routine_objective_from_args(run) {
6543        return build_routine_mission_prompt(run, &objective);
6544    }
6545
6546    format!(
6547        "Execute routine '{}' using entrypoint '{}' with args: {}",
6548        run.routine_id, run.entrypoint, run.args
6549    )
6550}
6551
6552fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
6553    run.args
6554        .get("prompt")
6555        .and_then(|v| v.as_str())
6556        .map(str::trim)
6557        .filter(|v| !v.is_empty())
6558        .map(ToString::to_string)
6559}
6560
6561fn routine_mode_from_args(args: &Value) -> &str {
6562    args.get("mode")
6563        .and_then(|v| v.as_str())
6564        .map(str::trim)
6565        .filter(|v| !v.is_empty())
6566        .unwrap_or("standalone")
6567}
6568
6569fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
6570    args.get("success_criteria")
6571        .and_then(|v| v.as_array())
6572        .map(|rows| {
6573            rows.iter()
6574                .filter_map(|row| row.as_str())
6575                .map(str::trim)
6576                .filter(|row| !row.is_empty())
6577                .map(ToString::to_string)
6578                .collect::<Vec<_>>()
6579        })
6580        .unwrap_or_default()
6581}
6582
6583fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
6584    let mode = routine_mode_from_args(&run.args);
6585    let success_criteria = routine_success_criteria_from_args(&run.args);
6586    let orchestrator_only_tool_calls = run
6587        .args
6588        .get("orchestrator_only_tool_calls")
6589        .and_then(|v| v.as_bool())
6590        .unwrap_or(false);
6591
6592    let mut lines = vec![
6593        format!("Automation ID: {}", run.routine_id),
6594        format!("Run ID: {}", run.run_id),
6595        format!("Mode: {}", mode),
6596        format!("Mission Objective: {}", objective),
6597    ];
6598
6599    if !success_criteria.is_empty() {
6600        lines.push("Success Criteria:".to_string());
6601        for criterion in success_criteria {
6602            lines.push(format!("- {}", criterion));
6603        }
6604    }
6605
6606    if run.allowed_tools.is_empty() {
6607        lines.push("Allowed Tools: all available by current policy".to_string());
6608    } else {
6609        lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
6610    }
6611
6612    if run.output_targets.is_empty() {
6613        lines.push("Output Targets: none configured".to_string());
6614    } else {
6615        lines.push("Output Targets:".to_string());
6616        for target in &run.output_targets {
6617            lines.push(format!("- {}", target));
6618        }
6619    }
6620
6621    if mode.eq_ignore_ascii_case("orchestrated") {
6622        lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
6623        lines
6624            .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
6625        if orchestrator_only_tool_calls {
6626            lines.push(
6627                "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
6628                    .to_string(),
6629            );
6630        }
6631    } else {
6632        lines.push("Execution Pattern: Standalone mission run".to_string());
6633    }
6634
6635    lines.push(
6636        "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
6637            .to_string(),
6638    );
6639
6640    lines.join("\n")
6641}
6642
/// Truncates `input` to at most `max_len` bytes, appending `...<truncated>` when
/// anything was cut.
///
/// If `input` is at most `max_len` bytes long it is returned unchanged.
/// Otherwise the result is the longest prefix of at most `max_len` bytes that
/// ends on a UTF-8 character boundary, followed by the truncation marker.
///
/// `max_len` counts bytes, not characters. The cut point is moved back to the
/// nearest character boundary so multi-byte characters are never split. Such
/// characters can appear in error messages or user-supplied text. Slicing
/// `input[..max_len]` directly would panic in that case.
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";
    if input.len() <= max_len {
        return input.to_string();
    }
    // Here `max_len < input.len()`, and `is_char_boundary(0)` is always true,
    // so this loop terminates at a valid slice end.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
6651
6652async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
6653    if run.output_targets.is_empty() {
6654        return;
6655    }
6656    for target in &run.output_targets {
6657        let artifact = RoutineRunArtifact {
6658            artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
6659            uri: target.clone(),
6660            kind: "output_target".to_string(),
6661            label: Some("configured output target".to_string()),
6662            created_at_ms: now_ms(),
6663            metadata: Some(serde_json::json!({
6664                "source": "routine.output_targets",
6665                "runID": run.run_id,
6666                "routineID": run.routine_id,
6667            })),
6668        };
6669        let _ = state
6670            .append_routine_run_artifact(&run.run_id, artifact.clone())
6671            .await;
6672        state.event_bus.publish(EngineEvent::new(
6673            "routine.run.artifact_added",
6674            serde_json::json!({
6675                "runID": run.run_id,
6676                "routineID": run.routine_id,
6677                "artifact": artifact,
6678            }),
6679        ));
6680    }
6681}
6682
6683fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
6684    let obj = value.as_object()?;
6685    let provider_id = obj.get("provider_id")?.as_str()?.trim();
6686    let model_id = obj.get("model_id")?.as_str()?.trim();
6687    if provider_id.is_empty() || model_id.is_empty() {
6688        return None;
6689    }
6690    Some(ModelSpec {
6691        provider_id: provider_id.to_string(),
6692        model_id: model_id.to_string(),
6693    })
6694}
6695
6696fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
6697    args.get("model_policy")
6698        .and_then(|v| v.get("role_models"))
6699        .and_then(|v| v.get(role))
6700        .and_then(parse_model_spec)
6701}
6702
6703fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
6704    args.get("model_policy")
6705        .and_then(|v| v.get("default_model"))
6706        .and_then(parse_model_spec)
6707}
6708
6709fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6710    let provider_id = config
6711        .get("default_provider")
6712        .and_then(|v| v.as_str())
6713        .map(str::trim)
6714        .filter(|v| !v.is_empty())?;
6715    let model_id = config
6716        .get("providers")
6717        .and_then(|v| v.get(provider_id))
6718        .and_then(|v| v.get("default_model"))
6719        .and_then(|v| v.as_str())
6720        .map(str::trim)
6721        .filter(|v| !v.is_empty())?;
6722    Some(ModelSpec {
6723        provider_id: provider_id.to_string(),
6724        model_id: model_id.to_string(),
6725    })
6726}
6727
6728fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
6729    providers.iter().any(|provider| {
6730        provider.id == spec.provider_id
6731            && provider
6732                .models
6733                .iter()
6734                .any(|model| model.id == spec.model_id)
6735    })
6736}
6737
6738async fn resolve_routine_model_spec_for_run(
6739    state: &AppState,
6740    run: &RoutineRunRecord,
6741) -> (Option<ModelSpec>, String) {
6742    let providers = state.providers.list().await;
6743    let mode = routine_mode_from_args(&run.args);
6744    let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
6745
6746    if mode.eq_ignore_ascii_case("orchestrated") {
6747        if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
6748            requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
6749        }
6750    }
6751    if let Some(default_model) = default_model_spec_from_args(&run.args) {
6752        requested.push((default_model, "args.model_policy.default_model"));
6753    }
6754    let effective_config = state.config.get_effective_value().await;
6755    if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
6756        requested.push((config_default, "config.default_provider"));
6757    }
6758
6759    for (candidate, source) in requested {
6760        if provider_catalog_has_model(&providers, &candidate) {
6761            return (Some(candidate), source.to_string());
6762        }
6763    }
6764
6765    let fallback = providers
6766        .into_iter()
6767        .find(|provider| !provider.models.is_empty())
6768        .and_then(|provider| {
6769            let model = provider.models.first()?;
6770            Some(ModelSpec {
6771                provider_id: provider.id,
6772                model_id: model.id.clone(),
6773            })
6774        });
6775
6776    (fallback, "provider_catalog_fallback".to_string())
6777}
6778
6779#[cfg(test)]
6780mod tests {
6781    use super::*;
6782
6783    fn test_state_with_path(path: PathBuf) -> AppState {
6784        let mut state = AppState::new_starting("test-attempt".to_string(), true);
6785        state.shared_resources_path = path;
6786        state.routines_path = tmp_routines_file("shared-state");
6787        state.routine_history_path = tmp_routines_file("routine-history");
6788        state.routine_runs_path = tmp_routines_file("routine-runs");
6789        state
6790    }
6791
6792    fn tmp_resource_file(name: &str) -> PathBuf {
6793        std::env::temp_dir().join(format!(
6794            "tandem-server-{name}-{}.json",
6795            uuid::Uuid::new_v4()
6796        ))
6797    }
6798
6799    fn tmp_routines_file(name: &str) -> PathBuf {
6800        std::env::temp_dir().join(format!(
6801            "tandem-server-routines-{name}-{}.json",
6802            uuid::Uuid::new_v4()
6803        ))
6804    }
6805
6806    #[test]
6807    fn default_model_spec_from_effective_config_reads_default_route() {
6808        let cfg = serde_json::json!({
6809            "default_provider": "openrouter",
6810            "providers": {
6811                "openrouter": {
6812                    "default_model": "google/gemini-3-flash-preview"
6813                }
6814            }
6815        });
6816        let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
6817        assert_eq!(spec.provider_id, "openrouter");
6818        assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
6819    }
6820
6821    #[test]
6822    fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
6823        let missing_provider = serde_json::json!({
6824            "providers": {
6825                "openrouter": {
6826                    "default_model": "google/gemini-3-flash-preview"
6827                }
6828            }
6829        });
6830        assert!(default_model_spec_from_effective_config(&missing_provider).is_none());
6831
6832        let missing_model = serde_json::json!({
6833            "default_provider": "openrouter",
6834            "providers": {
6835                "openrouter": {}
6836            }
6837        });
6838        assert!(default_model_spec_from_effective_config(&missing_model).is_none());
6839    }
6840
    /// Two successive puts on the same key should:
    /// - yield revisions 1 and 2;
    /// - record the latest writer and TTL;
    /// - persist the new revision to the store file at `path`.
    #[tokio::test]
    async fn shared_resource_put_increments_revision() {
        let path = tmp_resource_file("shared-resource-put");
        let state = test_state_with_path(path.clone());

        // Initial write with no expected revision creates the resource at rev 1.
        let first = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"todo"}),
                None,
                "agent-1".to_string(),
                None,
            )
            .await
            .expect("first put");
        assert_eq!(first.rev, 1);

        // Second write supplies the current revision (Some(1)), a different
        // writer and a TTL. All three should be reflected in the returned record.
        let second = state
            .put_shared_resource(
                "project/demo/board".to_string(),
                serde_json::json!({"status":"doing"}),
                Some(1),
                "agent-2".to_string(),
                Some(60_000),
            )
            .await
            .expect("second put");
        assert_eq!(second.rev, 2);
        assert_eq!(second.updated_by, "agent-2");
        assert_eq!(second.ttl_ms, Some(60_000));

        // The store file should contain the new revision. Matching on `"rev": 2`
        // (with a space) assumes pretty-printed JSON output; TODO confirm if the
        // persistence format changes.
        let raw = tokio::fs::read_to_string(path.clone())
            .await
            .expect("persisted");
        assert!(raw.contains("\"rev\": 2"));
        let _ = tokio::fs::remove_file(path).await;
    }
6878
6879    #[tokio::test]
6880    async fn shared_resource_put_detects_revision_conflict() {
6881        let path = tmp_resource_file("shared-resource-conflict");
6882        let state = test_state_with_path(path.clone());
6883
6884        let _ = state
6885            .put_shared_resource(
6886                "mission/demo/card-1".to_string(),
6887                serde_json::json!({"title":"Card 1"}),
6888                None,
6889                "agent-1".to_string(),
6890                None,
6891            )
6892            .await
6893            .expect("seed put");
6894
6895        let conflict = state
6896            .put_shared_resource(
6897                "mission/demo/card-1".to_string(),
6898                serde_json::json!({"title":"Card 1 edited"}),
6899                Some(99),
6900                "agent-2".to_string(),
6901                None,
6902            )
6903            .await
6904            .expect_err("expected conflict");
6905
6906        match conflict {
6907            ResourceStoreError::RevisionConflict(conflict) => {
6908                assert_eq!(conflict.expected_rev, Some(99));
6909                assert_eq!(conflict.current_rev, Some(1));
6910            }
6911            other => panic!("unexpected error: {other:?}"),
6912        }
6913
6914        let _ = tokio::fs::remove_file(path).await;
6915    }
6916
6917    #[tokio::test]
6918    async fn shared_resource_rejects_invalid_namespace_key() {
6919        let path = tmp_resource_file("shared-resource-invalid-key");
6920        let state = test_state_with_path(path.clone());
6921
6922        let error = state
6923            .put_shared_resource(
6924                "global/demo/key".to_string(),
6925                serde_json::json!({"x":1}),
6926                None,
6927                "agent-1".to_string(),
6928                None,
6929            )
6930            .await
6931            .expect_err("invalid key should fail");
6932
6933        match error {
6934            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
6935            other => panic!("unexpected error: {other:?}"),
6936        }
6937
6938        assert!(!path.exists());
6939    }
6940
6941    #[test]
6942    fn derive_status_index_update_for_run_started() {
6943        let event = EngineEvent::new(
6944            "session.run.started",
6945            serde_json::json!({
6946                "sessionID": "s-1",
6947                "runID": "r-1"
6948            }),
6949        );
6950        let update = derive_status_index_update(&event).expect("update");
6951        assert_eq!(update.key, "run/s-1/status");
6952        assert_eq!(
6953            update.value.get("state").and_then(|v| v.as_str()),
6954            Some("running")
6955        );
6956        assert_eq!(
6957            update.value.get("phase").and_then(|v| v.as_str()),
6958            Some("run")
6959        );
6960    }
6961
6962    #[test]
6963    fn derive_status_index_update_for_tool_invocation() {
6964        let event = EngineEvent::new(
6965            "message.part.updated",
6966            serde_json::json!({
6967                "sessionID": "s-2",
6968                "runID": "r-2",
6969                "part": { "type": "tool-invocation", "tool": "todo_write" }
6970            }),
6971        );
6972        let update = derive_status_index_update(&event).expect("update");
6973        assert_eq!(update.key, "run/s-2/status");
6974        assert_eq!(
6975            update.value.get("phase").and_then(|v| v.as_str()),
6976            Some("tool")
6977        );
6978        assert_eq!(
6979            update.value.get("toolActive").and_then(|v| v.as_bool()),
6980            Some(true)
6981        );
6982        assert_eq!(
6983            update.value.get("tool").and_then(|v| v.as_str()),
6984            Some("todo_write")
6985        );
6986    }
6987
6988    #[test]
6989    fn misfire_skip_drops_runs_and_advances_next_fire() {
6990        let (count, next_fire) =
6991            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
6992        assert_eq!(count, 0);
6993        assert_eq!(next_fire, 11_000);
6994    }
6995
6996    #[test]
6997    fn misfire_run_once_emits_single_trigger() {
6998        let (count, next_fire) =
6999            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
7000        assert_eq!(count, 1);
7001        assert_eq!(next_fire, 11_000);
7002    }
7003
7004    #[test]
7005    fn misfire_catch_up_caps_trigger_count() {
7006        let (count, next_fire) = compute_misfire_plan(
7007            25_000,
7008            5_000,
7009            1_000,
7010            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
7011        );
7012        assert_eq!(count, 3);
7013        assert_eq!(next_fire, 26_000);
7014    }
7015
    /// A routine stored through one `AppState` is visible to a second, fresh
    /// `AppState` that loads from the same routines file.
    #[tokio::test]
    async fn routine_put_persists_and_loads() {
        let routines_path = tmp_routines_file("persist-load");
        let mut state = AppState::new_starting("routines-put".to_string(), true);
        state.routines_path = routines_path.clone();

        let routine = RoutineSpec {
            routine_id: "routine-1".to_string(),
            name: "Digest".to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
            timezone: "UTC".to_string(),
            misfire_policy: RoutineMisfirePolicy::RunOnce,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({"topic":"status"}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "user-1".to_string(),
            requires_approval: true,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state.put_routine(routine).await.expect("store routine");

        // A separate state instance pointed at the same file must see the routine
        // after `load_routines`.
        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
        reloaded.routines_path = routines_path.clone();
        reloaded.load_routines().await.expect("load routines");
        let list = reloaded.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
    }
7052
    /// An `AppState` that has not loaded any routines must refuse to persist over
    /// an existing, non-empty routines file. The file's contents must survive intact.
    #[tokio::test]
    async fn persist_routines_does_not_clobber_existing_store_with_empty_state() {
        let routines_path = tmp_routines_file("persist-guard");
        // Writer seeds the routines file with one routine.
        let mut writer = AppState::new_starting("routines-writer".to_string(), true);
        writer.routines_path = routines_path.clone();
        writer
            .put_routine(RoutineSpec {
                routine_id: "automation-guarded".to_string(),
                name: "Guarded Automation".to_string(),
                status: RoutineStatus::Active,
                schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
                timezone: "UTC".to_string(),
                misfire_policy: RoutineMisfirePolicy::RunOnce,
                entrypoint: "mission.default".to_string(),
                args: serde_json::json!({
                    "prompt": "Keep this saved across restart"
                }),
                allowed_tools: vec!["read".to_string()],
                output_targets: vec![],
                creator_type: "user".to_string(),
                creator_id: "user-1".to_string(),
                requires_approval: false,
                external_integrations_allowed: false,
                next_fire_at_ms: Some(5_000),
                last_fired_at_ms: None,
            })
            .await
            .expect("persist baseline routine");

        // A fresh state (no `load_routines` call) pointing at the same file
        // attempts to persist its empty in-memory set; this must be an error.
        let mut empty_state = AppState::new_starting("routines-empty".to_string(), true);
        empty_state.routines_path = routines_path.clone();
        let persist = empty_state.persist_routines().await;
        assert!(
            persist.is_err(),
            "empty state should not overwrite existing routines store"
        );

        // The seeded routine must still be present on disk.
        let raw = tokio::fs::read_to_string(&routines_path)
            .await
            .expect("read guarded routines file");
        let parsed: std::collections::HashMap<String, RoutineSpec> =
            serde_json::from_str(&raw).expect("parse guarded routines file");
        assert!(parsed.contains_key("automation-guarded"));

        // Clean up the primary file and its sibling backup. The backup path is
        // removed presumably because persisting may create it; TODO confirm.
        let _ = tokio::fs::remove_file(routines_path.clone()).await;
        let _ = tokio::fs::remove_file(sibling_backup_path(&routines_path)).await;
    }
7100
    /// When the primary routines file is not valid JSON, `load_routines` should
    /// succeed by reading the sibling backup file instead.
    #[tokio::test]
    async fn load_routines_recovers_from_backup_when_primary_corrupt() {
        let routines_path = tmp_routines_file("backup-recovery");
        let backup_path = sibling_backup_path(&routines_path);
        let mut state = AppState::new_starting("routines-backup-recovery".to_string(), true);
        state.routines_path = routines_path.clone();

        // Primary: deliberately malformed JSON.
        let primary = "{ not valid json";
        tokio::fs::write(&routines_path, primary)
            .await
            .expect("write corrupt primary");
        // Backup: a valid map of routine_id -> serialized RoutineSpec with one entry.
        let backup = serde_json::json!({
            "routine-1": {
                "routine_id": "routine-1",
                "name": "Recovered",
                "status": "active",
                "schedule": { "interval_seconds": { "seconds": 60 } },
                "timezone": "UTC",
                "misfire_policy": { "type": "run_once" },
                "entrypoint": "mission.default",
                "args": {},
                "allowed_tools": [],
                "output_targets": [],
                "creator_type": "user",
                "creator_id": "u-1",
                "requires_approval": true,
                "external_integrations_allowed": false,
                "next_fire_at_ms": null,
                "last_fired_at_ms": null
            }
        });
        tokio::fs::write(&backup_path, serde_json::to_string_pretty(&backup).unwrap())
            .await
            .expect("write backup");

        // Loading must not fail and must surface the backup's single routine.
        state.load_routines().await.expect("load from backup");
        let list = state.list_routines().await;
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].routine_id, "routine-1");

        let _ = tokio::fs::remove_file(routines_path).await;
        let _ = tokio::fs::remove_file(backup_path).await;
    }
7144
    /// Three 1-second-interval routines, all overdue since 5_000 ms, are evaluated
    /// at 10_500 ms. Expected behavior per misfire policy:
    /// - `Skip` produces no plan but its next fire time is moved past "now";
    /// - `RunOnce` yields a single run;
    /// - `CatchUp { max_runs: 3 }` is capped at 3 runs.
    #[tokio::test]
    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
        let routines_path = tmp_routines_file("misfire-eval");
        let mut state = AppState::new_starting("routines-eval".to_string(), true);
        state.routines_path = routines_path.clone();

        // Shared template; only the id and misfire policy differ between routines.
        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
            routine_id: id.to_string(),
            name: id.to_string(),
            status: RoutineStatus::Active,
            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
            timezone: "UTC".to_string(),
            misfire_policy: policy,
            entrypoint: "mission.default".to_string(),
            args: serde_json::json!({}),
            allowed_tools: vec![],
            output_targets: vec![],
            creator_type: "user".to_string(),
            creator_id: "u-1".to_string(),
            requires_approval: false,
            external_integrations_allowed: false,
            next_fire_at_ms: Some(5_000),
            last_fired_at_ms: None,
        };

        state
            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
            .await
            .expect("put skip");
        state
            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
            .await
            .expect("put once");
        state
            .put_routine(base(
                "routine-catch",
                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
            ))
            .await
            .expect("put catch");

        let plans = state.evaluate_routine_misfires(10_500).await;
        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");

        assert!(plan_skip.is_none());
        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));

        // Even without a plan, the skipped routine's stored schedule must advance.
        let stored = state.list_routines().await;
        let skip_next = stored
            .iter()
            .find(|r| r.routine_id == "routine-skip")
            .and_then(|r| r.next_fire_at_ms)
            .expect("skip next");
        assert!(skip_next > 10_500);

        let _ = tokio::fs::remove_file(routines_path).await;
    }
7205
7206    #[test]
7207    fn routine_policy_blocks_external_side_effects_by_default() {
7208        let routine = RoutineSpec {
7209            routine_id: "routine-policy-1".to_string(),
7210            name: "Connector routine".to_string(),
7211            status: RoutineStatus::Active,
7212            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7213            timezone: "UTC".to_string(),
7214            misfire_policy: RoutineMisfirePolicy::RunOnce,
7215            entrypoint: "connector.email.reply".to_string(),
7216            args: serde_json::json!({}),
7217            allowed_tools: vec![],
7218            output_targets: vec![],
7219            creator_type: "user".to_string(),
7220            creator_id: "u-1".to_string(),
7221            requires_approval: true,
7222            external_integrations_allowed: false,
7223            next_fire_at_ms: None,
7224            last_fired_at_ms: None,
7225        };
7226
7227        let decision = evaluate_routine_execution_policy(&routine, "manual");
7228        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
7229    }
7230
7231    #[test]
7232    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
7233        let routine = RoutineSpec {
7234            routine_id: "routine-policy-2".to_string(),
7235            name: "Connector routine".to_string(),
7236            status: RoutineStatus::Active,
7237            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7238            timezone: "UTC".to_string(),
7239            misfire_policy: RoutineMisfirePolicy::RunOnce,
7240            entrypoint: "connector.email.reply".to_string(),
7241            args: serde_json::json!({}),
7242            allowed_tools: vec![],
7243            output_targets: vec![],
7244            creator_type: "user".to_string(),
7245            creator_id: "u-1".to_string(),
7246            requires_approval: true,
7247            external_integrations_allowed: true,
7248            next_fire_at_ms: None,
7249            last_fired_at_ms: None,
7250        };
7251
7252        let decision = evaluate_routine_execution_policy(&routine, "manual");
7253        assert!(matches!(
7254            decision,
7255            RoutineExecutionDecision::RequiresApproval { .. }
7256        ));
7257    }
7258
7259    #[test]
7260    fn routine_policy_allows_non_external_entrypoints() {
7261        let routine = RoutineSpec {
7262            routine_id: "routine-policy-3".to_string(),
7263            name: "Internal mission routine".to_string(),
7264            status: RoutineStatus::Active,
7265            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7266            timezone: "UTC".to_string(),
7267            misfire_policy: RoutineMisfirePolicy::RunOnce,
7268            entrypoint: "mission.default".to_string(),
7269            args: serde_json::json!({}),
7270            allowed_tools: vec![],
7271            output_targets: vec![],
7272            creator_type: "user".to_string(),
7273            creator_id: "u-1".to_string(),
7274            requires_approval: true,
7275            external_integrations_allowed: false,
7276            next_fire_at_ms: None,
7277            last_fired_at_ms: None,
7278        };
7279
7280        let decision = evaluate_routine_execution_policy(&routine, "manual");
7281        assert_eq!(decision, RoutineExecutionDecision::Allowed);
7282    }
7283
7284    #[tokio::test]
7285    async fn claim_next_queued_routine_run_marks_oldest_running() {
7286        let mut state = AppState::new_starting("routine-claim".to_string(), true);
7287        state.routine_runs_path = tmp_routines_file("routine-claim-runs");
7288
7289        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
7290            run_id: run_id.to_string(),
7291            routine_id: "routine-claim".to_string(),
7292            trigger_type: "manual".to_string(),
7293            run_count: 1,
7294            status: RoutineRunStatus::Queued,
7295            created_at_ms,
7296            updated_at_ms: created_at_ms,
7297            fired_at_ms: Some(created_at_ms),
7298            started_at_ms: None,
7299            finished_at_ms: None,
7300            requires_approval: false,
7301            approval_reason: None,
7302            denial_reason: None,
7303            paused_reason: None,
7304            detail: None,
7305            entrypoint: "mission.default".to_string(),
7306            args: serde_json::json!({}),
7307            allowed_tools: vec![],
7308            output_targets: vec![],
7309            artifacts: vec![],
7310            active_session_ids: vec![],
7311            latest_session_id: None,
7312            prompt_tokens: 0,
7313            completion_tokens: 0,
7314            total_tokens: 0,
7315            estimated_cost_usd: 0.0,
7316        };
7317
7318        {
7319            let mut guard = state.routine_runs.write().await;
7320            guard.insert("run-late".to_string(), mk("run-late", 2_000));
7321            guard.insert("run-early".to_string(), mk("run-early", 1_000));
7322        }
7323        state.persist_routine_runs().await.expect("persist");
7324
7325        let claimed = state
7326            .claim_next_queued_routine_run()
7327            .await
7328            .expect("claimed run");
7329        assert_eq!(claimed.run_id, "run-early");
7330        assert_eq!(claimed.status, RoutineRunStatus::Running);
7331        assert!(claimed.started_at_ms.is_some());
7332    }
7333
7334    #[tokio::test]
7335    async fn routine_session_policy_roundtrip_normalizes_tools() {
7336        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
7337        state
7338            .set_routine_session_policy(
7339                "session-routine-1".to_string(),
7340                "run-1".to_string(),
7341                "routine-1".to_string(),
7342                vec![
7343                    "read".to_string(),
7344                    " mcp.arcade.search ".to_string(),
7345                    "read".to_string(),
7346                    "".to_string(),
7347                ],
7348            )
7349            .await;
7350
7351        let policy = state
7352            .routine_session_policy("session-routine-1")
7353            .await
7354            .expect("policy");
7355        assert_eq!(
7356            policy.allowed_tools,
7357            vec!["read".to_string(), "mcp.arcade.search".to_string()]
7358        );
7359    }
7360
7361    #[tokio::test]
7362    async fn routine_run_preserves_latest_session_id_after_session_clears() {
7363        let state = AppState::new_starting("routine-latest-session".to_string(), true);
7364        let routine = RoutineSpec {
7365            routine_id: "routine-session-link".to_string(),
7366            name: "Routine Session Link".to_string(),
7367            status: RoutineStatus::Active,
7368            schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
7369            timezone: "UTC".to_string(),
7370            misfire_policy: RoutineMisfirePolicy::Skip,
7371            entrypoint: "mission.default".to_string(),
7372            args: serde_json::json!({}),
7373            allowed_tools: vec![],
7374            output_targets: vec![],
7375            creator_type: "user".to_string(),
7376            creator_id: "test".to_string(),
7377            requires_approval: false,
7378            external_integrations_allowed: false,
7379            next_fire_at_ms: None,
7380            last_fired_at_ms: None,
7381        };
7382
7383        let run = state
7384            .create_routine_run(&routine, "manual", 1, RoutineRunStatus::Queued, None)
7385            .await;
7386        state
7387            .add_active_session_id(&run.run_id, "session-123".to_string())
7388            .await
7389            .expect("active session added");
7390        state
7391            .clear_active_session_id(&run.run_id, "session-123")
7392            .await
7393            .expect("active session cleared");
7394
7395        let updated = state
7396            .get_routine_run(&run.run_id)
7397            .await
7398            .expect("run exists");
7399        assert!(updated.active_session_ids.is_empty());
7400        assert_eq!(updated.latest_session_id.as_deref(), Some("session-123"));
7401    }
7402
7403    #[test]
7404    fn routine_mission_prompt_includes_orchestrated_contract() {
7405        let run = RoutineRunRecord {
7406            run_id: "run-orchestrated-1".to_string(),
7407            routine_id: "automation-orchestrated".to_string(),
7408            trigger_type: "manual".to_string(),
7409            run_count: 1,
7410            status: RoutineRunStatus::Queued,
7411            created_at_ms: 1_000,
7412            updated_at_ms: 1_000,
7413            fired_at_ms: Some(1_000),
7414            started_at_ms: None,
7415            finished_at_ms: None,
7416            requires_approval: true,
7417            approval_reason: None,
7418            denial_reason: None,
7419            paused_reason: None,
7420            detail: None,
7421            entrypoint: "mission.default".to_string(),
7422            args: serde_json::json!({
7423                "prompt": "Coordinate a multi-step release readiness check.",
7424                "mode": "orchestrated",
7425                "success_criteria": ["All blockers listed", "Output artifact written"],
7426                "orchestrator_only_tool_calls": true
7427            }),
7428            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
7429            output_targets: vec!["file://reports/release-readiness.md".to_string()],
7430            artifacts: vec![],
7431            active_session_ids: vec![],
7432            latest_session_id: None,
7433            prompt_tokens: 0,
7434            completion_tokens: 0,
7435            total_tokens: 0,
7436            estimated_cost_usd: 0.0,
7437        };
7438
7439        let objective = routine_objective_from_args(&run).expect("objective");
7440        let prompt = build_routine_mission_prompt(&run, &objective);
7441
7442        assert!(prompt.contains("Mode: orchestrated"));
7443        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
7444        assert!(prompt.contains("only the orchestrator may execute tools"));
7445        assert!(prompt.contains("Allowed Tools: read, webfetch"));
7446        assert!(prompt.contains("file://reports/release-readiness.md"));
7447    }
7448
7449    #[test]
7450    fn routine_mission_prompt_includes_standalone_defaults() {
7451        let run = RoutineRunRecord {
7452            run_id: "run-standalone-1".to_string(),
7453            routine_id: "automation-standalone".to_string(),
7454            trigger_type: "manual".to_string(),
7455            run_count: 1,
7456            status: RoutineRunStatus::Queued,
7457            created_at_ms: 2_000,
7458            updated_at_ms: 2_000,
7459            fired_at_ms: Some(2_000),
7460            started_at_ms: None,
7461            finished_at_ms: None,
7462            requires_approval: false,
7463            approval_reason: None,
7464            denial_reason: None,
7465            paused_reason: None,
7466            detail: None,
7467            entrypoint: "mission.default".to_string(),
7468            args: serde_json::json!({
7469                "prompt": "Summarize top engineering updates.",
7470                "success_criteria": ["Three bullet summary"]
7471            }),
7472            allowed_tools: vec![],
7473            output_targets: vec![],
7474            artifacts: vec![],
7475            active_session_ids: vec![],
7476            latest_session_id: None,
7477            prompt_tokens: 0,
7478            completion_tokens: 0,
7479            total_tokens: 0,
7480            estimated_cost_usd: 0.0,
7481        };
7482
7483        let objective = routine_objective_from_args(&run).expect("objective");
7484        let prompt = build_routine_mission_prompt(&run, &objective);
7485
7486        assert!(prompt.contains("Mode: standalone"));
7487        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
7488        assert!(prompt.contains("Allowed Tools: all available by current policy"));
7489        assert!(prompt.contains("Output Targets: none configured"));
7490    }
7491
7492    #[test]
7493    fn shared_resource_key_validator_accepts_swarm_active_tasks() {
7494        assert!(is_valid_resource_key("swarm.active_tasks"));
7495        assert!(is_valid_resource_key("project/demo"));
7496        assert!(!is_valid_resource_key("swarm//active_tasks"));
7497        assert!(!is_valid_resource_key("misc/demo"));
7498    }
7499}