Skip to main content

tandem_server/
lib.rs

1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22    EngineEvent, HostOs, HostRuntimeContext, MessagePart, MessagePartInput, MessageRole, ModelSpec,
23    PathStyle, SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30    resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31    PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38use tandem_workflows::{
39    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
40    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
41    WorkflowRunStatus, WorkflowSourceKind, WorkflowSourceRef, WorkflowSpec,
42    WorkflowValidationMessage,
43};
44
45mod agent_teams;
46mod browser;
47mod bug_monitor_github;
48mod capability_resolver;
49mod http;
50mod mcp_catalog;
51mod pack_builder;
52mod pack_manager;
53mod preset_composer;
54mod preset_registry;
55mod preset_summary;
56pub mod webui;
57mod workflows;
58
59pub use agent_teams::AgentTeamRuntime;
60pub use browser::{
61    install_browser_sidecar, BrowserHealthSummary, BrowserSidecarInstallResult,
62    BrowserSmokeTestResult, BrowserSubsystem,
63};
64pub use capability_resolver::CapabilityResolver;
65pub use http::serve;
66pub use pack_manager::PackManager;
67pub use preset_composer::PromptComposeInput;
68pub use preset_registry::PresetRegistry;
69pub use workflows::{
70    canonical_workflow_event_names, dispatch_workflow_event, execute_hook_binding,
71    execute_workflow, parse_workflow_action, run_workflow_dispatcher, simulate_workflow_event,
72};
73
74pub(crate) fn normalize_absolute_workspace_root(raw: &str) -> Result<String, String> {
75    let trimmed = raw.trim();
76    if trimmed.is_empty() {
77        return Err("workspace_root is required".to_string());
78    }
79    let as_path = PathBuf::from(trimmed);
80    if !as_path.is_absolute() {
81        return Err("workspace_root must be an absolute path".to_string());
82    }
83    tandem_core::normalize_workspace_path(trimmed)
84        .ok_or_else(|| "workspace_root is invalid".to_string())
85}
86
/// Serializable status snapshot for a single channel.
///
/// Derives `Default`, so a fresh value has both flags `false`, no error,
/// zero sessions and a `Value::Null` `meta`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    pub enabled: bool,
    pub connected: bool,
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Free-form JSON payload; its schema is not defined in this file.
    pub meta: Value,
}
95
/// Web UI section of the application config.
///
/// NOTE(review): the derived `Default` produces an empty `path_prefix`, while
/// deserializing with the field absent calls `default_web_ui_prefix` — confirm
/// both paths are meant to yield the same effective prefix.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WebUiConfig {
    /// Defaults to `false` when absent from the input.
    #[serde(default)]
    pub enabled: bool,
    /// Filled from `default_web_ui_prefix` when absent from the input.
    #[serde(default = "default_web_ui_prefix")]
    pub path_prefix: String,
}
103
/// Channels section of the application config as it appears on disk.
///
/// Each channel entry is optional; `tool_policy` falls back to its type's
/// `Default` when absent.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelsConfigFile {
    pub telegram: Option<TelegramConfigFile>,
    pub discord: Option<DiscordConfigFile>,
    pub slack: Option<SlackConfigFile>,
    #[serde(default)]
    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
}
112
/// Telegram channel settings as stored in the config file.
///
/// `bot_token` is the only field that must be present when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelegramConfigFile {
    pub bot_token: String,
    /// Filled from `default_allow_all` when absent.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub mention_only: bool,
    #[serde(default)]
    pub style_profile: tandem_channels::config::TelegramStyleProfile,
}
123
/// Discord channel settings as stored in the config file.
///
/// `bot_token` is the only field that must be present when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscordConfigFile {
    pub bot_token: String,
    #[serde(default)]
    pub guild_id: Option<String>,
    /// Filled from `default_allow_all` when absent.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Filled from `default_discord_mention_only` when absent (unlike the
    /// Telegram and Slack configs, which use the plain `bool` default).
    #[serde(default = "default_discord_mention_only")]
    pub mention_only: bool,
}
134
/// Slack channel settings as stored in the config file.
///
/// `bot_token` and `channel_id` must be present when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfigFile {
    pub bot_token: String,
    pub channel_id: String,
    /// Filled from `default_allow_all` when absent.
    #[serde(default = "default_allow_all")]
    pub allowed_users: Vec<String>,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub mention_only: bool,
}
144
/// Crate-private view of the application config sections used by this module.
///
/// Every section is `#[serde(default)]`, so a document missing any of them
/// still deserializes, using that section type's `Default`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
156
/// Runtime bookkeeping for channels: an optional set of spawned tasks and a
/// map of `ChannelStatus` values keyed by string.
///
/// The derived `Default` starts with no task set and an empty status map.
#[derive(Default)]
pub struct ChannelRuntime {
    pub listeners: Option<tokio::task::JoinSet<()>>,
    // presumably keyed by channel name — verify against the code that fills it
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
162
/// A time-limited lease held by a client, tracked with millisecond timestamps.
#[derive(Debug, Clone)]
pub struct EngineLease {
    pub lease_id: String,
    pub client_id: String,
    pub client_type: String,
    pub acquired_at_ms: u64,
    pub last_renewed_at_ms: u64,
    pub ttl_ms: u64,
}

impl EngineLease {
    /// Reports whether the lease has lapsed at `now_ms`.
    ///
    /// The lease is expired once strictly more than `ttl_ms` milliseconds have
    /// passed since `last_renewed_at_ms`. A `now_ms` earlier than the last
    /// renewal counts as zero elapsed time (saturating subtraction).
    pub fn is_expired(&self, now_ms: u64) -> bool {
        let since_renewal = now_ms.saturating_sub(self.last_renewed_at_ms);
        since_renewal > self.ttl_ms
    }
}
178
/// Serializable description of a run that is currently in progress.
///
/// Field names are renamed on the wire (`runID`, `startedAtMs`,
/// `lastActivityAtMs`, `clientID`, `agentID`, `agentProfile`); the optional
/// fields are omitted from the output when `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
    #[serde(rename = "runID")]
    pub run_id: String,
    #[serde(rename = "startedAtMs")]
    pub started_at_ms: u64,
    #[serde(rename = "lastActivityAtMs")]
    pub last_activity_at_ms: u64,
    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
    pub client_id: Option<String>,
    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
    pub agent_profile: Option<String>,
}
194
195#[derive(Clone, Default)]
196pub struct RunRegistry {
197    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
198}
199
200impl RunRegistry {
201    pub fn new() -> Self {
202        Self::default()
203    }
204
205    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
206        self.active.read().await.get(session_id).cloned()
207    }
208
209    pub async fn acquire(
210        &self,
211        session_id: &str,
212        run_id: String,
213        client_id: Option<String>,
214        agent_id: Option<String>,
215        agent_profile: Option<String>,
216    ) -> std::result::Result<ActiveRun, ActiveRun> {
217        let mut guard = self.active.write().await;
218        if let Some(existing) = guard.get(session_id).cloned() {
219            return Err(existing);
220        }
221        let now = now_ms();
222        let run = ActiveRun {
223            run_id,
224            started_at_ms: now,
225            last_activity_at_ms: now,
226            client_id,
227            agent_id,
228            agent_profile,
229        };
230        guard.insert(session_id.to_string(), run.clone());
231        Ok(run)
232    }
233
234    pub async fn touch(&self, session_id: &str, run_id: &str) {
235        let mut guard = self.active.write().await;
236        if let Some(run) = guard.get_mut(session_id) {
237            if run.run_id == run_id {
238                run.last_activity_at_ms = now_ms();
239            }
240        }
241    }
242
243    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
244        let mut guard = self.active.write().await;
245        if let Some(run) = guard.get(session_id) {
246            if run.run_id == run_id {
247                return guard.remove(session_id);
248            }
249        }
250        None
251    }
252
253    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
254        self.active.write().await.remove(session_id)
255    }
256
257    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
258        let now = now_ms();
259        let mut guard = self.active.write().await;
260        let stale_ids = guard
261            .iter()
262            .filter_map(|(session_id, run)| {
263                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
264                    Some(session_id.clone())
265                } else {
266                    None
267                }
268            })
269            .collect::<Vec<_>>();
270        let mut out = Vec::with_capacity(stale_ids.len());
271        for session_id in stale_ids {
272            if let Some(run) = guard.remove(&session_id) {
273                out.push((session_id, run));
274            }
275        }
276        out
277    }
278}
279
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
286
287pub fn build_id() -> String {
288    if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
289        let trimmed = explicit.trim();
290        if !trimmed.is_empty() {
291            return trimmed.to_string();
292        }
293    }
294    if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
295        let trimmed = git_sha.trim();
296        if !trimmed.is_empty() {
297            return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
298        }
299    }
300    env!("CARGO_PKG_VERSION").to_string()
301}
302
303pub fn detect_host_runtime_context() -> HostRuntimeContext {
304    let os = if cfg!(target_os = "windows") {
305        HostOs::Windows
306    } else if cfg!(target_os = "macos") {
307        HostOs::Macos
308    } else {
309        HostOs::Linux
310    };
311    let (shell_family, path_style) = match os {
312        HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
313        HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
314    };
315    HostRuntimeContext {
316        os,
317        arch: std::env::consts::ARCH.to_string(),
318        shell_family,
319        path_style,
320    }
321}
322
/// Returns the path of the running executable, but only in builds with debug
/// assertions enabled.
///
/// Builds without debug assertions always return `None`. With them enabled,
/// `None` is still returned when the executable path cannot be determined.
pub fn binary_path_for_health() -> Option<String> {
    if cfg!(debug_assertions) {
        match std::env::current_exe() {
            Ok(exe) => Some(exe.to_string_lossy().into_owned()),
            Err(_) => None,
        }
    } else {
        None
    }
}
335
/// Bundle of the engine's runtime services and shared state.
///
/// The struct is `Clone`; the `Arc`-wrapped fields (`storage`, `auth`, `logs`)
/// are shared between clones. How the other handle types behave on clone is
/// defined in their own crates.
#[derive(Clone)]
pub struct RuntimeState {
    pub storage: Arc<Storage>,
    pub config: ConfigStore,
    pub event_bus: EventBus,
    pub providers: ProviderRegistry,
    pub plugins: PluginRegistry,
    pub agents: AgentRegistry,
    pub tools: ToolRegistry,
    pub permissions: PermissionManager,
    pub mcp: McpRegistry,
    pub pty: PtyManager,
    pub lsp: LspManager,
    // String-to-string map behind an async lock; key/value meaning is not
    // visible here — TODO confirm against the auth handlers.
    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// Shared list of JSON log entries.
    pub logs: Arc<RwLock<Vec<Value>>>,
    pub workspace_index: WorkspaceIndex,
    pub cancellations: CancellationRegistry,
    pub engine_loop: EngineLoop,
    pub host_runtime_context: HostRuntimeContext,
    pub browser: BrowserSubsystem,
}
357
/// In-memory record of one governed memory entry.
///
/// Not serializable: it derives only `Debug` and `Clone`.
#[derive(Debug, Clone)]
pub struct GovernedMemoryRecord {
    pub id: String,
    pub run_id: String,
    pub partition: MemoryPartition,
    pub kind: MemoryContentKind,
    pub content: String,
    pub artifact_refs: Vec<String>,
    pub classification: MemoryClassification,
    pub metadata: Option<Value>,
    pub source_memory_id: Option<String>,
    pub created_at_ms: u64,
}
371
/// Serialize-only audit entry describing an action taken on governed memory.
///
/// `detail` is left out of the serialized output when `None`; the other
/// optional fields are always emitted (as `null` when `None`).
#[derive(Debug, Clone, Serialize)]
pub struct MemoryAuditEvent {
    pub audit_id: String,
    pub action: String,
    pub run_id: String,
    pub memory_id: Option<String>,
    pub source_memory_id: Option<String>,
    pub to_tier: Option<GovernedMemoryTier>,
    pub partition_key: String,
    pub actor: String,
    pub status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    pub created_at_ms: u64,
}
387
/// Keyed JSON value with revision and update bookkeeping.
///
/// `ttl_ms` is omitted from the serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedResourceRecord {
    pub key: String,
    pub value: Value,
    // presumably a monotonically increasing revision counter — verify against writers
    pub rev: u64,
    pub updated_at_ms: u64,
    pub updated_by: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttl_ms: Option<u64>,
}
398
/// When a routine fires: either a fixed interval or a cron expression.
///
/// Uses serde's default externally tagged representation with snake_case
/// variant names (`interval_seconds`, `cron`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineSchedule {
    IntervalSeconds { seconds: u64 },
    Cron { expression: String },
}
405
/// Policy selecting how a misfire is handled.
///
/// Internally tagged on `"type"` with snake_case variant names (`skip`,
/// `run_once`, `catch_up`); `catch_up` additionally carries `max_runs`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RoutineMisfirePolicy {
    Skip,
    RunOnce,
    CatchUp { max_runs: u32 },
}
413
/// Lifecycle state of a routine definition; serialized as `active` / `paused`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineStatus {
    Active,
    Paused,
}
420
/// Persisted definition of a routine: identity, schedule, entrypoint and
/// policy flags, plus optional fire-time bookkeeping.
///
/// `args`, `allowed_tools` and `output_targets` default to their empty values
/// when absent; the two `*_fire*_at_ms` fields default to `None` and are
/// omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineSpec {
    pub routine_id: String,
    pub name: String,
    pub status: RoutineStatus,
    pub schedule: RoutineSchedule,
    // presumably an IANA timezone name (the file imports `chrono_tz::Tz`) — TODO confirm
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub creator_type: String,
    pub creator_id: String,
    pub requires_approval: bool,
    pub external_integrations_allowed: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
445
/// One history entry recording that a routine fired.
///
/// `detail` defaults to `None` and is omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineHistoryEvent {
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub fired_at_ms: u64,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
456
/// State of a single routine run; serialized with snake_case names
/// (e.g. `pending_approval`, `blocked_policy`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RoutineRunStatus {
    Queued,
    PendingApproval,
    Running,
    Paused,
    BlockedPolicy,
    Denied,
    Completed,
    Failed,
    Cancelled,
}
470
/// Reference to an artifact attached to a routine run.
///
/// `label` and `metadata` default to `None` and are omitted from the output
/// when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunArtifact {
    pub artifact_id: String,
    pub uri: String,
    pub kind: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
482
/// Persisted record of one routine run: status, timestamps, approval/pause
/// details, the invocation inputs, artifacts, sessions and usage counters.
///
/// All `Option` fields default to `None` and are omitted from the output when
/// `None`; collection and counter fields default to empty / zero when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutineRunRecord {
    pub run_id: String,
    pub routine_id: String,
    pub trigger_type: String,
    pub run_count: u32,
    pub status: RoutineRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fired_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    pub requires_approval: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denial_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub paused_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // The next four fields share names and types with `RoutineSpec` fields;
    // presumably copied from the spec when the run is created — TODO confirm.
    pub entrypoint: String,
    #[serde(default)]
    pub args: Value,
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    #[serde(default)]
    pub output_targets: Vec<String>,
    #[serde(default)]
    pub artifacts: Vec<RoutineRunArtifact>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub latest_session_id: Option<String>,
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
529
/// Links a session to a routine run together with a list of allowed tool names.
///
/// In-memory only: it derives `Debug` and `Clone` but no serde traits.
#[derive(Debug, Clone)]
pub struct RoutineSessionPolicy {
    pub session_id: String,
    pub run_id: String,
    pub routine_id: String,
    pub allowed_tools: Vec<String>,
}
537
/// Serialize-only plan for triggering a routine: how many runs, the scheduled
/// time, and the next fire time (both in milliseconds).
#[derive(Debug, Clone, Serialize)]
pub struct RoutineTriggerPlan {
    pub routine_id: String,
    pub run_count: u32,
    pub scheduled_at_ms: u64,
    pub next_fire_at_ms: u64,
}
545
/// Lifecycle state of a v2 automation; serialized as `active`, `paused` or
/// `draft`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2Status {
    Active,
    Paused,
    Draft,
}
553
/// Discriminator for a v2 automation schedule; serialized as `cron`,
/// `interval` or `manual`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationV2ScheduleType {
    Cron,
    Interval,
    Manual,
}
561
/// Schedule of a v2 automation.
///
/// `schedule_type` is serialized under the key `type`. The cron expression
/// and interval are independent optional fields; the type system does not tie
/// them to `schedule_type`, so consistency must be enforced by callers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AutomationV2Schedule {
    #[serde(rename = "type")]
    pub schedule_type: AutomationV2ScheduleType,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cron_expression: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interval_seconds: Option<u64>,
    pub timezone: String,
    pub misfire_policy: RoutineMisfirePolicy,
}
573
/// Tool allow/deny lists for an automation agent; both default to empty when
/// absent from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentToolPolicy {
    #[serde(default)]
    pub allowlist: Vec<String>,
    #[serde(default)]
    pub denylist: Vec<String>,
}
581
/// MCP access policy for an automation agent.
///
/// `allowed_servers` defaults to empty; `allowed_tools` defaults to `None` and
/// is omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentMcpPolicy {
    #[serde(default)]
    pub allowed_servers: Vec<String>,
    // NOTE(review): `None` vs. `Some(vec![])` are distinct here; what each
    // means for enforcement is not visible in this file — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed_tools: Option<Vec<String>>,
}
589
/// Profile of one agent participating in a v2 automation.
///
/// `agent_id`, `display_name`, `tool_policy` and `mcp_policy` must be present
/// when deserializing; every other field has a default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationAgentProfile {
    pub agent_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub template_id: Option<String>,
    pub display_name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub avatar_url: Option<String>,
    /// Free-form JSON; its schema is not defined in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    #[serde(default)]
    pub skills: Vec<String>,
    pub tool_policy: AutomationAgentToolPolicy,
    pub mcp_policy: AutomationAgentMcpPolicy,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub approval_policy: Option<String>,
}
607
/// One node of an automation flow: an objective assigned to an agent, with
/// optional dependencies, inputs, output contract, retry policy and timeout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowNode {
    pub node_id: String,
    pub agent_id: String,
    pub objective: String,
    // presumably the `node_id`s that must finish first — verify against the executor
    #[serde(default)]
    pub depends_on: Vec<String>,
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
    /// Free-form JSON; its schema is not defined in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retry_policy: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_ms: Option<u64>,
}
624
/// Names another step as an input source and the alias it is exposed under.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowInputRef {
    pub from_step_id: String,
    pub alias: String,
}
630
/// Declares the kind of output a flow node or plan step is expected to produce.
///
/// The set of valid `kind` strings is not defined in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowOutputContract {
    pub kind: String,
}
635
/// The flow of an automation: a list of nodes, empty when absent from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationFlowSpec {
    #[serde(default)]
    pub nodes: Vec<AutomationFlowNode>,
}
641
/// Optional execution limits for an automation run.
///
/// Each limit defaults to `None` and is omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationExecutionPolicy {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_parallel_agents: Option<u32>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_runtime_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_total_tool_calls: Option<u32>,
}
651
/// Persisted definition of a v2 automation: identity, schedule, agents, flow,
/// execution limits, outputs and bookkeeping timestamps.
///
/// `flow` and `execution` must be present when deserializing (the fields carry
/// no `#[serde(default)]`), although their own members are all optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2Spec {
    pub automation_id: String,
    pub name: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub status: AutomationV2Status,
    pub schedule: AutomationV2Schedule,
    #[serde(default)]
    pub agents: Vec<AutomationAgentProfile>,
    pub flow: AutomationFlowSpec,
    pub execution: AutomationExecutionPolicy,
    #[serde(default)]
    pub output_targets: Vec<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    pub creator_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    /// Free-form JSON; its schema is not defined in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_fire_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_fired_at_ms: Option<u64>,
}
678
/// One step of a workflow plan.
///
/// Shares `objective`, `depends_on`, `input_refs` and `output_contract` with
/// `AutomationFlowNode`, but identifies the executor by `agent_role` instead
/// of an agent id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanStep {
    pub step_id: String,
    pub kind: String,
    pub objective: String,
    #[serde(default)]
    pub depends_on: Vec<String>,
    pub agent_role: String,
    #[serde(default)]
    pub input_refs: Vec<AutomationFlowInputRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_contract: Option<AutomationFlowOutputContract>,
}
692
/// A planned workflow: the originating prompt, planner provenance, schedule,
/// target workspace, steps and integration requirements.
///
/// `save_options` is required on input even though it is untyped JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlan {
    pub plan_id: String,
    pub planner_version: String,
    pub plan_source: String,
    pub original_prompt: String,
    pub normalized_prompt: String,
    // Stored as a string, not a number; the set of valid values is not
    // visible in this file.
    pub confidence: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    pub schedule: AutomationV2Schedule,
    pub execution_target: String,
    pub workspace_root: String,
    #[serde(default)]
    pub steps: Vec<WorkflowPlanStep>,
    #[serde(default)]
    pub requires_integrations: Vec<String>,
    #[serde(default)]
    pub allowed_mcp_servers: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operator_preferences: Option<Value>,
    pub save_options: Value,
}
717
/// A single message in a workflow-planning conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanChatMessage {
    pub role: String,
    pub text: String,
    pub created_at_ms: u64,
}
724
/// Conversation attached to a workflow plan; `messages` defaults to empty
/// when absent from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanConversation {
    pub conversation_id: String,
    pub plan_id: String,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default)]
    pub messages: Vec<WorkflowPlanChatMessage>,
}
734
/// Draft of a workflow plan: the initial plan, the current plan, and the
/// conversation associated with them, plus optional planner diagnostics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPlanDraftRecord {
    pub initial_plan: WorkflowPlan,
    pub current_plan: WorkflowPlan,
    pub conversation: WorkflowPlanConversation,
    /// Free-form JSON; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub planner_diagnostics: Option<Value>,
}
743
/// Output produced by one automation node: a contract kind, a summary, and
/// the JSON content, stamped with a creation time and the node id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationNodeOutput {
    pub contract_kind: String,
    pub summary: String,
    pub content: Value,
    pub created_at_ms: u64,
    pub node_id: String,
}
752
/// State of a v2 automation run; serialized with snake_case names.
///
/// Unlike `RoutineRunStatus`, this includes a transitional `Pausing` state
/// and has no approval-related states.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AutomationRunStatus {
    Queued,
    Running,
    Pausing,
    Paused,
    Completed,
    Failed,
    Cancelled,
}
764
/// Progress checkpoint of an automation run.
///
/// Every field defaults to empty when absent from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationRunCheckpoint {
    #[serde(default)]
    pub completed_nodes: Vec<String>,
    #[serde(default)]
    pub pending_nodes: Vec<String>,
    // presumably keyed by node id; values may be serialized
    // `AutomationNodeOutput`s — TODO confirm against the executor
    #[serde(default)]
    pub node_outputs: std::collections::HashMap<String, Value>,
    // presumably attempt counts keyed by node id — TODO confirm
    #[serde(default)]
    pub node_attempts: std::collections::HashMap<String, u32>,
}
776
/// Persisted record of one v2 automation run: status, timestamps, active
/// sessions/instances, checkpoint, an optional snapshot of the automation
/// spec, pause/resume details and usage counters.
///
/// `checkpoint` must be present when deserializing; all `Option` fields
/// default to `None` and are omitted from the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationV2RunRecord {
    pub run_id: String,
    pub automation_id: String,
    pub trigger_type: String,
    pub status: AutomationRunStatus,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at_ms: Option<u64>,
    #[serde(default)]
    pub active_session_ids: Vec<String>,
    #[serde(default)]
    pub active_instance_ids: Vec<String>,
    pub checkpoint: AutomationRunCheckpoint,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_snapshot: Option<AutomationV2Spec>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pause_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resume_reason: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default)]
    pub prompt_tokens: u64,
    #[serde(default)]
    pub completion_tokens: u64,
    #[serde(default)]
    pub total_tokens: u64,
    #[serde(default)]
    pub estimated_cost_usd: f64,
}
811
/// Provider the bug monitor should prefer; serialized as `auto`,
/// `official_github`, `composio` or `arcade`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BugMonitorProviderPreference {
    Auto,
    OfficialGithub,
    Composio,
    Arcade,
}
820
/// Labelling mode used by the bug monitor; currently a single variant,
/// serialized as `reporter_only`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum BugMonitorLabelMode {
    ReporterOnly,
}
826
827impl Default for BugMonitorLabelMode {
828    fn default() -> Self {
829        Self::ReporterOnly
830    }
831}
832
833impl Default for BugMonitorProviderPreference {
834    fn default() -> Self {
835        Self::Auto
836    }
837}
838
/// Configuration of the bug monitor.
///
/// Every field has a serde default, so an empty object deserializes. The two
/// `auto_*` flags take their default from `default_true`; the manual `Default`
/// impl for this type sets the same two flags to `true`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BugMonitorConfig {
    #[serde(default)]
    pub enabled: bool,
    #[serde(default)]
    pub paused: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workspace_root: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mcp_server: Option<String>,
    #[serde(default)]
    pub provider_preference: BugMonitorProviderPreference,
    /// Free-form JSON; its schema is not defined in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_policy: Option<Value>,
    #[serde(default = "default_true")]
    pub auto_create_new_issues: bool,
    #[serde(default)]
    pub require_approval_for_new_issues: bool,
    #[serde(default = "default_true")]
    pub auto_comment_on_matched_open_issues: bool,
    #[serde(default)]
    pub label_mode: BugMonitorLabelMode,
    #[serde(default)]
    pub updated_at_ms: u64,
}
866
867impl Default for BugMonitorConfig {
868    fn default() -> Self {
869        Self {
870            enabled: false,
871            paused: false,
872            workspace_root: None,
873            repo: None,
874            mcp_server: None,
875            provider_preference: BugMonitorProviderPreference::Auto,
876            model_policy: None,
877            auto_create_new_issues: true,
878            require_approval_for_new_issues: false,
879            auto_comment_on_matched_open_issues: true,
880            label_mode: BugMonitorLabelMode::ReporterOnly,
881            updated_at_ms: 0,
882        }
883    }
884}
885
/// Persisted draft produced by the bug monitor, with optional triage, issue,
/// GitHub posting and issue-match details.
///
/// All `Option` fields default to `None` and are omitted from the output when
/// `None`. The derived `Default` yields empty strings and a zero timestamp.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorDraftRecord {
    pub draft_id: String,
    pub fingerprint: String,
    pub repo: String,
    pub status: String,
    pub created_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_status: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub github_posted_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub matched_issue_state: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_error: Option<String>,
}
918
/// Persisted record of one posting operation performed for a bug-monitor
/// draft, including the resulting issue/comment references or the error.
///
/// All `Option` fields default to `None` and are omitted from the output when
/// `None`; `idempotency_key` and both timestamps are required on input.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorPostRecord {
    pub post_id: String,
    pub draft_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub incident_id: Option<String>,
    pub fingerprint: String,
    pub repo: String,
    pub operation: String,
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_number: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment_url: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evidence_digest: Option<String>,
    // presumably used to avoid posting the same operation twice — verify
    // against the posting code
    pub idempotency_key: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub response_excerpt: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
}
947
/// Persistable record describing one bug-monitor incident.
///
/// Records of this type are held in `AppState::bug_monitor_incidents`, a map
/// keyed by `String`. Optional fields deserialize to `None` when absent and
/// are omitted from serialized output while `None`; `excerpt` and
/// `occurrence_count` fall back to an empty list and `0` respectively.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorIncidentRecord {
    pub incident_id: String,
    pub fingerprint: String,
    pub event_type: String,
    // NOTE(review): the set of valid `status` strings is not visible here.
    pub status: String,
    pub repo: String,
    pub workspace_root: String,
    pub title: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    // Unlike the optional fields, `excerpt` is always serialized, even when empty.
    #[serde(default)]
    pub excerpt: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    #[serde(default)]
    pub occurrence_count: u64,
    // Presumably epoch milliseconds — confirm against the code that writes them.
    pub created_at_ms: u64,
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_seen_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub draft_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub triage_run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    // The three fields below carry untyped JSON; their schema is not defined here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_summary: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duplicate_matches: Option<Vec<Value>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_payload: Option<Value>,
}
992
/// Runtime counters and last-seen markers for the bug monitor.
///
/// A default instance is stored in `AppState::bug_monitor_runtime_status` by
/// `AppState::new_starting`, and the type is embedded as
/// `BugMonitorStatus::runtime`. Every field has a serde default, so an empty
/// JSON object deserializes successfully.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorRuntimeStatus {
    #[serde(default)]
    pub monitoring_active: bool,
    #[serde(default)]
    pub paused: bool,
    #[serde(default)]
    pub pending_incidents: usize,
    #[serde(default)]
    pub total_incidents: usize,
    // Optional markers are omitted from serialized output while `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_processed_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_incident_event_type: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_runtime_error: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_post_result: Option<String>,
    #[serde(default)]
    pub pending_posts: usize,
}
1014
/// Input payload describing a problem submitted to the bug monitor.
///
/// Every field is optional on the wire: `Option` fields default to `None` and
/// are skipped when serializing a `None`, while `excerpt` defaults to an empty
/// list and is always serialized.
// NOTE(review): which fields a submission must actually carry to be accepted
// is decided by code outside this part of the file.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorSubmission {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub file_name: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub process: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub component: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
    #[serde(default)]
    pub excerpt: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fingerprint: Option<String>,
}
1046
/// Per-capability readiness flags, one boolean per GitHub capability.
///
/// Embedded as `BugMonitorStatus::required_capabilities`. Each flag defaults
/// to `false` when missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityReadiness {
    #[serde(default)]
    pub github_list_issues: bool,
    #[serde(default)]
    pub github_get_issue: bool,
    #[serde(default)]
    pub github_create_issue: bool,
    #[serde(default)]
    pub github_comment_on_issue: bool,
}
1058
/// One resolved capability entry, listed in
/// `BugMonitorStatus::resolved_capabilities`.
///
/// All four fields are required on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorCapabilityMatch {
    pub capability_id: String,
    pub provider: String,
    pub tool_name: String,
    // NOTE(review): presumably an index into a bindings list kept elsewhere — confirm.
    pub binding_index: usize,
}
1066
/// One candidate binding, listed in
/// `BugMonitorStatus::selected_server_binding_candidates`.
///
/// `capability_id` and `binding_tool_name` are required on deserialization;
/// `aliases` defaults to an empty list and `matched` to `false`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorBindingCandidate {
    pub capability_id: String,
    pub binding_tool_name: String,
    #[serde(default)]
    pub aliases: Vec<String>,
    #[serde(default)]
    pub matched: bool,
}
1076
/// Set of boolean readiness checks reported for the bug monitor.
///
/// Embedded as `BugMonitorStatus::readiness`. Every flag defaults to `false`
/// when missing from the input.
// NOTE(review): how the aggregate flags (`ingest_ready`, `publish_ready`,
// `runtime_ready`) are derived from the others is not visible in this part of
// the file.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorReadiness {
    #[serde(default)]
    pub config_valid: bool,
    #[serde(default)]
    pub repo_valid: bool,
    #[serde(default)]
    pub mcp_server_present: bool,
    #[serde(default)]
    pub mcp_connected: bool,
    #[serde(default)]
    pub github_read_ready: bool,
    #[serde(default)]
    pub github_write_ready: bool,
    #[serde(default)]
    pub selected_model_ready: bool,
    #[serde(default)]
    pub ingest_ready: bool,
    #[serde(default)]
    pub publish_ready: bool,
    #[serde(default)]
    pub runtime_ready: bool,
}
1100
/// Aggregate status report for the bug monitor.
///
/// Combines the configuration, readiness flags, runtime counters, capability
/// resolution details and pending-work counts in one serializable value.
/// `config`, `readiness` and `required_capabilities` carry no serde default
/// and are therefore required on deserialization; every other field falls
/// back to its default when absent.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BugMonitorStatus {
    pub config: BugMonitorConfig,
    pub readiness: BugMonitorReadiness,
    #[serde(default)]
    pub runtime: BugMonitorRuntimeStatus,
    pub required_capabilities: BugMonitorCapabilityReadiness,
    #[serde(default)]
    pub missing_required_capabilities: Vec<String>,
    #[serde(default)]
    pub resolved_capabilities: Vec<BugMonitorCapabilityMatch>,
    #[serde(default)]
    pub discovered_mcp_tools: Vec<String>,
    #[serde(default)]
    pub selected_server_binding_candidates: Vec<BugMonitorBindingCandidate>,
    // Optional fields below are omitted from serialized output while `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub binding_source_version: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub bindings_last_merged_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub selected_model: Option<ModelSpec>,
    #[serde(default)]
    pub pending_drafts: usize,
    #[serde(default)]
    pub pending_posts: usize,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_activity_at_ms: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}
1131
/// Details of a failed revision check on a shared resource.
///
/// Carried by `ResourceStoreError::RevisionConflict`. The shared-resource
/// put/delete methods fill `expected_rev` with the caller-supplied revision
/// and `current_rev` with the stored revision, or `None` when the key has no
/// stored record.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceConflict {
    pub key: String,
    pub expected_rev: Option<u64>,
    pub current_rev: Option<u64>,
}
1138
/// Failure modes of the shared-resource store operations.
///
/// Serialized as an internally tagged object: the `type` field holds the
/// snake_case variant name and the variant's own fields sit alongside it.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceStoreError {
    /// The supplied key was rejected by `is_valid_resource_key`.
    InvalidKey { key: String },
    /// The caller's `if_match_rev` did not equal the stored revision.
    RevisionConflict(ResourceConflict),
    /// Writing the store to disk failed; `message` is the error's text.
    PersistFailed { message: String },
}
1146
/// Failure modes of the routine store operations.
///
/// Serialized as an internally tagged object: the `type` field holds the
/// snake_case variant name and the variant's own fields sit alongside it.
// NOTE(review): the code that produces these variants is outside this part of
// the file; the variant meanings are taken from their names.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RoutineStoreError {
    InvalidRoutineId { routine_id: String },
    InvalidSchedule { detail: String },
    PersistFailed { message: String },
}
1154
/// Coarse lifecycle state of a server startup attempt.
#[derive(Debug, Clone)]
pub enum StartupStatus {
    /// Initial state assigned by `AppState::new_starting`.
    Starting,
    /// Assigned at the end of a successful `AppState::mark_ready`.
    Ready,
    /// Assigned by `AppState::mark_failed`.
    Failed,
}
1161
/// Mutable startup tracker held behind `AppState::startup`.
#[derive(Debug, Clone)]
pub struct StartupState {
    pub status: StartupStatus,
    /// Free-form phase label; starts as `"boot"` and becomes `"ready"` on success.
    pub phase: String,
    /// Value of `now_ms()` captured when the state was created.
    pub started_at_ms: u64,
    pub attempt_id: String,
    /// Set by `mark_failed`, cleared by `mark_ready`.
    pub last_error: Option<String>,
}
1170
/// Point-in-time copy of [`StartupState`] produced by
/// `AppState::startup_snapshot`, extended with the elapsed time.
#[derive(Debug, Clone)]
pub struct StartupSnapshot {
    pub status: StartupStatus,
    pub phase: String,
    pub started_at_ms: u64,
    pub attempt_id: String,
    pub last_error: Option<String>,
    /// `now_ms()` at snapshot time minus `started_at_ms`, saturating at zero.
    pub elapsed_ms: u64,
}
1180
/// Top-level server state shared across handlers.
///
/// The struct is `Clone`; mutable state lives behind `Arc`-wrapped locks or
/// atomics so clones observe the same data, while plain values (paths,
/// numeric tunables) are copied per clone.
#[derive(Clone)]
pub struct AppState {
    /// Runtime services; empty until `mark_ready` stores a `RuntimeState`.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup progress tracker (status, phase, last error).
    pub startup: Arc<RwLock<StartupState>>,
    /// Selects the label returned by `mode_label` ("in-process" vs "sidecar").
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// In-memory shared-resource store; persisted to `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // Routine and automation stores; each has a matching `*_path` field below.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    // Bug-monitor configuration, records and runtime counters.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    // Workflow registry and run bookkeeping.
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub token_cost_per_1k_usd: f64,
    // On-disk locations of the persisted stores, resolved once in `new_starting`.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI settings use std (blocking) locks and atomics so the accessors stay synchronous.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    /// Channel listener handles and per-channel status, guarded by an async mutex.
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host context detected at construction; used until the runtime provides its own.
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
1238
/// Private key/value pair pairing a string key with a JSON value.
// NOTE(review): the producer and consumer of this type are outside this part
// of the file; the name suggests a pending write to a status index — confirm.
#[derive(Debug, Clone)]
struct StatusIndexUpdate {
    key: String,
    value: Value,
}
1244
1245impl AppState {
1246    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
1247        Self {
1248            runtime: Arc::new(OnceLock::new()),
1249            startup: Arc::new(RwLock::new(StartupState {
1250                status: StartupStatus::Starting,
1251                phase: "boot".to_string(),
1252                started_at_ms: now_ms(),
1253                attempt_id,
1254                last_error: None,
1255            })),
1256            in_process_mode: Arc::new(AtomicBool::new(in_process)),
1257            api_token: Arc::new(RwLock::new(None)),
1258            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
1259            run_registry: RunRegistry::new(),
1260            run_stale_ms: resolve_run_stale_ms(),
1261            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
1262            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
1263            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
1264            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
1265            shared_resources_path: resolve_shared_resources_path(),
1266            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
1267            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
1268            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1269            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
1270            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1271            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
1272            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1273            bug_monitor_config: Arc::new(RwLock::new(resolve_bug_monitor_env_config())),
1274            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1275            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
1276            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1277            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
1278            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
1279            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1280            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
1281            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
1282            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
1283            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1284            routines_path: resolve_routines_path(),
1285            routine_history_path: resolve_routine_history_path(),
1286            routine_runs_path: resolve_routine_runs_path(),
1287            automations_v2_path: resolve_automations_v2_path(),
1288            automation_v2_runs_path: resolve_automation_v2_runs_path(),
1289            bug_monitor_config_path: resolve_bug_monitor_config_path(),
1290            bug_monitor_drafts_path: resolve_bug_monitor_drafts_path(),
1291            bug_monitor_incidents_path: resolve_bug_monitor_incidents_path(),
1292            bug_monitor_posts_path: resolve_bug_monitor_posts_path(),
1293            workflow_runs_path: resolve_workflow_runs_path(),
1294            workflow_hook_overrides_path: resolve_workflow_hook_overrides_path(),
1295            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
1296            web_ui_enabled: Arc::new(AtomicBool::new(false)),
1297            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
1298            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
1299            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
1300            host_runtime_context: detect_host_runtime_context(),
1301            token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
1302            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
1303            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
1304            preset_registry: Arc::new(PresetRegistry::new(
1305                PackManager::default_root(),
1306                resolve_shared_paths()
1307                    .map(|paths| paths.canonical_root)
1308                    .unwrap_or_else(|_| {
1309                        dirs::home_dir()
1310                            .unwrap_or_else(|| PathBuf::from("."))
1311                            .join(".tandem")
1312                    }),
1313            )),
1314        }
1315    }
1316
1317    pub fn is_ready(&self) -> bool {
1318        self.runtime.get().is_some()
1319    }
1320
1321    pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
1322        for _ in 0..attempts {
1323            if self.is_ready() {
1324                return true;
1325            }
1326            let startup = self.startup_snapshot().await;
1327            if matches!(startup.status, StartupStatus::Failed) {
1328                return false;
1329            }
1330            tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
1331        }
1332        self.is_ready()
1333    }
1334
1335    pub fn mode_label(&self) -> &'static str {
1336        if self.in_process_mode.load(Ordering::Relaxed) {
1337            "in-process"
1338        } else {
1339            "sidecar"
1340        }
1341    }
1342
1343    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
1344        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
1345        if let Ok(mut guard) = self.web_ui_prefix.write() {
1346            *guard = normalize_web_ui_prefix(&prefix);
1347        }
1348    }
1349
1350    pub fn web_ui_enabled(&self) -> bool {
1351        self.web_ui_enabled.load(Ordering::Relaxed)
1352    }
1353
1354    pub fn web_ui_prefix(&self) -> String {
1355        self.web_ui_prefix
1356            .read()
1357            .map(|v| v.clone())
1358            .unwrap_or_else(|_| "/admin".to_string())
1359    }
1360
1361    pub fn set_server_base_url(&self, base_url: String) {
1362        if let Ok(mut guard) = self.server_base_url.write() {
1363            *guard = base_url;
1364        }
1365    }
1366
1367    pub fn server_base_url(&self) -> String {
1368        self.server_base_url
1369            .read()
1370            .map(|v| v.clone())
1371            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
1372    }
1373
1374    pub async fn api_token(&self) -> Option<String> {
1375        self.api_token.read().await.clone()
1376    }
1377
1378    pub async fn set_api_token(&self, token: Option<String>) {
1379        *self.api_token.write().await = token;
1380    }
1381
1382    pub async fn startup_snapshot(&self) -> StartupSnapshot {
1383        let state = self.startup.read().await.clone();
1384        StartupSnapshot {
1385            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
1386            status: state.status,
1387            phase: state.phase,
1388            started_at_ms: state.started_at_ms,
1389            attempt_id: state.attempt_id,
1390            last_error: state.last_error,
1391        }
1392    }
1393
1394    pub fn host_runtime_context(&self) -> HostRuntimeContext {
1395        self.runtime
1396            .get()
1397            .map(|runtime| runtime.host_runtime_context.clone())
1398            .unwrap_or_else(|| self.host_runtime_context.clone())
1399    }
1400
1401    pub async fn set_phase(&self, phase: impl Into<String>) {
1402        let mut startup = self.startup.write().await;
1403        startup.phase = phase.into();
1404    }
1405
    /// Installs the runtime and finishes startup.
    ///
    /// Stores `runtime` in the once-only runtime cell, registers tools and
    /// engine hooks, loads the persisted stores, and finally flips the startup
    /// tracker to [`StartupStatus::Ready`] with phase `"ready"`.
    ///
    /// # Errors
    ///
    /// Fails when the runtime cell was already set, when browser tool
    /// registration fails, or when loading routines or v2 automations fails.
    /// All other load steps are best-effort and their errors are discarded.
    ///
    /// NOTE(review): the runtime cell is filled before the fallible steps, so
    /// after an early error `is_ready()` (which only checks the cell) reports
    /// `true` while the startup status has not been set to `Ready`.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        // `tools`, `engine_loop` and `workspace_index` are not fields of `AppState`;
        // presumably they resolve through a `Deref` to the runtime just stored — confirm.
        self.register_browser_tools().await?;
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        // Each hook receives its own clone of this state.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Only the routines and v2 automation stores abort startup on failure;
        // every `let _ =` below deliberately ignores the load result.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Publish the ready state last, once all wiring and loading is done.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
1456
1457    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
1458        let mut startup = self.startup.write().await;
1459        startup.status = StartupStatus::Failed;
1460        startup.phase = phase.into();
1461        startup.last_error = Some(error.into());
1462    }
1463
1464    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
1465        let runtime = self.channels_runtime.lock().await;
1466        runtime.statuses.clone()
1467    }
1468
1469    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
1470        let effective = self.config.get_effective_value().await;
1471        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
1472        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
1473
1474        let mut runtime = self.channels_runtime.lock().await;
1475        if let Some(listeners) = runtime.listeners.as_mut() {
1476            listeners.abort_all();
1477        }
1478        runtime.listeners = None;
1479        runtime.statuses.clear();
1480
1481        let mut status_map = std::collections::HashMap::new();
1482        status_map.insert(
1483            "telegram".to_string(),
1484            ChannelStatus {
1485                enabled: parsed.channels.telegram.is_some(),
1486                connected: false,
1487                last_error: None,
1488                active_sessions: 0,
1489                meta: serde_json::json!({}),
1490            },
1491        );
1492        status_map.insert(
1493            "discord".to_string(),
1494            ChannelStatus {
1495                enabled: parsed.channels.discord.is_some(),
1496                connected: false,
1497                last_error: None,
1498                active_sessions: 0,
1499                meta: serde_json::json!({}),
1500            },
1501        );
1502        status_map.insert(
1503            "slack".to_string(),
1504            ChannelStatus {
1505                enabled: parsed.channels.slack.is_some(),
1506                connected: false,
1507                last_error: None,
1508                active_sessions: 0,
1509                meta: serde_json::json!({}),
1510            },
1511        );
1512
1513        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
1514            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
1515            runtime.listeners = Some(listeners);
1516            for status in status_map.values_mut() {
1517                if status.enabled {
1518                    status.connected = true;
1519                }
1520            }
1521        }
1522
1523        runtime.statuses = status_map.clone();
1524        drop(runtime);
1525
1526        self.event_bus.publish(EngineEvent::new(
1527            "channel.status.changed",
1528            serde_json::json!({ "channels": status_map }),
1529        ));
1530        Ok(())
1531    }
1532
1533    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
1534        if !self.shared_resources_path.exists() {
1535            return Ok(());
1536        }
1537        let raw = fs::read_to_string(&self.shared_resources_path).await?;
1538        let parsed =
1539            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
1540                .unwrap_or_default();
1541        let mut guard = self.shared_resources.write().await;
1542        *guard = parsed;
1543        Ok(())
1544    }
1545
1546    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1547        if let Some(parent) = self.shared_resources_path.parent() {
1548            fs::create_dir_all(parent).await?;
1549        }
1550        let payload = {
1551            let guard = self.shared_resources.read().await;
1552            serde_json::to_string_pretty(&*guard)?
1553        };
1554        fs::write(&self.shared_resources_path, payload).await?;
1555        Ok(())
1556    }
1557
1558    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
1559        self.shared_resources.read().await.get(key).cloned()
1560    }
1561
1562    pub async fn list_shared_resources(
1563        &self,
1564        prefix: Option<&str>,
1565        limit: usize,
1566    ) -> Vec<SharedResourceRecord> {
1567        let limit = limit.clamp(1, 500);
1568        let mut rows = self
1569            .shared_resources
1570            .read()
1571            .await
1572            .values()
1573            .filter(|record| {
1574                if let Some(prefix) = prefix {
1575                    record.key.starts_with(prefix)
1576                } else {
1577                    true
1578                }
1579            })
1580            .cloned()
1581            .collect::<Vec<_>>();
1582        rows.sort_by(|a, b| a.key.cmp(&b.key));
1583        rows.truncate(limit);
1584        rows
1585    }
1586
1587    pub async fn put_shared_resource(
1588        &self,
1589        key: String,
1590        value: Value,
1591        if_match_rev: Option<u64>,
1592        updated_by: String,
1593        ttl_ms: Option<u64>,
1594    ) -> Result<SharedResourceRecord, ResourceStoreError> {
1595        if !is_valid_resource_key(&key) {
1596            return Err(ResourceStoreError::InvalidKey { key });
1597        }
1598
1599        let now = now_ms();
1600        let mut guard = self.shared_resources.write().await;
1601        let existing = guard.get(&key).cloned();
1602
1603        if let Some(expected) = if_match_rev {
1604            let current = existing.as_ref().map(|row| row.rev);
1605            if current != Some(expected) {
1606                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1607                    key,
1608                    expected_rev: Some(expected),
1609                    current_rev: current,
1610                }));
1611            }
1612        }
1613
1614        let next_rev = existing
1615            .as_ref()
1616            .map(|row| row.rev.saturating_add(1))
1617            .unwrap_or(1);
1618
1619        let record = SharedResourceRecord {
1620            key: key.clone(),
1621            value,
1622            rev: next_rev,
1623            updated_at_ms: now,
1624            updated_by,
1625            ttl_ms,
1626        };
1627
1628        let previous = guard.insert(key.clone(), record.clone());
1629        drop(guard);
1630
1631        if let Err(error) = self.persist_shared_resources().await {
1632            let mut rollback = self.shared_resources.write().await;
1633            if let Some(previous) = previous {
1634                rollback.insert(key, previous);
1635            } else {
1636                rollback.remove(&key);
1637            }
1638            return Err(ResourceStoreError::PersistFailed {
1639                message: error.to_string(),
1640            });
1641        }
1642
1643        Ok(record)
1644    }
1645
1646    pub async fn delete_shared_resource(
1647        &self,
1648        key: &str,
1649        if_match_rev: Option<u64>,
1650    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
1651        if !is_valid_resource_key(key) {
1652            return Err(ResourceStoreError::InvalidKey {
1653                key: key.to_string(),
1654            });
1655        }
1656
1657        let mut guard = self.shared_resources.write().await;
1658        let current = guard.get(key).cloned();
1659        if let Some(expected) = if_match_rev {
1660            let current_rev = current.as_ref().map(|row| row.rev);
1661            if current_rev != Some(expected) {
1662                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1663                    key: key.to_string(),
1664                    expected_rev: Some(expected),
1665                    current_rev,
1666                }));
1667            }
1668        }
1669
1670        let removed = guard.remove(key);
1671        drop(guard);
1672
1673        if let Err(error) = self.persist_shared_resources().await {
1674            if let Some(record) = removed.clone() {
1675                self.shared_resources
1676                    .write()
1677                    .await
1678                    .insert(record.key.clone(), record);
1679            }
1680            return Err(ResourceStoreError::PersistFailed {
1681                message: error.to_string(),
1682            });
1683        }
1684
1685        Ok(removed)
1686    }
1687
1688    pub async fn load_routines(&self) -> anyhow::Result<()> {
1689        if !self.routines_path.exists() {
1690            return Ok(());
1691        }
1692        let raw = fs::read_to_string(&self.routines_path).await?;
1693        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
1694            Ok(parsed) => {
1695                let mut guard = self.routines.write().await;
1696                *guard = parsed;
1697                Ok(())
1698            }
1699            Err(primary_err) => {
1700                let backup_path = sibling_backup_path(&self.routines_path);
1701                if backup_path.exists() {
1702                    let backup_raw = fs::read_to_string(&backup_path).await?;
1703                    if let Ok(parsed_backup) = serde_json::from_str::<
1704                        std::collections::HashMap<String, RoutineSpec>,
1705                    >(&backup_raw)
1706                    {
1707                        let mut guard = self.routines.write().await;
1708                        *guard = parsed_backup;
1709                        return Ok(());
1710                    }
1711                }
1712                Err(anyhow::anyhow!(
1713                    "failed to parse routines store {}: {primary_err}",
1714                    self.routines_path.display()
1715                ))
1716            }
1717        }
1718    }
1719
1720    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1721        if !self.routine_history_path.exists() {
1722            return Ok(());
1723        }
1724        let raw = fs::read_to_string(&self.routine_history_path).await?;
1725        let parsed = serde_json::from_str::<
1726            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1727        >(&raw)
1728        .unwrap_or_default();
1729        let mut guard = self.routine_history.write().await;
1730        *guard = parsed;
1731        Ok(())
1732    }
1733
1734    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1735        if !self.routine_runs_path.exists() {
1736            return Ok(());
1737        }
1738        let raw = fs::read_to_string(&self.routine_runs_path).await?;
1739        let parsed =
1740            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1741                .unwrap_or_default();
1742        let mut guard = self.routine_runs.write().await;
1743        *guard = parsed;
1744        Ok(())
1745    }
1746
1747    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
1748        if let Some(parent) = self.routines_path.parent() {
1749            fs::create_dir_all(parent).await?;
1750        }
1751        let (payload, is_empty) = {
1752            let guard = self.routines.read().await;
1753            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
1754        };
1755        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
1756            let existing_raw = fs::read_to_string(&self.routines_path)
1757                .await
1758                .unwrap_or_default();
1759            let existing_has_rows = serde_json::from_str::<
1760                std::collections::HashMap<String, RoutineSpec>,
1761            >(&existing_raw)
1762            .map(|rows| !rows.is_empty())
1763            .unwrap_or(true);
1764            if existing_has_rows {
1765                return Err(anyhow::anyhow!(
1766                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
1767                    self.routines_path.display()
1768                ));
1769            }
1770        }
1771        let backup_path = sibling_backup_path(&self.routines_path);
1772        if self.routines_path.exists() {
1773            let _ = fs::copy(&self.routines_path, &backup_path).await;
1774        }
1775        let tmp_path = sibling_tmp_path(&self.routines_path);
1776        fs::write(&tmp_path, payload).await?;
1777        fs::rename(&tmp_path, &self.routines_path).await?;
1778        Ok(())
1779    }
1780
1781    pub async fn persist_routines(&self) -> anyhow::Result<()> {
1782        self.persist_routines_inner(false).await
1783    }
1784
1785    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1786        if let Some(parent) = self.routine_history_path.parent() {
1787            fs::create_dir_all(parent).await?;
1788        }
1789        let payload = {
1790            let guard = self.routine_history.read().await;
1791            serde_json::to_string_pretty(&*guard)?
1792        };
1793        fs::write(&self.routine_history_path, payload).await?;
1794        Ok(())
1795    }
1796
1797    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1798        if let Some(parent) = self.routine_runs_path.parent() {
1799            fs::create_dir_all(parent).await?;
1800        }
1801        let payload = {
1802            let guard = self.routine_runs.read().await;
1803            serde_json::to_string_pretty(&*guard)?
1804        };
1805        fs::write(&self.routine_runs_path, payload).await?;
1806        Ok(())
1807    }
1808
1809    pub async fn put_routine(
1810        &self,
1811        mut routine: RoutineSpec,
1812    ) -> Result<RoutineSpec, RoutineStoreError> {
1813        if routine.routine_id.trim().is_empty() {
1814            return Err(RoutineStoreError::InvalidRoutineId {
1815                routine_id: routine.routine_id,
1816            });
1817        }
1818
1819        routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1820        routine.output_targets = normalize_non_empty_list(routine.output_targets);
1821
1822        let now = now_ms();
1823        let next_schedule_fire =
1824            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1825                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1826                    detail: "invalid schedule or timezone".to_string(),
1827                })?;
1828        match routine.schedule {
1829            RoutineSchedule::IntervalSeconds { seconds } => {
1830                if seconds == 0 {
1831                    return Err(RoutineStoreError::InvalidSchedule {
1832                        detail: "interval_seconds must be > 0".to_string(),
1833                    });
1834                }
1835                let _ = seconds;
1836            }
1837            RoutineSchedule::Cron { .. } => {}
1838        }
1839        if routine.next_fire_at_ms.is_none() {
1840            routine.next_fire_at_ms = Some(next_schedule_fire);
1841        }
1842
1843        let mut guard = self.routines.write().await;
1844        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1845        drop(guard);
1846
1847        if let Err(error) = self.persist_routines().await {
1848            let mut rollback = self.routines.write().await;
1849            if let Some(previous) = previous {
1850                rollback.insert(previous.routine_id.clone(), previous);
1851            } else {
1852                rollback.remove(&routine.routine_id);
1853            }
1854            return Err(RoutineStoreError::PersistFailed {
1855                message: error.to_string(),
1856            });
1857        }
1858
1859        Ok(routine)
1860    }
1861
1862    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1863        let mut rows = self
1864            .routines
1865            .read()
1866            .await
1867            .values()
1868            .cloned()
1869            .collect::<Vec<_>>();
1870        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1871        rows
1872    }
1873
1874    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1875        self.routines.read().await.get(routine_id).cloned()
1876    }
1877
1878    pub async fn delete_routine(
1879        &self,
1880        routine_id: &str,
1881    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1882        let mut guard = self.routines.write().await;
1883        let removed = guard.remove(routine_id);
1884        drop(guard);
1885
1886        let allow_empty_overwrite = self.routines.read().await.is_empty();
1887        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
1888            if let Some(removed) = removed.clone() {
1889                self.routines
1890                    .write()
1891                    .await
1892                    .insert(removed.routine_id.clone(), removed);
1893            }
1894            return Err(RoutineStoreError::PersistFailed {
1895                message: error.to_string(),
1896            });
1897        }
1898        Ok(removed)
1899    }
1900
1901    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1902        let mut plans = Vec::new();
1903        let mut guard = self.routines.write().await;
1904        for routine in guard.values_mut() {
1905            if routine.status != RoutineStatus::Active {
1906                continue;
1907            }
1908            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1909                continue;
1910            };
1911            if now_ms < next_fire_at_ms {
1912                continue;
1913            }
1914            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
1915                now_ms,
1916                next_fire_at_ms,
1917                &routine.schedule,
1918                &routine.timezone,
1919                &routine.misfire_policy,
1920            );
1921            routine.next_fire_at_ms = Some(next_fire_at_ms);
1922            if run_count == 0 {
1923                continue;
1924            }
1925            plans.push(RoutineTriggerPlan {
1926                routine_id: routine.routine_id.clone(),
1927                run_count,
1928                scheduled_at_ms: now_ms,
1929                next_fire_at_ms,
1930            });
1931        }
1932        drop(guard);
1933        let _ = self.persist_routines().await;
1934        plans
1935    }
1936
1937    pub async fn mark_routine_fired(
1938        &self,
1939        routine_id: &str,
1940        fired_at_ms: u64,
1941    ) -> Option<RoutineSpec> {
1942        let mut guard = self.routines.write().await;
1943        let routine = guard.get_mut(routine_id)?;
1944        routine.last_fired_at_ms = Some(fired_at_ms);
1945        let updated = routine.clone();
1946        drop(guard);
1947        let _ = self.persist_routines().await;
1948        Some(updated)
1949    }
1950
1951    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1952        let mut history = self.routine_history.write().await;
1953        history
1954            .entry(event.routine_id.clone())
1955            .or_default()
1956            .push(event);
1957        drop(history);
1958        let _ = self.persist_routine_history().await;
1959    }
1960
1961    pub async fn list_routine_history(
1962        &self,
1963        routine_id: &str,
1964        limit: usize,
1965    ) -> Vec<RoutineHistoryEvent> {
1966        let limit = limit.clamp(1, 500);
1967        let mut rows = self
1968            .routine_history
1969            .read()
1970            .await
1971            .get(routine_id)
1972            .cloned()
1973            .unwrap_or_default();
1974        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1975        rows.truncate(limit);
1976        rows
1977    }
1978
1979    pub async fn create_routine_run(
1980        &self,
1981        routine: &RoutineSpec,
1982        trigger_type: &str,
1983        run_count: u32,
1984        status: RoutineRunStatus,
1985        detail: Option<String>,
1986    ) -> RoutineRunRecord {
1987        let now = now_ms();
1988        let record = RoutineRunRecord {
1989            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1990            routine_id: routine.routine_id.clone(),
1991            trigger_type: trigger_type.to_string(),
1992            run_count,
1993            status,
1994            created_at_ms: now,
1995            updated_at_ms: now,
1996            fired_at_ms: Some(now),
1997            started_at_ms: None,
1998            finished_at_ms: None,
1999            requires_approval: routine.requires_approval,
2000            approval_reason: None,
2001            denial_reason: None,
2002            paused_reason: None,
2003            detail,
2004            entrypoint: routine.entrypoint.clone(),
2005            args: routine.args.clone(),
2006            allowed_tools: routine.allowed_tools.clone(),
2007            output_targets: routine.output_targets.clone(),
2008            artifacts: Vec::new(),
2009            active_session_ids: Vec::new(),
2010            latest_session_id: None,
2011            prompt_tokens: 0,
2012            completion_tokens: 0,
2013            total_tokens: 0,
2014            estimated_cost_usd: 0.0,
2015        };
2016        self.routine_runs
2017            .write()
2018            .await
2019            .insert(record.run_id.clone(), record.clone());
2020        let _ = self.persist_routine_runs().await;
2021        record
2022    }
2023
2024    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
2025        self.routine_runs.read().await.get(run_id).cloned()
2026    }
2027
2028    pub async fn list_routine_runs(
2029        &self,
2030        routine_id: Option<&str>,
2031        limit: usize,
2032    ) -> Vec<RoutineRunRecord> {
2033        let mut rows = self
2034            .routine_runs
2035            .read()
2036            .await
2037            .values()
2038            .filter(|row| {
2039                if let Some(id) = routine_id {
2040                    row.routine_id == id
2041                } else {
2042                    true
2043                }
2044            })
2045            .cloned()
2046            .collect::<Vec<_>>();
2047        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2048        rows.truncate(limit.clamp(1, 500));
2049        rows
2050    }
2051
2052    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
2053        let mut guard = self.routine_runs.write().await;
2054        let next_run_id = guard
2055            .values()
2056            .filter(|row| row.status == RoutineRunStatus::Queued)
2057            .min_by(|a, b| {
2058                a.created_at_ms
2059                    .cmp(&b.created_at_ms)
2060                    .then_with(|| a.run_id.cmp(&b.run_id))
2061            })
2062            .map(|row| row.run_id.clone())?;
2063        let now = now_ms();
2064        let row = guard.get_mut(&next_run_id)?;
2065        row.status = RoutineRunStatus::Running;
2066        row.updated_at_ms = now;
2067        row.started_at_ms = Some(now);
2068        let claimed = row.clone();
2069        drop(guard);
2070        let _ = self.persist_routine_runs().await;
2071        Some(claimed)
2072    }
2073
2074    pub async fn set_routine_session_policy(
2075        &self,
2076        session_id: String,
2077        run_id: String,
2078        routine_id: String,
2079        allowed_tools: Vec<String>,
2080    ) {
2081        let policy = RoutineSessionPolicy {
2082            session_id: session_id.clone(),
2083            run_id,
2084            routine_id,
2085            allowed_tools: normalize_allowed_tools(allowed_tools),
2086        };
2087        self.routine_session_policies
2088            .write()
2089            .await
2090            .insert(session_id, policy);
2091    }
2092
2093    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
2094        self.routine_session_policies
2095            .read()
2096            .await
2097            .get(session_id)
2098            .cloned()
2099    }
2100
2101    pub async fn clear_routine_session_policy(&self, session_id: &str) {
2102        self.routine_session_policies
2103            .write()
2104            .await
2105            .remove(session_id);
2106    }
2107
2108    pub async fn update_routine_run_status(
2109        &self,
2110        run_id: &str,
2111        status: RoutineRunStatus,
2112        reason: Option<String>,
2113    ) -> Option<RoutineRunRecord> {
2114        let mut guard = self.routine_runs.write().await;
2115        let row = guard.get_mut(run_id)?;
2116        row.status = status.clone();
2117        row.updated_at_ms = now_ms();
2118        match status {
2119            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
2120            RoutineRunStatus::Running => {
2121                row.started_at_ms.get_or_insert_with(now_ms);
2122                if let Some(detail) = reason {
2123                    row.detail = Some(detail);
2124                }
2125            }
2126            RoutineRunStatus::Denied => row.denial_reason = reason,
2127            RoutineRunStatus::Paused => row.paused_reason = reason,
2128            RoutineRunStatus::Completed
2129            | RoutineRunStatus::Failed
2130            | RoutineRunStatus::Cancelled => {
2131                row.finished_at_ms = Some(now_ms());
2132                if let Some(detail) = reason {
2133                    row.detail = Some(detail);
2134                }
2135            }
2136            _ => {
2137                if let Some(detail) = reason {
2138                    row.detail = Some(detail);
2139                }
2140            }
2141        }
2142        let updated = row.clone();
2143        drop(guard);
2144        let _ = self.persist_routine_runs().await;
2145        Some(updated)
2146    }
2147
2148    pub async fn append_routine_run_artifact(
2149        &self,
2150        run_id: &str,
2151        artifact: RoutineRunArtifact,
2152    ) -> Option<RoutineRunRecord> {
2153        let mut guard = self.routine_runs.write().await;
2154        let row = guard.get_mut(run_id)?;
2155        row.updated_at_ms = now_ms();
2156        row.artifacts.push(artifact);
2157        let updated = row.clone();
2158        drop(guard);
2159        let _ = self.persist_routine_runs().await;
2160        Some(updated)
2161    }
2162
2163    pub async fn add_active_session_id(
2164        &self,
2165        run_id: &str,
2166        session_id: String,
2167    ) -> Option<RoutineRunRecord> {
2168        let mut guard = self.routine_runs.write().await;
2169        let row = guard.get_mut(run_id)?;
2170        if !row.active_session_ids.iter().any(|id| id == &session_id) {
2171            row.active_session_ids.push(session_id);
2172        }
2173        row.latest_session_id = row.active_session_ids.last().cloned();
2174        row.updated_at_ms = now_ms();
2175        let updated = row.clone();
2176        drop(guard);
2177        let _ = self.persist_routine_runs().await;
2178        Some(updated)
2179    }
2180
2181    pub async fn clear_active_session_id(
2182        &self,
2183        run_id: &str,
2184        session_id: &str,
2185    ) -> Option<RoutineRunRecord> {
2186        let mut guard = self.routine_runs.write().await;
2187        let row = guard.get_mut(run_id)?;
2188        row.active_session_ids.retain(|id| id != session_id);
2189        row.updated_at_ms = now_ms();
2190        let updated = row.clone();
2191        drop(guard);
2192        let _ = self.persist_routine_runs().await;
2193        Some(updated)
2194    }
2195
2196    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
2197        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
2198        let mut loaded_from_alternate = false;
2199        let mut path_counts = Vec::new();
2200        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
2201            if !path.exists() {
2202                path_counts.push((path, 0usize));
2203                continue;
2204            }
2205            let raw = fs::read_to_string(&path).await?;
2206            if raw.trim().is_empty() || raw.trim() == "{}" {
2207                path_counts.push((path, 0usize));
2208                continue;
2209            }
2210            let parsed = parse_automation_v2_file(&raw);
2211            path_counts.push((path.clone(), parsed.len()));
2212            if path != self.automations_v2_path {
2213                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
2214            }
2215            for (automation_id, automation) in parsed {
2216                match merged.get(&automation_id) {
2217                    Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
2218                    _ => {
2219                        merged.insert(automation_id, automation);
2220                    }
2221                }
2222            }
2223        }
2224        let active_path = self.automations_v2_path.display().to_string();
2225        let path_count_summary = path_counts
2226            .iter()
2227            .map(|(path, count)| format!("{}={count}", path.display()))
2228            .collect::<Vec<_>>();
2229        tracing::info!(
2230            active_path,
2231            path_counts = ?path_count_summary,
2232            merged_count = merged.len(),
2233            "loaded automation v2 definitions"
2234        );
2235        *self.automations_v2.write().await = merged;
2236        if loaded_from_alternate {
2237            let _ = self.persist_automations_v2().await;
2238        }
2239        Ok(())
2240    }
2241
2242    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
2243        let payload = {
2244            let guard = self.automations_v2.read().await;
2245            serde_json::to_string_pretty(&*guard)?
2246        };
2247        if let Some(parent) = self.automations_v2_path.parent() {
2248            fs::create_dir_all(parent).await?;
2249        }
2250        fs::write(&self.automations_v2_path, &payload).await?;
2251        Ok(())
2252    }
2253
    /// Loads automation v2 run records from every candidate path, merges them
    /// into the in-memory map and then recovers missing automation definitions
    /// from the run snapshots.
    ///
    /// Missing, blank or `{}` files count as zero runs. When a run id appears
    /// in several files, an already merged entry is kept only if its
    /// `updated_at_ms` is strictly greater. The runs are written back
    /// (best-effort) when any came from a non-active path or when at least one
    /// definition was recovered.
    ///
    /// # Errors
    /// Returns an error when an existing candidate file cannot be read, or
    /// when definition recovery fails.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        // (path, number of runs parsed from it) — used only for the log line.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    // Keep the already merged record only when it is strictly newer.
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        // The runs must be in place before recovery, which reads them.
        *self.automation_v2_runs.write().await = merged;
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        // Runs without any definitions (even after recovery) are only reported.
        if automation_count == 0 && run_count > 0 {
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            // Best-effort write-back; a failure here does not fail the load.
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
2314
2315    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
2316        let payload = {
2317            let guard = self.automation_v2_runs.read().await;
2318            serde_json::to_string_pretty(&*guard)?
2319        };
2320        if let Some(parent) = self.automation_v2_runs_path.parent() {
2321            fs::create_dir_all(parent).await?;
2322        }
2323        fs::write(&self.automation_v2_runs_path, &payload).await?;
2324        Ok(())
2325    }
2326
2327    async fn verify_automation_v2_persisted(
2328        &self,
2329        automation_id: &str,
2330        expected_present: bool,
2331    ) -> anyhow::Result<()> {
2332        let active_raw = if self.automations_v2_path.exists() {
2333            fs::read_to_string(&self.automations_v2_path).await?
2334        } else {
2335            String::new()
2336        };
2337        let active_parsed = parse_automation_v2_file(&active_raw);
2338        let active_present = active_parsed.contains_key(automation_id);
2339        if active_present != expected_present {
2340            let active_path = self.automations_v2_path.display().to_string();
2341            tracing::error!(
2342                automation_id,
2343                expected_present,
2344                actual_present = active_present,
2345                count = active_parsed.len(),
2346                active_path,
2347                "automation v2 persistence verification failed"
2348            );
2349            anyhow::bail!(
2350                "automation v2 persistence verification failed for `{}`",
2351                automation_id
2352            );
2353        }
2354        let mut alternate_mismatches = Vec::new();
2355        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
2356            if path == self.automations_v2_path {
2357                continue;
2358            }
2359            let raw = if path.exists() {
2360                fs::read_to_string(&path).await?
2361            } else {
2362                String::new()
2363            };
2364            let parsed = parse_automation_v2_file(&raw);
2365            let present = parsed.contains_key(automation_id);
2366            if present != expected_present {
2367                alternate_mismatches.push(format!(
2368                    "{} expected_present={} actual_present={} count={}",
2369                    path.display(),
2370                    expected_present,
2371                    present,
2372                    parsed.len()
2373                ));
2374            }
2375        }
2376        if !alternate_mismatches.is_empty() {
2377            let active_path = self.automations_v2_path.display().to_string();
2378            tracing::warn!(
2379                automation_id,
2380                expected_present,
2381                mismatches = ?alternate_mismatches,
2382                active_path,
2383                "automation v2 alternate persistence paths are stale"
2384            );
2385        }
2386        Ok(())
2387    }
2388
2389    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
2390        let runs = self
2391            .automation_v2_runs
2392            .read()
2393            .await
2394            .values()
2395            .cloned()
2396            .collect::<Vec<_>>();
2397        let mut guard = self.automations_v2.write().await;
2398        let mut recovered = 0usize;
2399        for run in runs {
2400            let Some(snapshot) = run.automation_snapshot.clone() else {
2401                continue;
2402            };
2403            let should_replace = match guard.get(&run.automation_id) {
2404                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
2405                None => true,
2406            };
2407            if should_replace {
2408                if !guard.contains_key(&run.automation_id) {
2409                    recovered += 1;
2410                }
2411                guard.insert(run.automation_id.clone(), snapshot);
2412            }
2413        }
2414        drop(guard);
2415        if recovered > 0 {
2416            let active_path = self.automations_v2_path.display().to_string();
2417            tracing::warn!(
2418                recovered,
2419                active_path,
2420                "recovered automation v2 definitions from run snapshots"
2421            );
2422            self.persist_automations_v2().await?;
2423        }
2424        Ok(recovered)
2425    }
2426
2427    pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
2428        let path = if self.bug_monitor_config_path.exists() {
2429            self.bug_monitor_config_path.clone()
2430        } else if legacy_failure_reporter_path("failure_reporter_config.json").exists() {
2431            legacy_failure_reporter_path("failure_reporter_config.json")
2432        } else {
2433            return Ok(());
2434        };
2435        let raw = fs::read_to_string(path).await?;
2436        let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
2437            .unwrap_or_else(|_| resolve_bug_monitor_env_config());
2438        *self.bug_monitor_config.write().await = parsed;
2439        Ok(())
2440    }
2441
2442    pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
2443        if let Some(parent) = self.bug_monitor_config_path.parent() {
2444            fs::create_dir_all(parent).await?;
2445        }
2446        let payload = {
2447            let guard = self.bug_monitor_config.read().await;
2448            serde_json::to_string_pretty(&*guard)?
2449        };
2450        fs::write(&self.bug_monitor_config_path, payload).await?;
2451        Ok(())
2452    }
2453
2454    pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
2455        self.bug_monitor_config.read().await.clone()
2456    }
2457
2458    pub async fn put_bug_monitor_config(
2459        &self,
2460        mut config: BugMonitorConfig,
2461    ) -> anyhow::Result<BugMonitorConfig> {
2462        config.workspace_root = config
2463            .workspace_root
2464            .as_ref()
2465            .map(|v| v.trim().to_string())
2466            .filter(|v| !v.is_empty());
2467        if let Some(repo) = config.repo.as_ref() {
2468            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
2469                anyhow::bail!("repo must be in owner/repo format");
2470            }
2471        }
2472        if let Some(server) = config.mcp_server.as_ref() {
2473            let servers = self.mcp.list().await;
2474            if !servers.contains_key(server) {
2475                anyhow::bail!("unknown mcp server `{server}`");
2476            }
2477        }
2478        if let Some(model_policy) = config.model_policy.as_ref() {
2479            crate::http::routines_automations::validate_model_policy(model_policy)
2480                .map_err(anyhow::Error::msg)?;
2481        }
2482        config.updated_at_ms = now_ms();
2483        *self.bug_monitor_config.write().await = config.clone();
2484        self.persist_bug_monitor_config().await?;
2485        Ok(config)
2486    }
2487
2488    pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2489        let path = if self.bug_monitor_drafts_path.exists() {
2490            self.bug_monitor_drafts_path.clone()
2491        } else if legacy_failure_reporter_path("failure_reporter_drafts.json").exists() {
2492            legacy_failure_reporter_path("failure_reporter_drafts.json")
2493        } else {
2494            return Ok(());
2495        };
2496        let raw = fs::read_to_string(path).await?;
2497        let parsed =
2498            serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
2499                .unwrap_or_default();
2500        *self.bug_monitor_drafts.write().await = parsed;
2501        Ok(())
2502    }
2503
2504    pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2505        if let Some(parent) = self.bug_monitor_drafts_path.parent() {
2506            fs::create_dir_all(parent).await?;
2507        }
2508        let payload = {
2509            let guard = self.bug_monitor_drafts.read().await;
2510            serde_json::to_string_pretty(&*guard)?
2511        };
2512        fs::write(&self.bug_monitor_drafts_path, payload).await?;
2513        Ok(())
2514    }
2515
2516    pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2517        let path = if self.bug_monitor_incidents_path.exists() {
2518            self.bug_monitor_incidents_path.clone()
2519        } else if legacy_failure_reporter_path("failure_reporter_incidents.json").exists() {
2520            legacy_failure_reporter_path("failure_reporter_incidents.json")
2521        } else {
2522            return Ok(());
2523        };
2524        let raw = fs::read_to_string(path).await?;
2525        let parsed = serde_json::from_str::<
2526            std::collections::HashMap<String, BugMonitorIncidentRecord>,
2527        >(&raw)
2528        .unwrap_or_default();
2529        *self.bug_monitor_incidents.write().await = parsed;
2530        Ok(())
2531    }
2532
2533    pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2534        if let Some(parent) = self.bug_monitor_incidents_path.parent() {
2535            fs::create_dir_all(parent).await?;
2536        }
2537        let payload = {
2538            let guard = self.bug_monitor_incidents.read().await;
2539            serde_json::to_string_pretty(&*guard)?
2540        };
2541        fs::write(&self.bug_monitor_incidents_path, payload).await?;
2542        Ok(())
2543    }
2544
2545    pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
2546        let path = if self.bug_monitor_posts_path.exists() {
2547            self.bug_monitor_posts_path.clone()
2548        } else if legacy_failure_reporter_path("failure_reporter_posts.json").exists() {
2549            legacy_failure_reporter_path("failure_reporter_posts.json")
2550        } else {
2551            return Ok(());
2552        };
2553        let raw = fs::read_to_string(path).await?;
2554        let parsed =
2555            serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
2556                .unwrap_or_default();
2557        *self.bug_monitor_posts.write().await = parsed;
2558        Ok(())
2559    }
2560
2561    pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
2562        if let Some(parent) = self.bug_monitor_posts_path.parent() {
2563            fs::create_dir_all(parent).await?;
2564        }
2565        let payload = {
2566            let guard = self.bug_monitor_posts.read().await;
2567            serde_json::to_string_pretty(&*guard)?
2568        };
2569        fs::write(&self.bug_monitor_posts_path, payload).await?;
2570        Ok(())
2571    }
2572
2573    pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
2574        let mut rows = self
2575            .bug_monitor_incidents
2576            .read()
2577            .await
2578            .values()
2579            .cloned()
2580            .collect::<Vec<_>>();
2581        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2582        rows.truncate(limit.clamp(1, 200));
2583        rows
2584    }
2585
2586    pub async fn get_bug_monitor_incident(
2587        &self,
2588        incident_id: &str,
2589    ) -> Option<BugMonitorIncidentRecord> {
2590        self.bug_monitor_incidents
2591            .read()
2592            .await
2593            .get(incident_id)
2594            .cloned()
2595    }
2596
2597    pub async fn put_bug_monitor_incident(
2598        &self,
2599        incident: BugMonitorIncidentRecord,
2600    ) -> anyhow::Result<BugMonitorIncidentRecord> {
2601        self.bug_monitor_incidents
2602            .write()
2603            .await
2604            .insert(incident.incident_id.clone(), incident.clone());
2605        self.persist_bug_monitor_incidents().await?;
2606        Ok(incident)
2607    }
2608
2609    pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
2610        let mut rows = self
2611            .bug_monitor_posts
2612            .read()
2613            .await
2614            .values()
2615            .cloned()
2616            .collect::<Vec<_>>();
2617        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2618        rows.truncate(limit.clamp(1, 200));
2619        rows
2620    }
2621
2622    pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
2623        self.bug_monitor_posts.read().await.get(post_id).cloned()
2624    }
2625
2626    pub async fn put_bug_monitor_post(
2627        &self,
2628        post: BugMonitorPostRecord,
2629    ) -> anyhow::Result<BugMonitorPostRecord> {
2630        self.bug_monitor_posts
2631            .write()
2632            .await
2633            .insert(post.post_id.clone(), post.clone());
2634        self.persist_bug_monitor_posts().await?;
2635        Ok(post)
2636    }
2637
2638    pub async fn update_bug_monitor_runtime_status(
2639        &self,
2640        update: impl FnOnce(&mut BugMonitorRuntimeStatus),
2641    ) -> BugMonitorRuntimeStatus {
2642        let mut guard = self.bug_monitor_runtime_status.write().await;
2643        update(&mut guard);
2644        guard.clone()
2645    }
2646
2647    pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
2648        let mut rows = self
2649            .bug_monitor_drafts
2650            .read()
2651            .await
2652            .values()
2653            .cloned()
2654            .collect::<Vec<_>>();
2655        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2656        rows.truncate(limit.clamp(1, 200));
2657        rows
2658    }
2659
2660    pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
2661        self.bug_monitor_drafts.read().await.get(draft_id).cloned()
2662    }
2663
2664    pub async fn put_bug_monitor_draft(
2665        &self,
2666        draft: BugMonitorDraftRecord,
2667    ) -> anyhow::Result<BugMonitorDraftRecord> {
2668        self.bug_monitor_drafts
2669            .write()
2670            .await
2671            .insert(draft.draft_id.clone(), draft.clone());
2672        self.persist_bug_monitor_drafts().await?;
2673        Ok(draft)
2674    }
2675
2676    pub async fn submit_bug_monitor_draft(
2677        &self,
2678        mut submission: BugMonitorSubmission,
2679    ) -> anyhow::Result<BugMonitorDraftRecord> {
2680        fn normalize_optional(value: Option<String>) -> Option<String> {
2681            value
2682                .map(|v| v.trim().to_string())
2683                .filter(|v| !v.is_empty())
2684        }
2685
2686        fn compute_fingerprint(parts: &[&str]) -> String {
2687            use std::hash::{Hash, Hasher};
2688
2689            let mut hasher = std::collections::hash_map::DefaultHasher::new();
2690            for part in parts {
2691                part.hash(&mut hasher);
2692            }
2693            format!("{:016x}", hasher.finish())
2694        }
2695
2696        submission.repo = normalize_optional(submission.repo);
2697        submission.title = normalize_optional(submission.title);
2698        submission.detail = normalize_optional(submission.detail);
2699        submission.source = normalize_optional(submission.source);
2700        submission.run_id = normalize_optional(submission.run_id);
2701        submission.session_id = normalize_optional(submission.session_id);
2702        submission.correlation_id = normalize_optional(submission.correlation_id);
2703        submission.file_name = normalize_optional(submission.file_name);
2704        submission.process = normalize_optional(submission.process);
2705        submission.component = normalize_optional(submission.component);
2706        submission.event = normalize_optional(submission.event);
2707        submission.level = normalize_optional(submission.level);
2708        submission.fingerprint = normalize_optional(submission.fingerprint);
2709        submission.excerpt = submission
2710            .excerpt
2711            .into_iter()
2712            .map(|line| line.trim_end().to_string())
2713            .filter(|line| !line.is_empty())
2714            .take(50)
2715            .collect();
2716
2717        let config = self.bug_monitor_config().await;
2718        let repo = submission
2719            .repo
2720            .clone()
2721            .or(config.repo.clone())
2722            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
2723        if !is_valid_owner_repo_slug(&repo) {
2724            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
2725        }
2726
2727        let title = submission.title.clone().unwrap_or_else(|| {
2728            if let Some(event) = submission.event.as_ref() {
2729                format!("Failure detected in {event}")
2730            } else if let Some(component) = submission.component.as_ref() {
2731                format!("Failure detected in {component}")
2732            } else if let Some(process) = submission.process.as_ref() {
2733                format!("Failure detected in {process}")
2734            } else if let Some(source) = submission.source.as_ref() {
2735                format!("Failure report from {source}")
2736            } else {
2737                "Failure report".to_string()
2738            }
2739        });
2740
2741        let mut detail_lines = Vec::new();
2742        if let Some(source) = submission.source.as_ref() {
2743            detail_lines.push(format!("source: {source}"));
2744        }
2745        if let Some(file_name) = submission.file_name.as_ref() {
2746            detail_lines.push(format!("file: {file_name}"));
2747        }
2748        if let Some(level) = submission.level.as_ref() {
2749            detail_lines.push(format!("level: {level}"));
2750        }
2751        if let Some(process) = submission.process.as_ref() {
2752            detail_lines.push(format!("process: {process}"));
2753        }
2754        if let Some(component) = submission.component.as_ref() {
2755            detail_lines.push(format!("component: {component}"));
2756        }
2757        if let Some(event) = submission.event.as_ref() {
2758            detail_lines.push(format!("event: {event}"));
2759        }
2760        if let Some(run_id) = submission.run_id.as_ref() {
2761            detail_lines.push(format!("run_id: {run_id}"));
2762        }
2763        if let Some(session_id) = submission.session_id.as_ref() {
2764            detail_lines.push(format!("session_id: {session_id}"));
2765        }
2766        if let Some(correlation_id) = submission.correlation_id.as_ref() {
2767            detail_lines.push(format!("correlation_id: {correlation_id}"));
2768        }
2769        if let Some(detail) = submission.detail.as_ref() {
2770            detail_lines.push(String::new());
2771            detail_lines.push(detail.clone());
2772        }
2773        if !submission.excerpt.is_empty() {
2774            if !detail_lines.is_empty() {
2775                detail_lines.push(String::new());
2776            }
2777            detail_lines.push("excerpt:".to_string());
2778            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
2779        }
2780        let detail = if detail_lines.is_empty() {
2781            None
2782        } else {
2783            Some(detail_lines.join("\n"))
2784        };
2785
2786        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
2787            compute_fingerprint(&[
2788                repo.as_str(),
2789                title.as_str(),
2790                detail.as_deref().unwrap_or(""),
2791                submission.source.as_deref().unwrap_or(""),
2792                submission.run_id.as_deref().unwrap_or(""),
2793                submission.session_id.as_deref().unwrap_or(""),
2794                submission.correlation_id.as_deref().unwrap_or(""),
2795            ])
2796        });
2797
2798        let mut drafts = self.bug_monitor_drafts.write().await;
2799        if let Some(existing) = drafts
2800            .values()
2801            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
2802            .cloned()
2803        {
2804            return Ok(existing);
2805        }
2806
2807        let draft = BugMonitorDraftRecord {
2808            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
2809            fingerprint,
2810            repo,
2811            status: if config.require_approval_for_new_issues {
2812                "approval_required".to_string()
2813            } else {
2814                "draft_ready".to_string()
2815            },
2816            created_at_ms: now_ms(),
2817            triage_run_id: None,
2818            issue_number: None,
2819            title: Some(title),
2820            detail,
2821            github_status: None,
2822            github_issue_url: None,
2823            github_comment_url: None,
2824            github_posted_at_ms: None,
2825            matched_issue_number: None,
2826            matched_issue_state: None,
2827            evidence_digest: None,
2828            last_post_error: None,
2829        };
2830        drafts.insert(draft.draft_id.clone(), draft.clone());
2831        drop(drafts);
2832        self.persist_bug_monitor_drafts().await?;
2833        Ok(draft)
2834    }
2835
2836    pub async fn update_bug_monitor_draft_status(
2837        &self,
2838        draft_id: &str,
2839        next_status: &str,
2840        reason: Option<&str>,
2841    ) -> anyhow::Result<BugMonitorDraftRecord> {
2842        let normalized_status = next_status.trim().to_ascii_lowercase();
2843        if normalized_status != "draft_ready" && normalized_status != "denied" {
2844            anyhow::bail!("unsupported Bug Monitor draft status");
2845        }
2846
2847        let mut drafts = self.bug_monitor_drafts.write().await;
2848        let Some(draft) = drafts.get_mut(draft_id) else {
2849            anyhow::bail!("Bug Monitor draft not found");
2850        };
2851        if !draft.status.eq_ignore_ascii_case("approval_required") {
2852            anyhow::bail!("Bug Monitor draft is not waiting for approval");
2853        }
2854        draft.status = normalized_status.clone();
2855        if let Some(reason) = reason
2856            .map(|value| value.trim())
2857            .filter(|value| !value.is_empty())
2858        {
2859            let next_detail = if let Some(detail) = draft.detail.as_ref() {
2860                format!("{detail}\n\noperator_note: {reason}")
2861            } else {
2862                format!("operator_note: {reason}")
2863            };
2864            draft.detail = Some(next_detail);
2865        }
2866        let updated = draft.clone();
2867        drop(drafts);
2868        self.persist_bug_monitor_drafts().await?;
2869
2870        let event_name = if normalized_status == "draft_ready" {
2871            "bug_monitor.draft.approved"
2872        } else {
2873            "bug_monitor.draft.denied"
2874        };
2875        self.event_bus.publish(EngineEvent::new(
2876            event_name,
2877            serde_json::json!({
2878                "draft_id": updated.draft_id,
2879                "repo": updated.repo,
2880                "status": updated.status,
2881                "reason": reason,
2882            }),
2883        ));
2884        Ok(updated)
2885    }
2886
2887    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
2888        let required_capabilities = vec![
2889            "github.list_issues".to_string(),
2890            "github.get_issue".to_string(),
2891            "github.create_issue".to_string(),
2892            "github.comment_on_issue".to_string(),
2893        ];
2894        let config = self.bug_monitor_config().await;
2895        let drafts = self.bug_monitor_drafts.read().await;
2896        let incidents = self.bug_monitor_incidents.read().await;
2897        let posts = self.bug_monitor_posts.read().await;
2898        let total_incidents = incidents.len();
2899        let pending_incidents = incidents
2900            .values()
2901            .filter(|row| {
2902                matches!(
2903                    row.status.as_str(),
2904                    "queued"
2905                        | "draft_created"
2906                        | "triage_queued"
2907                        | "analysis_queued"
2908                        | "triage_pending"
2909                        | "issue_draft_pending"
2910                )
2911            })
2912            .count();
2913        let pending_drafts = drafts
2914            .values()
2915            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
2916            .count();
2917        let pending_posts = posts
2918            .values()
2919            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
2920            .count();
2921        let last_activity_at_ms = drafts
2922            .values()
2923            .map(|row| row.created_at_ms)
2924            .chain(posts.values().map(|row| row.updated_at_ms))
2925            .max();
2926        drop(drafts);
2927        drop(incidents);
2928        drop(posts);
2929        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
2930        runtime.paused = config.paused;
2931        runtime.total_incidents = total_incidents;
2932        runtime.pending_incidents = pending_incidents;
2933        runtime.pending_posts = pending_posts;
2934
2935        let mut status = BugMonitorStatus {
2936            config: config.clone(),
2937            runtime,
2938            pending_drafts,
2939            pending_posts,
2940            last_activity_at_ms,
2941            ..BugMonitorStatus::default()
2942        };
2943        let repo_valid = config
2944            .repo
2945            .as_ref()
2946            .map(|repo| is_valid_owner_repo_slug(repo))
2947            .unwrap_or(false);
2948        let servers = self.mcp.list().await;
2949        let selected_server = config
2950            .mcp_server
2951            .as_ref()
2952            .and_then(|name| servers.get(name))
2953            .cloned();
2954        let provider_catalog = self.providers.list().await;
2955        let selected_model = config
2956            .model_policy
2957            .as_ref()
2958            .and_then(|policy| policy.get("default_model"))
2959            .and_then(parse_model_spec);
2960        let selected_model_ready = selected_model
2961            .as_ref()
2962            .map(|spec| provider_catalog_has_model(&provider_catalog, spec))
2963            .unwrap_or(false);
2964        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
2965            self.mcp.server_tools(server_name).await
2966        } else {
2967            Vec::new()
2968        };
2969        let discovered_tools = self
2970            .capability_resolver
2971            .discover_from_runtime(selected_server_tools, Vec::new())
2972            .await;
2973        status.discovered_mcp_tools = discovered_tools
2974            .iter()
2975            .map(|row| row.tool_name.clone())
2976            .collect();
2977        let discovered_providers = discovered_tools
2978            .iter()
2979            .map(|row| row.provider.to_ascii_lowercase())
2980            .collect::<std::collections::HashSet<_>>();
2981        let provider_preference = match config.provider_preference {
2982            BugMonitorProviderPreference::OfficialGithub => {
2983                vec![
2984                    "mcp".to_string(),
2985                    "composio".to_string(),
2986                    "arcade".to_string(),
2987                ]
2988            }
2989            BugMonitorProviderPreference::Composio => {
2990                vec![
2991                    "composio".to_string(),
2992                    "mcp".to_string(),
2993                    "arcade".to_string(),
2994                ]
2995            }
2996            BugMonitorProviderPreference::Arcade => {
2997                vec![
2998                    "arcade".to_string(),
2999                    "mcp".to_string(),
3000                    "composio".to_string(),
3001                ]
3002            }
3003            BugMonitorProviderPreference::Auto => {
3004                vec![
3005                    "mcp".to_string(),
3006                    "composio".to_string(),
3007                    "arcade".to_string(),
3008                ]
3009            }
3010        };
3011        let capability_resolution = self
3012            .capability_resolver
3013            .resolve(
3014                crate::capability_resolver::CapabilityResolveInput {
3015                    workflow_id: Some("bug_monitor".to_string()),
3016                    required_capabilities: required_capabilities.clone(),
3017                    optional_capabilities: Vec::new(),
3018                    provider_preference,
3019                    available_tools: discovered_tools,
3020                },
3021                Vec::new(),
3022            )
3023            .await
3024            .ok();
3025        let bindings_file = self.capability_resolver.list_bindings().await.ok();
3026        if let Some(bindings) = bindings_file.as_ref() {
3027            status.binding_source_version = bindings.builtin_version.clone();
3028            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
3029            status.selected_server_binding_candidates = bindings
3030                .bindings
3031                .iter()
3032                .filter(|binding| required_capabilities.contains(&binding.capability_id))
3033                .filter(|binding| {
3034                    discovered_providers.is_empty()
3035                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
3036                })
3037                .map(|binding| {
3038                    let binding_key = format!(
3039                        "{}::{}",
3040                        binding.capability_id,
3041                        binding.tool_name.to_ascii_lowercase()
3042                    );
3043                    let matched = capability_resolution
3044                        .as_ref()
3045                        .map(|resolution| {
3046                            resolution.resolved.iter().any(|row| {
3047                                row.capability_id == binding.capability_id
3048                                    && format!(
3049                                        "{}::{}",
3050                                        row.capability_id,
3051                                        row.tool_name.to_ascii_lowercase()
3052                                    ) == binding_key
3053                            })
3054                        })
3055                        .unwrap_or(false);
3056                    BugMonitorBindingCandidate {
3057                        capability_id: binding.capability_id.clone(),
3058                        binding_tool_name: binding.tool_name.clone(),
3059                        aliases: binding.tool_name_aliases.clone(),
3060                        matched,
3061                    }
3062                })
3063                .collect();
3064            status.selected_server_binding_candidates.sort_by(|a, b| {
3065                a.capability_id
3066                    .cmp(&b.capability_id)
3067                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
3068            });
3069        }
3070        let capability_ready = |capability_id: &str| -> bool {
3071            capability_resolution
3072                .as_ref()
3073                .map(|resolved| {
3074                    resolved
3075                        .resolved
3076                        .iter()
3077                        .any(|row| row.capability_id == capability_id)
3078                })
3079                .unwrap_or(false)
3080        };
3081        if let Some(resolution) = capability_resolution.as_ref() {
3082            status.missing_required_capabilities = resolution.missing_required.clone();
3083            status.resolved_capabilities = resolution
3084                .resolved
3085                .iter()
3086                .map(|row| BugMonitorCapabilityMatch {
3087                    capability_id: row.capability_id.clone(),
3088                    provider: row.provider.clone(),
3089                    tool_name: row.tool_name.clone(),
3090                    binding_index: row.binding_index,
3091                })
3092                .collect();
3093        } else {
3094            status.missing_required_capabilities = required_capabilities.clone();
3095        }
3096        status.required_capabilities = BugMonitorCapabilityReadiness {
3097            github_list_issues: capability_ready("github.list_issues"),
3098            github_get_issue: capability_ready("github.get_issue"),
3099            github_create_issue: capability_ready("github.create_issue"),
3100            github_comment_on_issue: capability_ready("github.comment_on_issue"),
3101        };
3102        status.selected_model = selected_model;
3103        status.readiness = BugMonitorReadiness {
3104            config_valid: repo_valid
3105                && selected_server.is_some()
3106                && status.required_capabilities.github_list_issues
3107                && status.required_capabilities.github_get_issue
3108                && status.required_capabilities.github_create_issue
3109                && status.required_capabilities.github_comment_on_issue
3110                && selected_model_ready,
3111            repo_valid,
3112            mcp_server_present: selected_server.is_some(),
3113            mcp_connected: selected_server
3114                .as_ref()
3115                .map(|row| row.connected)
3116                .unwrap_or(false),
3117            github_read_ready: status.required_capabilities.github_list_issues
3118                && status.required_capabilities.github_get_issue,
3119            github_write_ready: status.required_capabilities.github_create_issue
3120                && status.required_capabilities.github_comment_on_issue,
3121            selected_model_ready,
3122            ingest_ready: config.enabled && !config.paused && repo_valid,
3123            publish_ready: config.enabled
3124                && !config.paused
3125                && repo_valid
3126                && selected_server
3127                    .as_ref()
3128                    .map(|row| row.connected)
3129                    .unwrap_or(false)
3130                && status.required_capabilities.github_list_issues
3131                && status.required_capabilities.github_get_issue
3132                && status.required_capabilities.github_create_issue
3133                && status.required_capabilities.github_comment_on_issue
3134                && selected_model_ready,
3135            runtime_ready: config.enabled
3136                && !config.paused
3137                && repo_valid
3138                && selected_server
3139                    .as_ref()
3140                    .map(|row| row.connected)
3141                    .unwrap_or(false)
3142                && status.required_capabilities.github_list_issues
3143                && status.required_capabilities.github_get_issue
3144                && status.required_capabilities.github_create_issue
3145                && status.required_capabilities.github_comment_on_issue
3146                && selected_model_ready,
3147        };
3148        if config.enabled {
3149            if config.paused {
3150                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
3151            } else if !repo_valid {
3152                status.last_error = Some("Target repo is missing or invalid.".to_string());
3153            } else if selected_server.is_none() {
3154                status.last_error = Some("Selected MCP server is missing.".to_string());
3155            } else if !status.readiness.mcp_connected {
3156                status.last_error = Some("Selected MCP server is disconnected.".to_string());
3157            } else if !selected_model_ready {
3158                status.last_error = Some(
3159                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
3160                        .to_string(),
3161                );
3162            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
3163                let missing = if status.missing_required_capabilities.is_empty() {
3164                    "unknown".to_string()
3165                } else {
3166                    status.missing_required_capabilities.join(", ")
3167                };
3168                status.last_error = Some(format!(
3169                    "Selected MCP server is missing required GitHub capabilities: {missing}"
3170                ));
3171            }
3172        }
3173        status.runtime.monitoring_active = status.readiness.ingest_ready;
3174        status
3175    }
3176
3177    pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
3178        if !self.workflow_runs_path.exists() {
3179            return Ok(());
3180        }
3181        let raw = fs::read_to_string(&self.workflow_runs_path).await?;
3182        let parsed =
3183            serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
3184                .unwrap_or_default();
3185        *self.workflow_runs.write().await = parsed;
3186        Ok(())
3187    }
3188
3189    pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
3190        if let Some(parent) = self.workflow_runs_path.parent() {
3191            fs::create_dir_all(parent).await?;
3192        }
3193        let payload = {
3194            let guard = self.workflow_runs.read().await;
3195            serde_json::to_string_pretty(&*guard)?
3196        };
3197        fs::write(&self.workflow_runs_path, payload).await?;
3198        Ok(())
3199    }
3200
3201    pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3202        if !self.workflow_hook_overrides_path.exists() {
3203            return Ok(());
3204        }
3205        let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
3206        let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
3207            .unwrap_or_default();
3208        *self.workflow_hook_overrides.write().await = parsed;
3209        Ok(())
3210    }
3211
3212    pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3213        if let Some(parent) = self.workflow_hook_overrides_path.parent() {
3214            fs::create_dir_all(parent).await?;
3215        }
3216        let payload = {
3217            let guard = self.workflow_hook_overrides.read().await;
3218            serde_json::to_string_pretty(&*guard)?
3219        };
3220        fs::write(&self.workflow_hook_overrides_path, payload).await?;
3221        Ok(())
3222    }
3223
3224    pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
3225        let mut sources = Vec::new();
3226        sources.push(WorkflowLoadSource {
3227            root: resolve_builtin_workflows_dir(),
3228            kind: WorkflowSourceKind::BuiltIn,
3229            pack_id: None,
3230        });
3231
3232        let workspace_root = self.workspace_index.snapshot().await.root;
3233        sources.push(WorkflowLoadSource {
3234            root: PathBuf::from(workspace_root).join(".tandem"),
3235            kind: WorkflowSourceKind::Workspace,
3236            pack_id: None,
3237        });
3238
3239        if let Ok(packs) = self.pack_manager.list().await {
3240            for pack in packs {
3241                sources.push(WorkflowLoadSource {
3242                    root: PathBuf::from(pack.install_path),
3243                    kind: WorkflowSourceKind::Pack,
3244                    pack_id: Some(pack.pack_id),
3245                });
3246            }
3247        }
3248
3249        let mut registry = load_workflow_registry(&sources)?;
3250        let overrides = self.workflow_hook_overrides.read().await.clone();
3251        for hook in &mut registry.hooks {
3252            if let Some(enabled) = overrides.get(&hook.binding_id) {
3253                hook.enabled = *enabled;
3254            }
3255        }
3256        for workflow in registry.workflows.values_mut() {
3257            workflow.hooks = registry
3258                .hooks
3259                .iter()
3260                .filter(|hook| hook.workflow_id == workflow.workflow_id)
3261                .cloned()
3262                .collect();
3263        }
3264        let messages = validate_workflow_registry(&registry);
3265        *self.workflows.write().await = registry;
3266        Ok(messages)
3267    }
3268
3269    pub async fn workflow_registry(&self) -> WorkflowRegistry {
3270        self.workflows.read().await.clone()
3271    }
3272
3273    pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
3274        let mut rows = self
3275            .workflows
3276            .read()
3277            .await
3278            .workflows
3279            .values()
3280            .cloned()
3281            .collect::<Vec<_>>();
3282        rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
3283        rows
3284    }
3285
3286    pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
3287        self.workflows
3288            .read()
3289            .await
3290            .workflows
3291            .get(workflow_id)
3292            .cloned()
3293    }
3294
3295    pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
3296        let mut rows = self
3297            .workflows
3298            .read()
3299            .await
3300            .hooks
3301            .iter()
3302            .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
3303            .cloned()
3304            .collect::<Vec<_>>();
3305        rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
3306        rows
3307    }
3308
3309    pub async fn set_workflow_hook_enabled(
3310        &self,
3311        binding_id: &str,
3312        enabled: bool,
3313    ) -> anyhow::Result<Option<WorkflowHookBinding>> {
3314        self.workflow_hook_overrides
3315            .write()
3316            .await
3317            .insert(binding_id.to_string(), enabled);
3318        self.persist_workflow_hook_overrides().await?;
3319        let _ = self.reload_workflows().await?;
3320        Ok(self
3321            .workflows
3322            .read()
3323            .await
3324            .hooks
3325            .iter()
3326            .find(|hook| hook.binding_id == binding_id)
3327            .cloned())
3328    }
3329
3330    pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
3331        self.workflow_runs
3332            .write()
3333            .await
3334            .insert(run.run_id.clone(), run);
3335        self.persist_workflow_runs().await
3336    }
3337
3338    pub async fn update_workflow_run(
3339        &self,
3340        run_id: &str,
3341        update: impl FnOnce(&mut WorkflowRunRecord),
3342    ) -> Option<WorkflowRunRecord> {
3343        let mut guard = self.workflow_runs.write().await;
3344        let row = guard.get_mut(run_id)?;
3345        update(row);
3346        row.updated_at_ms = now_ms();
3347        if matches!(
3348            row.status,
3349            WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
3350        ) {
3351            row.finished_at_ms.get_or_insert_with(now_ms);
3352        }
3353        let out = row.clone();
3354        drop(guard);
3355        let _ = self.persist_workflow_runs().await;
3356        Some(out)
3357    }
3358
3359    pub async fn list_workflow_runs(
3360        &self,
3361        workflow_id: Option<&str>,
3362        limit: usize,
3363    ) -> Vec<WorkflowRunRecord> {
3364        let mut rows = self
3365            .workflow_runs
3366            .read()
3367            .await
3368            .values()
3369            .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
3370            .cloned()
3371            .collect::<Vec<_>>();
3372        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3373        rows.truncate(limit.clamp(1, 500));
3374        rows
3375    }
3376
3377    pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
3378        self.workflow_runs.read().await.get(run_id).cloned()
3379    }
3380
3381    pub async fn put_automation_v2(
3382        &self,
3383        mut automation: AutomationV2Spec,
3384    ) -> anyhow::Result<AutomationV2Spec> {
3385        if automation.automation_id.trim().is_empty() {
3386            anyhow::bail!("automation_id is required");
3387        }
3388        for agent in &mut automation.agents {
3389            if agent.display_name.trim().is_empty() {
3390                agent.display_name = auto_generated_agent_name(&agent.agent_id);
3391            }
3392            agent.tool_policy.allowlist =
3393                normalize_allowed_tools(agent.tool_policy.allowlist.clone());
3394            agent.tool_policy.denylist =
3395                normalize_allowed_tools(agent.tool_policy.denylist.clone());
3396            agent.mcp_policy.allowed_servers =
3397                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
3398            agent.mcp_policy.allowed_tools = agent
3399                .mcp_policy
3400                .allowed_tools
3401                .take()
3402                .map(normalize_allowed_tools);
3403        }
3404        let now = now_ms();
3405        if automation.created_at_ms == 0 {
3406            automation.created_at_ms = now;
3407        }
3408        automation.updated_at_ms = now;
3409        if automation.next_fire_at_ms.is_none() {
3410            automation.next_fire_at_ms =
3411                automation_schedule_next_fire_at_ms(&automation.schedule, now);
3412        }
3413        self.automations_v2
3414            .write()
3415            .await
3416            .insert(automation.automation_id.clone(), automation.clone());
3417        self.persist_automations_v2().await?;
3418        self.verify_automation_v2_persisted(&automation.automation_id, true)
3419            .await?;
3420        Ok(automation)
3421    }
3422
3423    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
3424        self.automations_v2.read().await.get(automation_id).cloned()
3425    }
3426
3427    pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
3428        self.workflow_plans
3429            .write()
3430            .await
3431            .insert(plan.plan_id.clone(), plan);
3432    }
3433
3434    pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
3435        self.workflow_plans.read().await.get(plan_id).cloned()
3436    }
3437
3438    pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
3439        self.workflow_plan_drafts
3440            .write()
3441            .await
3442            .insert(draft.current_plan.plan_id.clone(), draft.clone());
3443        self.put_workflow_plan(draft.current_plan).await;
3444    }
3445
3446    pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
3447        self.workflow_plan_drafts.read().await.get(plan_id).cloned()
3448    }
3449
3450    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3451        let mut rows = self
3452            .automations_v2
3453            .read()
3454            .await
3455            .values()
3456            .cloned()
3457            .collect::<Vec<_>>();
3458        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3459        rows
3460    }
3461
3462    pub async fn delete_automation_v2(
3463        &self,
3464        automation_id: &str,
3465    ) -> anyhow::Result<Option<AutomationV2Spec>> {
3466        let removed = self.automations_v2.write().await.remove(automation_id);
3467        self.persist_automations_v2().await?;
3468        self.verify_automation_v2_persisted(automation_id, false)
3469            .await?;
3470        Ok(removed)
3471    }
3472
3473    pub async fn create_automation_v2_run(
3474        &self,
3475        automation: &AutomationV2Spec,
3476        trigger_type: &str,
3477    ) -> anyhow::Result<AutomationV2RunRecord> {
3478        let now = now_ms();
3479        let pending_nodes = automation
3480            .flow
3481            .nodes
3482            .iter()
3483            .map(|n| n.node_id.clone())
3484            .collect::<Vec<_>>();
3485        let run = AutomationV2RunRecord {
3486            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
3487            automation_id: automation.automation_id.clone(),
3488            trigger_type: trigger_type.to_string(),
3489            status: AutomationRunStatus::Queued,
3490            created_at_ms: now,
3491            updated_at_ms: now,
3492            started_at_ms: None,
3493            finished_at_ms: None,
3494            active_session_ids: Vec::new(),
3495            active_instance_ids: Vec::new(),
3496            checkpoint: AutomationRunCheckpoint {
3497                completed_nodes: Vec::new(),
3498                pending_nodes,
3499                node_outputs: std::collections::HashMap::new(),
3500                node_attempts: std::collections::HashMap::new(),
3501            },
3502            automation_snapshot: Some(automation.clone()),
3503            pause_reason: None,
3504            resume_reason: None,
3505            detail: None,
3506            prompt_tokens: 0,
3507            completion_tokens: 0,
3508            total_tokens: 0,
3509            estimated_cost_usd: 0.0,
3510        };
3511        self.automation_v2_runs
3512            .write()
3513            .await
3514            .insert(run.run_id.clone(), run.clone());
3515        self.persist_automation_v2_runs().await?;
3516        Ok(run)
3517    }
3518
3519    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
3520        self.automation_v2_runs.read().await.get(run_id).cloned()
3521    }
3522
3523    pub async fn list_automation_v2_runs(
3524        &self,
3525        automation_id: Option<&str>,
3526        limit: usize,
3527    ) -> Vec<AutomationV2RunRecord> {
3528        let mut rows = self
3529            .automation_v2_runs
3530            .read()
3531            .await
3532            .values()
3533            .filter(|row| {
3534                if let Some(id) = automation_id {
3535                    row.automation_id == id
3536                } else {
3537                    true
3538                }
3539            })
3540            .cloned()
3541            .collect::<Vec<_>>();
3542        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3543        rows.truncate(limit.clamp(1, 500));
3544        rows
3545    }
3546
3547    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
3548        let mut guard = self.automation_v2_runs.write().await;
3549        let run_id = guard
3550            .values()
3551            .filter(|row| row.status == AutomationRunStatus::Queued)
3552            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
3553            .map(|row| row.run_id.clone())?;
3554        let now = now_ms();
3555        let run = guard.get_mut(&run_id)?;
3556        run.status = AutomationRunStatus::Running;
3557        run.updated_at_ms = now;
3558        run.started_at_ms.get_or_insert(now);
3559        let claimed = run.clone();
3560        drop(guard);
3561        let _ = self.persist_automation_v2_runs().await;
3562        Some(claimed)
3563    }
3564
3565    pub async fn update_automation_v2_run(
3566        &self,
3567        run_id: &str,
3568        update: impl FnOnce(&mut AutomationV2RunRecord),
3569    ) -> Option<AutomationV2RunRecord> {
3570        let mut guard = self.automation_v2_runs.write().await;
3571        let run = guard.get_mut(run_id)?;
3572        update(run);
3573        run.updated_at_ms = now_ms();
3574        if matches!(
3575            run.status,
3576            AutomationRunStatus::Completed
3577                | AutomationRunStatus::Failed
3578                | AutomationRunStatus::Cancelled
3579        ) {
3580            run.finished_at_ms.get_or_insert_with(now_ms);
3581        }
3582        let out = run.clone();
3583        drop(guard);
3584        let _ = self.persist_automation_v2_runs().await;
3585        Some(out)
3586    }
3587
3588    pub async fn add_automation_v2_session(
3589        &self,
3590        run_id: &str,
3591        session_id: &str,
3592    ) -> Option<AutomationV2RunRecord> {
3593        let updated = self
3594            .update_automation_v2_run(run_id, |row| {
3595                if !row.active_session_ids.iter().any(|id| id == session_id) {
3596                    row.active_session_ids.push(session_id.to_string());
3597                }
3598            })
3599            .await;
3600        self.automation_v2_session_runs
3601            .write()
3602            .await
3603            .insert(session_id.to_string(), run_id.to_string());
3604        updated
3605    }
3606
3607    pub async fn clear_automation_v2_session(
3608        &self,
3609        run_id: &str,
3610        session_id: &str,
3611    ) -> Option<AutomationV2RunRecord> {
3612        self.automation_v2_session_runs
3613            .write()
3614            .await
3615            .remove(session_id);
3616        self.update_automation_v2_run(run_id, |row| {
3617            row.active_session_ids.retain(|id| id != session_id);
3618        })
3619        .await
3620    }
3621
3622    pub async fn apply_provider_usage_to_runs(
3623        &self,
3624        session_id: &str,
3625        prompt_tokens: u64,
3626        completion_tokens: u64,
3627        total_tokens: u64,
3628    ) {
3629        if let Some(policy) = self.routine_session_policy(session_id).await {
3630            let rate = self.token_cost_per_1k_usd.max(0.0);
3631            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3632            let mut guard = self.routine_runs.write().await;
3633            if let Some(run) = guard.get_mut(&policy.run_id) {
3634                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3635                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3636                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3637                run.estimated_cost_usd += delta_cost;
3638                run.updated_at_ms = now_ms();
3639            }
3640            drop(guard);
3641            let _ = self.persist_routine_runs().await;
3642        }
3643
3644        let maybe_v2_run_id = self
3645            .automation_v2_session_runs
3646            .read()
3647            .await
3648            .get(session_id)
3649            .cloned();
3650        if let Some(run_id) = maybe_v2_run_id {
3651            let rate = self.token_cost_per_1k_usd.max(0.0);
3652            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3653            let mut guard = self.automation_v2_runs.write().await;
3654            if let Some(run) = guard.get_mut(&run_id) {
3655                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3656                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3657                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3658                run.estimated_cost_usd += delta_cost;
3659                run.updated_at_ms = now_ms();
3660            }
3661            drop(guard);
3662            let _ = self.persist_automation_v2_runs().await;
3663        }
3664    }
3665
3666    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
3667        let mut fired = Vec::new();
3668        let mut guard = self.automations_v2.write().await;
3669        for automation in guard.values_mut() {
3670            if automation.status != AutomationV2Status::Active {
3671                continue;
3672            }
3673            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
3674                automation.next_fire_at_ms =
3675                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3676                continue;
3677            };
3678            if now_ms < next_fire_at_ms {
3679                continue;
3680            }
3681            let run_count =
3682                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
3683            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3684            automation.next_fire_at_ms = next;
3685            automation.last_fired_at_ms = Some(now_ms);
3686            for _ in 0..run_count {
3687                fired.push(automation.automation_id.clone());
3688            }
3689        }
3690        drop(guard);
3691        let _ = self.persist_automations_v2().await;
3692        fired
3693    }
3694}
3695
3696async fn build_channels_config(
3697    state: &AppState,
3698    channels: &ChannelsConfigFile,
3699) -> Option<ChannelsConfig> {
3700    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
3701        return None;
3702    }
3703    Some(ChannelsConfig {
3704        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
3705            bot_token: cfg.bot_token,
3706            allowed_users: cfg.allowed_users,
3707            mention_only: cfg.mention_only,
3708            style_profile: cfg.style_profile,
3709        }),
3710        discord: channels.discord.clone().map(|cfg| DiscordConfig {
3711            bot_token: cfg.bot_token,
3712            guild_id: cfg.guild_id,
3713            allowed_users: cfg.allowed_users,
3714            mention_only: cfg.mention_only,
3715        }),
3716        slack: channels.slack.clone().map(|cfg| SlackConfig {
3717            bot_token: cfg.bot_token,
3718            channel_id: cfg.channel_id,
3719            allowed_users: cfg.allowed_users,
3720            mention_only: cfg.mention_only,
3721        }),
3722        server_base_url: state.server_base_url(),
3723        api_token: state.api_token().await.unwrap_or_default(),
3724        tool_policy: channels.tool_policy.clone(),
3725    })
3726}
3727
/// Normalizes a web UI route prefix.
///
/// Surrounding whitespace is removed, a leading `/` is added when missing and
/// all trailing `/` characters are stripped. If nothing remains after that
/// (blank input, `"/"`, or a slash-only input such as `"//"`), the default
/// prefix `"/admin"` is returned, so the result is never empty.
fn normalize_web_ui_prefix(prefix: &str) -> String {
    let trimmed = prefix.trim();
    let with_leading = if trimmed.starts_with('/') {
        trimmed.to_string()
    } else {
        format!("/{trimmed}")
    };
    let normalized = with_leading.trim_end_matches('/');
    // Slash-only input collapses to an empty string here; fall back to the
    // default instead of returning an empty prefix.
    if normalized.is_empty() {
        return "/admin".to_string();
    }
    normalized.to_string()
}
3740
/// Returns the default web UI route prefix, `"/admin"`.
fn default_web_ui_prefix() -> String {
    String::from("/admin")
}
3744
/// Returns a single-entry list containing the wildcard `"*"`.
fn default_allow_all() -> Vec<String> {
    let wildcard = String::from("*");
    vec![wildcard]
}
3748
/// Default for Discord's mention-only setting: enabled.
fn default_discord_mention_only() -> bool {
    true
}
3752
3753fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
3754    normalize_non_empty_list(raw)
3755}
3756
/// Trims every entry, drops entries that are empty after trimming, and removes
/// duplicates while keeping the first occurrence of each value in its original
/// position.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for a value that is already present.
        .filter(|entry| already_seen.insert(entry.clone()))
        .collect()
}
3771
/// Reads the run staleness threshold in milliseconds from `TANDEM_RUN_STALE_MS`.
///
/// Falls back to 120 000 when the variable is unset or not a valid `u64`; the
/// result is always clamped to the range 30 000..=600 000.
fn resolve_run_stale_ms() -> u64 {
    let configured = match std::env::var("TANDEM_RUN_STALE_MS") {
        Ok(raw) => raw.trim().parse::<u64>().ok(),
        Err(_) => None,
    };
    let stale_ms = configured.unwrap_or(120_000);
    stale_ms.clamp(30_000, 600_000)
}
3779
/// Reads the per-1000-token cost in USD from `TANDEM_TOKEN_COST_PER_1K_USD`.
///
/// Returns `0.0` when the variable is unset, unparsable, negative, or not a
/// finite number. `f64` parsing accepts spellings such as `inf` and `nan`;
/// those are rejected here so an infinite rate can never reach cost
/// accumulation.
fn resolve_token_cost_per_1k_usd() -> f64 {
    std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
        .ok()
        .and_then(|v| v.trim().parse::<f64>().ok())
        // Treat inf / -inf / NaN like any other invalid value.
        .filter(|rate| rate.is_finite())
        .unwrap_or(0.0)
        .max(0.0)
}
3787
/// Returns `true`; a named default for boolean settings.
fn default_true() -> bool {
    true
}
3791
/// Reads a boolean flag from the environment variable `key`.
///
/// Returns `default` when the variable cannot be read. When it is set, the
/// result is `true` only for `1`, `true`, `yes` or `on` (case-insensitive,
/// surrounding whitespace ignored); any other value yields `false`,
/// regardless of `default`.
fn parse_bool_env(key: &str, default: bool) -> bool {
    match std::env::var(key) {
        Ok(raw) => {
            let normalized = raw.trim().to_ascii_lowercase();
            ["1", "true", "yes", "on"].contains(&normalized.as_str())
        }
        Err(_) => default,
    }
}
3803
3804fn resolve_bug_monitor_env_config() -> BugMonitorConfig {
3805    fn env_value(new_name: &str, legacy_name: &str) -> Option<String> {
3806        std::env::var(new_name)
3807            .ok()
3808            .or_else(|| std::env::var(legacy_name).ok())
3809            .map(|v| v.trim().to_string())
3810            .filter(|v| !v.is_empty())
3811    }
3812
3813    fn env_bool(new_name: &str, legacy_name: &str, default: bool) -> bool {
3814        env_value(new_name, legacy_name)
3815            .map(|value| parse_bool_like(&value, default))
3816            .unwrap_or(default)
3817    }
3818
3819    fn parse_bool_like(value: &str, default: bool) -> bool {
3820        match value.trim().to_ascii_lowercase().as_str() {
3821            "1" | "true" | "yes" | "on" => true,
3822            "0" | "false" | "no" | "off" => false,
3823            _ => default,
3824        }
3825    }
3826
3827    let provider_preference = match env_value(
3828        "TANDEM_BUG_MONITOR_PROVIDER_PREFERENCE",
3829        "TANDEM_FAILURE_REPORTER_PROVIDER_PREFERENCE",
3830    )
3831    .unwrap_or_default()
3832    .trim()
3833    .to_ascii_lowercase()
3834    .as_str()
3835    {
3836        "official_github" | "official-github" | "github" => {
3837            BugMonitorProviderPreference::OfficialGithub
3838        }
3839        "composio" => BugMonitorProviderPreference::Composio,
3840        "arcade" => BugMonitorProviderPreference::Arcade,
3841        _ => BugMonitorProviderPreference::Auto,
3842    };
3843    let provider_id = env_value(
3844        "TANDEM_BUG_MONITOR_PROVIDER_ID",
3845        "TANDEM_FAILURE_REPORTER_PROVIDER_ID",
3846    );
3847    let model_id = env_value(
3848        "TANDEM_BUG_MONITOR_MODEL_ID",
3849        "TANDEM_FAILURE_REPORTER_MODEL_ID",
3850    );
3851    let model_policy = match (provider_id, model_id) {
3852        (Some(provider_id), Some(model_id)) => Some(json!({
3853            "default_model": {
3854                "provider_id": provider_id,
3855                "model_id": model_id,
3856            }
3857        })),
3858        _ => None,
3859    };
3860    BugMonitorConfig {
3861        enabled: env_bool(
3862            "TANDEM_BUG_MONITOR_ENABLED",
3863            "TANDEM_FAILURE_REPORTER_ENABLED",
3864            false,
3865        ),
3866        paused: env_bool(
3867            "TANDEM_BUG_MONITOR_PAUSED",
3868            "TANDEM_FAILURE_REPORTER_PAUSED",
3869            false,
3870        ),
3871        workspace_root: env_value(
3872            "TANDEM_BUG_MONITOR_WORKSPACE_ROOT",
3873            "TANDEM_FAILURE_REPORTER_WORKSPACE_ROOT",
3874        ),
3875        repo: env_value("TANDEM_BUG_MONITOR_REPO", "TANDEM_FAILURE_REPORTER_REPO"),
3876        mcp_server: env_value(
3877            "TANDEM_BUG_MONITOR_MCP_SERVER",
3878            "TANDEM_FAILURE_REPORTER_MCP_SERVER",
3879        ),
3880        provider_preference,
3881        model_policy,
3882        auto_create_new_issues: env_bool(
3883            "TANDEM_BUG_MONITOR_AUTO_CREATE_NEW_ISSUES",
3884            "TANDEM_FAILURE_REPORTER_AUTO_CREATE_NEW_ISSUES",
3885            true,
3886        ),
3887        require_approval_for_new_issues: env_bool(
3888            "TANDEM_BUG_MONITOR_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
3889            "TANDEM_FAILURE_REPORTER_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
3890            false,
3891        ),
3892        auto_comment_on_matched_open_issues: env_bool(
3893            "TANDEM_BUG_MONITOR_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
3894            "TANDEM_FAILURE_REPORTER_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
3895            true,
3896        ),
3897        label_mode: BugMonitorLabelMode::ReporterOnly,
3898        updated_at_ms: 0,
3899    }
3900}
3901
/// Checks that `value` has the shape `owner/repo`.
///
/// After trimming surrounding whitespace the value must contain exactly one
/// `/`, must not start or end with `/`, and both sides of the `/` must be
/// non-blank.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    if slug.is_empty() || slug.starts_with('/') || slug.ends_with('/') {
        return false;
    }
    match slug.split_once('/') {
        Some((owner, repo)) => {
            // A second slash would mean more than two path segments.
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
3916
3917fn resolve_shared_resources_path() -> PathBuf {
3918    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3919        let trimmed = dir.trim();
3920        if !trimmed.is_empty() {
3921            return PathBuf::from(trimmed).join("shared_resources.json");
3922        }
3923    }
3924    default_state_dir().join("shared_resources.json")
3925}
3926
3927fn resolve_routines_path() -> PathBuf {
3928    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3929        let trimmed = dir.trim();
3930        if !trimmed.is_empty() {
3931            return PathBuf::from(trimmed).join("routines.json");
3932        }
3933    }
3934    default_state_dir().join("routines.json")
3935}
3936
3937fn resolve_routine_history_path() -> PathBuf {
3938    if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
3939        let trimmed = root.trim();
3940        if !trimmed.is_empty() {
3941            return PathBuf::from(trimmed).join("routine_history.json");
3942        }
3943    }
3944    default_state_dir().join("routine_history.json")
3945}
3946
3947fn resolve_routine_runs_path() -> PathBuf {
3948    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
3949        let trimmed = root.trim();
3950        if !trimmed.is_empty() {
3951            return PathBuf::from(trimmed).join("routine_runs.json");
3952        }
3953    }
3954    default_state_dir().join("routine_runs.json")
3955}
3956
3957fn resolve_automations_v2_path() -> PathBuf {
3958    resolve_canonical_data_file_path("automations_v2.json")
3959}
3960
3961fn legacy_automations_v2_path() -> Option<PathBuf> {
3962    resolve_legacy_root_file_path("automations_v2.json")
3963        .filter(|path| path != &resolve_automations_v2_path())
3964}
3965
3966fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
3967    let mut candidates = vec![active_path.clone()];
3968    if let Some(legacy_path) = legacy_automations_v2_path() {
3969        if !candidates.contains(&legacy_path) {
3970            candidates.push(legacy_path);
3971        }
3972    }
3973    let default_path = default_state_dir().join("automations_v2.json");
3974    if !candidates.contains(&default_path) {
3975        candidates.push(default_path);
3976    }
3977    candidates
3978}
3979
3980fn resolve_automation_v2_runs_path() -> PathBuf {
3981    resolve_canonical_data_file_path("automation_v2_runs.json")
3982}
3983
3984fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
3985    resolve_legacy_root_file_path("automation_v2_runs.json")
3986        .filter(|path| path != &resolve_automation_v2_runs_path())
3987}
3988
3989fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
3990    let mut candidates = vec![active_path.clone()];
3991    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
3992        if !candidates.contains(&legacy_path) {
3993            candidates.push(legacy_path);
3994        }
3995    }
3996    let default_path = default_state_dir().join("automation_v2_runs.json");
3997    if !candidates.contains(&default_path) {
3998        candidates.push(default_path);
3999    }
4000    candidates
4001}
4002
4003fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4004    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4005        .unwrap_or_default()
4006}
4007
4008fn parse_automation_v2_runs_file(
4009    raw: &str,
4010) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4011    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4012        .unwrap_or_default()
4013}
4014
4015fn resolve_canonical_data_file_path(file_name: &str) -> PathBuf {
4016    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4017        let trimmed = root.trim();
4018        if !trimmed.is_empty() {
4019            let base = PathBuf::from(trimmed);
4020            return if path_is_data_dir(&base) {
4021                base.join(file_name)
4022            } else {
4023                base.join("data").join(file_name)
4024            };
4025        }
4026    }
4027    default_state_dir().join(file_name)
4028}
4029
4030fn resolve_legacy_root_file_path(file_name: &str) -> Option<PathBuf> {
4031    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4032        let trimmed = root.trim();
4033        if !trimmed.is_empty() {
4034            let base = PathBuf::from(trimmed);
4035            if !path_is_data_dir(&base) {
4036                return Some(base.join(file_name));
4037            }
4038        }
4039    }
4040    resolve_shared_paths()
4041        .ok()
4042        .map(|paths| paths.canonical_root.join(file_name))
4043}
4044
/// Returns `true` when the final component of `path` is `data`, compared
/// ASCII case-insensitively. Paths without a final component, or whose final
/// component is not valid UTF-8, yield `false`.
fn path_is_data_dir(path: &std::path::Path) -> bool {
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name.eq_ignore_ascii_case("data"),
        None => false,
    }
}
4051
4052fn resolve_workflow_runs_path() -> PathBuf {
4053    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4054        let trimmed = root.trim();
4055        if !trimmed.is_empty() {
4056            return PathBuf::from(trimmed).join("workflow_runs.json");
4057        }
4058    }
4059    default_state_dir().join("workflow_runs.json")
4060}
4061
4062fn resolve_bug_monitor_config_path() -> PathBuf {
4063    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4064        let trimmed = root.trim();
4065        if !trimmed.is_empty() {
4066            return PathBuf::from(trimmed).join("bug_monitor_config.json");
4067        }
4068    }
4069    default_state_dir().join("bug_monitor_config.json")
4070}
4071
4072fn resolve_bug_monitor_drafts_path() -> PathBuf {
4073    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4074        let trimmed = root.trim();
4075        if !trimmed.is_empty() {
4076            return PathBuf::from(trimmed).join("bug_monitor_drafts.json");
4077        }
4078    }
4079    default_state_dir().join("bug_monitor_drafts.json")
4080}
4081
4082fn resolve_bug_monitor_incidents_path() -> PathBuf {
4083    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4084        let trimmed = root.trim();
4085        if !trimmed.is_empty() {
4086            return PathBuf::from(trimmed).join("bug_monitor_incidents.json");
4087        }
4088    }
4089    default_state_dir().join("bug_monitor_incidents.json")
4090}
4091
4092fn resolve_bug_monitor_posts_path() -> PathBuf {
4093    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4094        let trimmed = root.trim();
4095        if !trimmed.is_empty() {
4096            return PathBuf::from(trimmed).join("bug_monitor_posts.json");
4097        }
4098    }
4099    default_state_dir().join("bug_monitor_posts.json")
4100}
4101
4102fn legacy_failure_reporter_path(file_name: &str) -> PathBuf {
4103    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4104        let trimmed = root.trim();
4105        if !trimmed.is_empty() {
4106            return PathBuf::from(trimmed).join(file_name);
4107        }
4108    }
4109    default_state_dir().join(file_name)
4110}
4111
4112fn resolve_workflow_hook_overrides_path() -> PathBuf {
4113    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4114        let trimmed = root.trim();
4115        if !trimmed.is_empty() {
4116            return PathBuf::from(trimmed).join("workflow_hook_overrides.json");
4117        }
4118    }
4119    default_state_dir().join("workflow_hook_overrides.json")
4120}
4121
4122fn resolve_builtin_workflows_dir() -> PathBuf {
4123    if let Ok(root) = std::env::var("TANDEM_BUILTIN_WORKFLOW_DIR") {
4124        let trimmed = root.trim();
4125        if !trimmed.is_empty() {
4126            return PathBuf::from(trimmed);
4127        }
4128    }
4129    default_state_dir().join("builtin_workflows")
4130}
4131
4132fn resolve_agent_team_audit_path() -> PathBuf {
4133    if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
4134        let trimmed = base.trim();
4135        if !trimmed.is_empty() {
4136            return PathBuf::from(trimmed)
4137                .join("agent-team")
4138                .join("audit.log.jsonl");
4139        }
4140    }
4141    default_state_dir()
4142        .join("agent-team")
4143        .join("audit.log.jsonl")
4144}
4145
4146fn default_state_dir() -> PathBuf {
4147    if let Ok(paths) = resolve_shared_paths() {
4148        return paths.engine_state_dir;
4149    }
4150    if let Some(data_dir) = dirs::data_dir() {
4151        return data_dir.join("tandem").join("data");
4152    }
4153    dirs::home_dir()
4154        .map(|home| home.join(".tandem").join("data"))
4155        .unwrap_or_else(|| PathBuf::from(".tandem"))
4156}
4157
/// Returns the backup path that sits next to `path`: same directory, file name
/// suffixed with `.bak`.
///
/// The file name is handled as an `OsString`, so names that are not valid UTF-8 keep
/// their own bytes instead of collapsing to a shared `state.json.bak`. When `path` has
/// no final component at all, `state.json.bak` is appended to it.
fn sibling_backup_path(path: &PathBuf) -> PathBuf {
    let mut backup_name = path
        .file_name()
        .map(std::ffi::OsStr::to_os_string)
        .unwrap_or_else(|| std::ffi::OsString::from("state.json"));
    backup_name.push(".bak");
    path.with_file_name(backup_name)
}
4166
/// Returns the temporary path that sits next to `path`: same directory, file name
/// suffixed with `.tmp`.
///
/// The file name is handled as an `OsString`, so names that are not valid UTF-8 keep
/// their own bytes instead of collapsing to a shared `state.json.tmp`. When `path` has
/// no final component at all, `state.json.tmp` is appended to it.
fn sibling_tmp_path(path: &PathBuf) -> PathBuf {
    let mut tmp_name = path
        .file_name()
        .map(std::ffi::OsStr::to_os_string)
        .unwrap_or_else(|| std::ffi::OsString::from("state.json"));
    tmp_name.push(".tmp");
    path.with_file_name(tmp_name)
}
4175
4176fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4177    match schedule {
4178        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4179        RoutineSchedule::Cron { .. } => None,
4180    }
4181}
4182
4183fn parse_timezone(timezone: &str) -> Option<Tz> {
4184    timezone.trim().parse::<Tz>().ok()
4185}
4186
4187fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
4188    let tz = parse_timezone(timezone)?;
4189    let schedule = Schedule::from_str(expression).ok()?;
4190    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
4191    let local_from = from_dt.with_timezone(&tz);
4192    let next = schedule.after(&local_from).next()?;
4193    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
4194}
4195
4196fn compute_next_schedule_fire_at_ms(
4197    schedule: &RoutineSchedule,
4198    timezone: &str,
4199    from_ms: u64,
4200) -> Option<u64> {
4201    let _ = parse_timezone(timezone)?;
4202    match schedule {
4203        RoutineSchedule::IntervalSeconds { seconds } => {
4204            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4205        }
4206        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4207    }
4208}
4209
4210fn compute_misfire_plan_for_schedule(
4211    now_ms: u64,
4212    next_fire_at_ms: u64,
4213    schedule: &RoutineSchedule,
4214    timezone: &str,
4215    policy: &RoutineMisfirePolicy,
4216) -> (u32, u64) {
4217    match schedule {
4218        RoutineSchedule::IntervalSeconds { .. } => {
4219            let Some(interval_ms) = routine_interval_ms(schedule) else {
4220                return (0, next_fire_at_ms);
4221            };
4222            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
4223        }
4224        RoutineSchedule::Cron { expression } => {
4225            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
4226                .unwrap_or_else(|| now_ms.saturating_add(60_000));
4227            match policy {
4228                RoutineMisfirePolicy::Skip => (0, aligned_next),
4229                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4230                RoutineMisfirePolicy::CatchUp { max_runs } => {
4231                    let mut count = 0u32;
4232                    let mut cursor = next_fire_at_ms;
4233                    while cursor <= now_ms && count < *max_runs {
4234                        count = count.saturating_add(1);
4235                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
4236                            break;
4237                        };
4238                        if next <= cursor {
4239                            break;
4240                        }
4241                        cursor = next;
4242                    }
4243                    (count, aligned_next)
4244                }
4245            }
4246        }
4247    }
4248}
4249
4250fn compute_misfire_plan(
4251    now_ms: u64,
4252    next_fire_at_ms: u64,
4253    interval_ms: u64,
4254    policy: &RoutineMisfirePolicy,
4255) -> (u32, u64) {
4256    if now_ms < next_fire_at_ms || interval_ms == 0 {
4257        return (0, next_fire_at_ms);
4258    }
4259    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4260    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4261    match policy {
4262        RoutineMisfirePolicy::Skip => (0, aligned_next),
4263        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4264        RoutineMisfirePolicy::CatchUp { max_runs } => {
4265            let count = missed.min(u64::from(*max_runs)) as u32;
4266            (count, aligned_next)
4267        }
4268    }
4269}
4270
4271fn auto_generated_agent_name(agent_id: &str) -> String {
4272    let names = [
4273        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4274    ];
4275    let digest = Sha256::digest(agent_id.as_bytes());
4276    let idx = usize::from(digest[0]) % names.len();
4277    format!("{}-{:02x}", names[idx], digest[1])
4278}
4279
4280fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4281    match schedule.schedule_type {
4282        AutomationV2ScheduleType::Manual => None,
4283        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4284            seconds: schedule.interval_seconds.unwrap_or(60),
4285        }),
4286        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4287            expression: schedule.cron_expression.clone().unwrap_or_default(),
4288        }),
4289    }
4290}
4291
4292fn automation_schedule_next_fire_at_ms(
4293    schedule: &AutomationV2Schedule,
4294    from_ms: u64,
4295) -> Option<u64> {
4296    let routine_schedule = schedule_from_automation_v2(schedule)?;
4297    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4298}
4299
4300fn automation_schedule_due_count(
4301    schedule: &AutomationV2Schedule,
4302    now_ms: u64,
4303    next_fire_at_ms: u64,
4304) -> u32 {
4305    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
4306        return 0;
4307    };
4308    let (count, _) = compute_misfire_plan_for_schedule(
4309        now_ms,
4310        next_fire_at_ms,
4311        &routine_schedule,
4312        &schedule.timezone,
4313        &schedule.misfire_policy,
4314    );
4315    count.max(1)
4316}
4317
/// Outcome of checking a routine against the external-integration policy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run without further checks.
    Allowed,
    /// The routine must be approved before it runs; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
4324
4325pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4326    let entrypoint = routine.entrypoint.to_ascii_lowercase();
4327    if entrypoint.starts_with("connector.")
4328        || entrypoint.starts_with("integration.")
4329        || entrypoint.contains("external")
4330    {
4331        return true;
4332    }
4333    routine
4334        .args
4335        .get("uses_external_integrations")
4336        .and_then(|v| v.as_bool())
4337        .unwrap_or(false)
4338        || routine
4339            .args
4340            .get("connector_id")
4341            .and_then(|v| v.as_str())
4342            .is_some()
4343}
4344
4345pub fn evaluate_routine_execution_policy(
4346    routine: &RoutineSpec,
4347    trigger_type: &str,
4348) -> RoutineExecutionDecision {
4349    if !routine_uses_external_integrations(routine) {
4350        return RoutineExecutionDecision::Allowed;
4351    }
4352    if !routine.external_integrations_allowed {
4353        return RoutineExecutionDecision::Blocked {
4354            reason: "external integrations are disabled by policy".to_string(),
4355        };
4356    }
4357    if routine.requires_approval {
4358        return RoutineExecutionDecision::RequiresApproval {
4359            reason: format!(
4360                "manual approval required before external side effects ({})",
4361                trigger_type
4362            ),
4363        };
4364    }
4365    RoutineExecutionDecision::Allowed
4366}
4367
/// Validates a shared-resource key (surrounding whitespace is ignored).
///
/// Accepted keys are the literal `swarm.active_tasks`, or any key that starts with
/// `run/`, `mission/`, `project/` or `team/` and contains no empty path segment
/// (`//`). Blank keys are rejected.
fn is_valid_resource_key(key: &str) -> bool {
    const SCOPED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        candidate => {
            let has_known_scope = SCOPED_PREFIXES
                .iter()
                .any(|prefix| candidate.starts_with(prefix));
            has_known_scope && !candidate.contains("//")
        }
    }
}
4385
4386impl Deref for AppState {
4387    type Target = RuntimeState;
4388
4389    fn deref(&self) -> &Self::Target {
4390        self.runtime
4391            .get()
4392            .expect("runtime accessed before startup completion")
4393    }
4394}
4395
/// Prompt-context hook backed by the server's `AppState`.
#[derive(Clone)]
struct ServerPromptContextHook {
    /// Application state the hook reads from when augmenting prompts.
    state: AppState,
}
4400
4401impl ServerPromptContextHook {
4402    fn new(state: AppState) -> Self {
4403        Self { state }
4404    }
4405
4406    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
4407        let paths = resolve_shared_paths().ok()?;
4408        MemoryDatabase::new(&paths.memory_db_path).await.ok()
4409    }
4410
4411    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
4412        let paths = resolve_shared_paths().ok()?;
4413        tandem_memory::MemoryManager::new(&paths.memory_db_path)
4414            .await
4415            .ok()
4416    }
4417
4418    fn hash_query(input: &str) -> String {
4419        let mut hasher = Sha256::new();
4420        hasher.update(input.as_bytes());
4421        format!("{:x}", hasher.finalize())
4422    }
4423
4424    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
4425        let mut out = vec!["<memory_context>".to_string()];
4426        let mut used = 0usize;
4427        for hit in hits {
4428            let text = hit
4429                .record
4430                .content
4431                .split_whitespace()
4432                .take(60)
4433                .collect::<Vec<_>>()
4434                .join(" ");
4435            let line = format!(
4436                "- [{:.3}] {} (source={}, run={})",
4437                hit.score, text, hit.record.source_type, hit.record.run_id
4438            );
4439            used = used.saturating_add(line.len());
4440            if used > 2200 {
4441                break;
4442            }
4443            out.push(line);
4444        }
4445        out.push("</memory_context>".to_string());
4446        out.join("\n")
4447    }
4448
4449    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
4450        chunk
4451            .metadata
4452            .as_ref()
4453            .and_then(|meta| meta.get("source_url"))
4454            .and_then(Value::as_str)
4455            .map(str::trim)
4456            .filter(|v| !v.is_empty())
4457            .map(ToString::to_string)
4458    }
4459
4460    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
4461        if let Some(path) = chunk
4462            .metadata
4463            .as_ref()
4464            .and_then(|meta| meta.get("relative_path"))
4465            .and_then(Value::as_str)
4466            .map(str::trim)
4467            .filter(|v| !v.is_empty())
4468        {
4469            return path.to_string();
4470        }
4471        chunk
4472            .source
4473            .strip_prefix("guide_docs:")
4474            .unwrap_or(chunk.source.as_str())
4475            .to_string()
4476    }
4477
4478    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
4479        let mut out = vec!["<docs_context>".to_string()];
4480        let mut used = 0usize;
4481        for hit in hits {
4482            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
4483            let path = Self::extract_docs_relative_path(&hit.chunk);
4484            let text = hit
4485                .chunk
4486                .content
4487                .split_whitespace()
4488                .take(70)
4489                .collect::<Vec<_>>()
4490                .join(" ");
4491            let line = format!(
4492                "- [{:.3}] {} (doc_path={}, source_url={})",
4493                hit.similarity, text, path, url
4494            );
4495            used = used.saturating_add(line.len());
4496            if used > 2800 {
4497                break;
4498            }
4499            out.push(line);
4500        }
4501        out.push("</docs_context>".to_string());
4502        out.join("\n")
4503    }
4504
4505    async fn search_embedded_docs(
4506        &self,
4507        query: &str,
4508        limit: usize,
4509    ) -> Vec<tandem_memory::types::MemorySearchResult> {
4510        let Some(manager) = self.open_memory_manager().await else {
4511            return Vec::new();
4512        };
4513        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
4514        manager
4515            .search(
4516                query,
4517                Some(MemoryTier::Global),
4518                None,
4519                None,
4520                Some(search_limit),
4521            )
4522            .await
4523            .unwrap_or_default()
4524            .into_iter()
4525            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
4526            .take(limit)
4527            .collect()
4528    }
4529
4530    fn should_skip_memory_injection(query: &str) -> bool {
4531        let trimmed = query.trim();
4532        if trimmed.is_empty() {
4533            return true;
4534        }
4535        let lower = trimmed.to_ascii_lowercase();
4536        let social = [
4537            "hi",
4538            "hello",
4539            "hey",
4540            "thanks",
4541            "thank you",
4542            "ok",
4543            "okay",
4544            "cool",
4545            "nice",
4546            "yo",
4547            "good morning",
4548            "good afternoon",
4549            "good evening",
4550        ];
4551        lower.len() <= 32 && social.contains(&lower.as_str())
4552    }
4553
4554    fn personality_preset_text(preset: &str) -> &'static str {
4555        match preset {
4556            "concise" => {
4557                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
4558            }
4559            "friendly" => {
4560                "Default style: friendly and supportive while staying technically rigorous and concrete."
4561            }
4562            "mentor" => {
4563                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
4564            }
4565            "critical" => {
4566                "Default style: critical and risk-first. Surface failure modes and assumptions early."
4567            }
4568            _ => {
4569                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
4570            }
4571        }
4572    }
4573
4574    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
4575        let allow_agent_override = agent_name
4576            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
4577            .unwrap_or(false);
4578        let legacy_bot_name = config
4579            .get("bot_name")
4580            .and_then(Value::as_str)
4581            .map(str::trim)
4582            .filter(|v| !v.is_empty());
4583        let bot_name = config
4584            .get("identity")
4585            .and_then(|identity| identity.get("bot"))
4586            .and_then(|bot| bot.get("canonical_name"))
4587            .and_then(Value::as_str)
4588            .map(str::trim)
4589            .filter(|v| !v.is_empty())
4590            .or(legacy_bot_name)
4591            .unwrap_or("Tandem");
4592
4593        let default_profile = config
4594            .get("identity")
4595            .and_then(|identity| identity.get("personality"))
4596            .and_then(|personality| personality.get("default"));
4597        let default_preset = default_profile
4598            .and_then(|profile| profile.get("preset"))
4599            .and_then(Value::as_str)
4600            .map(str::trim)
4601            .filter(|v| !v.is_empty())
4602            .unwrap_or("balanced");
4603        let default_custom = default_profile
4604            .and_then(|profile| profile.get("custom_instructions"))
4605            .and_then(Value::as_str)
4606            .map(str::trim)
4607            .filter(|v| !v.is_empty())
4608            .map(ToString::to_string);
4609        let legacy_persona = config
4610            .get("persona")
4611            .and_then(Value::as_str)
4612            .map(str::trim)
4613            .filter(|v| !v.is_empty())
4614            .map(ToString::to_string);
4615
4616        let per_agent_profile = if allow_agent_override {
4617            agent_name.and_then(|name| {
4618                config
4619                    .get("identity")
4620                    .and_then(|identity| identity.get("personality"))
4621                    .and_then(|personality| personality.get("per_agent"))
4622                    .and_then(|per_agent| per_agent.get(name))
4623            })
4624        } else {
4625            None
4626        };
4627        let preset = per_agent_profile
4628            .and_then(|profile| profile.get("preset"))
4629            .and_then(Value::as_str)
4630            .map(str::trim)
4631            .filter(|v| !v.is_empty())
4632            .unwrap_or(default_preset);
4633        let custom = per_agent_profile
4634            .and_then(|profile| profile.get("custom_instructions"))
4635            .and_then(Value::as_str)
4636            .map(str::trim)
4637            .filter(|v| !v.is_empty())
4638            .map(ToString::to_string)
4639            .or(default_custom)
4640            .or(legacy_persona);
4641
4642        let mut lines = vec![
4643            format!("You are {bot_name}, an AI assistant."),
4644            Self::personality_preset_text(preset).to_string(),
4645        ];
4646        if let Some(custom) = custom {
4647            lines.push(format!("Additional personality instructions: {custom}"));
4648        }
4649        Some(lines.join("\n"))
4650    }
4651
4652    fn build_memory_scope_block(
4653        session_id: &str,
4654        project_id: Option<&str>,
4655        workspace_root: Option<&str>,
4656    ) -> String {
4657        let mut lines = vec![
4658            "<memory_scope>".to_string(),
4659            format!("- current_session_id: {}", session_id),
4660        ];
4661        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
4662            lines.push(format!("- current_project_id: {}", project_id));
4663        }
4664        if let Some(workspace_root) = workspace_root
4665            .map(str::trim)
4666            .filter(|value| !value.is_empty())
4667        {
4668            lines.push(format!("- workspace_root: {}", workspace_root));
4669        }
4670        lines.push(
4671            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
4672                .to_string(),
4673        );
4674        lines.push(
4675            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
4676                .to_string(),
4677        );
4678        lines.push(
4679            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
4680                .to_string(),
4681        );
4682        lines.push("</memory_scope>".to_string());
4683        lines.join("\n")
4684    }
4685}
4686
/// Prompt augmentation for provider requests: appends identity, memory-scope and
/// retrieved-context system messages to the outgoing message list.
impl PromptContextHook for ServerPromptContextHook {
    /// Appends system messages to `messages` before they are sent to the provider.
    ///
    /// In order, the returned future:
    /// 1. returns the messages untouched when the app state is not ready or no run is
    ///    registered for `ctx.session_id`;
    /// 2. appends the identity block and, when the session is found in storage, the
    ///    memory-scope block;
    /// 3. takes the most recent `user` message as the retrieval query and stops here
    ///    when it is blank or matches the social-message skip list;
    /// 4. appends a docs-context block and returns when embedded docs match;
    /// 5. otherwise searches global memory and appends a memory-context block when
    ///    there are hits.
    ///
    /// Every exit path returns `Ok`; lookup failures result in fewer appended
    /// messages rather than an error.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone so the future owns its state and can be `'static`.
        let this = self.clone();
        Box::pin(async move {
            // Startup can invoke prompt plumbing before RuntimeState is installed.
            // Never panic from context hooks; fail-open and continue without augmentation.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The retrieval query is the content of the last message with role `user`.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded docs take precedence: when any match, global memory is not searched.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            // A failed search is treated the same as a search with no hits.
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Published for every search, including those with no hits; the event
            // carries a hash of the query rather than the query text.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
4817
4818fn extract_event_session_id(properties: &Value) -> Option<String> {
4819    properties
4820        .get("sessionID")
4821        .or_else(|| properties.get("sessionId"))
4822        .or_else(|| properties.get("id"))
4823        .or_else(|| {
4824            properties
4825                .get("part")
4826                .and_then(|part| part.get("sessionID"))
4827        })
4828        .or_else(|| {
4829            properties
4830                .get("part")
4831                .and_then(|part| part.get("sessionId"))
4832        })
4833        .and_then(|v| v.as_str())
4834        .map(|s| s.to_string())
4835}
4836
4837fn extract_event_run_id(properties: &Value) -> Option<String> {
4838    properties
4839        .get("runID")
4840        .or_else(|| properties.get("run_id"))
4841        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
4842        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
4843        .and_then(|v| v.as_str())
4844        .map(|s| s.to_string())
4845}
4846
4847fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
4848    let part = properties.get("part")?;
4849    let part_type = part
4850        .get("type")
4851        .and_then(|v| v.as_str())
4852        .unwrap_or_default()
4853        .to_ascii_lowercase();
4854    if part_type != "tool" && part_type != "tool-invocation" && part_type != "tool-result" {
4855        return None;
4856    }
4857    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
4858    let message_id = part
4859        .get("messageID")
4860        .or_else(|| part.get("message_id"))
4861        .and_then(|v| v.as_str())?
4862        .to_string();
4863    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
4864    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
4865        if let Some(preview) = properties
4866            .get("toolCallDelta")
4867            .and_then(|delta| delta.get("parsedArgsPreview"))
4868            .cloned()
4869        {
4870            let preview_nonempty = !preview.is_null()
4871                && !preview.as_object().is_some_and(|value| value.is_empty())
4872                && !preview
4873                    .as_str()
4874                    .map(|value| value.trim().is_empty())
4875                    .unwrap_or(false);
4876            if preview_nonempty {
4877                args = preview;
4878            }
4879        }
4880    }
4881    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
4882    {
4883        tracing::info!(
4884            message_id = %message_id,
4885            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
4886            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
4887            has_result = part.get("result").is_some(),
4888            has_error = part.get("error").is_some(),
4889            "persistable write tool part still has empty args"
4890        );
4891    }
4892    let result = part.get("result").cloned().filter(|value| !value.is_null());
4893    let error = part
4894        .get("error")
4895        .and_then(|v| v.as_str())
4896        .map(|value| value.to_string());
4897    Some((
4898        message_id,
4899        MessagePart::ToolInvocation {
4900            tool,
4901            args,
4902            result,
4903            error,
4904        },
4905    ))
4906}
4907
4908fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
4909    let session_id = extract_event_session_id(&event.properties)?;
4910    let run_id = extract_event_run_id(&event.properties);
4911    let key = format!("run/{session_id}/status");
4912
4913    let mut base = serde_json::Map::new();
4914    base.insert("sessionID".to_string(), Value::String(session_id));
4915    if let Some(run_id) = run_id {
4916        base.insert("runID".to_string(), Value::String(run_id));
4917    }
4918
4919    match event.event_type.as_str() {
4920        "session.run.started" => {
4921            base.insert("state".to_string(), Value::String("running".to_string()));
4922            base.insert("phase".to_string(), Value::String("run".to_string()));
4923            base.insert(
4924                "eventType".to_string(),
4925                Value::String("session.run.started".to_string()),
4926            );
4927            Some(StatusIndexUpdate {
4928                key,
4929                value: Value::Object(base),
4930            })
4931        }
4932        "session.run.finished" => {
4933            base.insert("state".to_string(), Value::String("finished".to_string()));
4934            base.insert("phase".to_string(), Value::String("run".to_string()));
4935            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
4936                base.insert("result".to_string(), Value::String(status.to_string()));
4937            }
4938            base.insert(
4939                "eventType".to_string(),
4940                Value::String("session.run.finished".to_string()),
4941            );
4942            Some(StatusIndexUpdate {
4943                key,
4944                value: Value::Object(base),
4945            })
4946        }
4947        "message.part.updated" => {
4948            let part_type = event
4949                .properties
4950                .get("part")
4951                .and_then(|v| v.get("type"))
4952                .and_then(|v| v.as_str())?;
4953            let part_state = event
4954                .properties
4955                .get("part")
4956                .and_then(|v| v.get("state"))
4957                .and_then(|v| v.as_str())
4958                .unwrap_or("");
4959            let (phase, tool_active) = match (part_type, part_state) {
4960                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
4961                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
4962                _ => return None,
4963            };
4964            base.insert("state".to_string(), Value::String("running".to_string()));
4965            base.insert("phase".to_string(), Value::String(phase.to_string()));
4966            base.insert("toolActive".to_string(), Value::Bool(tool_active));
4967            if let Some(tool_name) = event
4968                .properties
4969                .get("part")
4970                .and_then(|v| v.get("tool"))
4971                .and_then(|v| v.as_str())
4972            {
4973                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
4974            }
4975            base.insert(
4976                "eventType".to_string(),
4977                Value::String("message.part.updated".to_string()),
4978            );
4979            Some(StatusIndexUpdate {
4980                key,
4981                value: Value::Object(base),
4982            })
4983        }
4984        _ => None,
4985    }
4986}
4987
4988pub async fn run_session_part_persister(state: AppState) {
4989    if !state.wait_until_ready_or_failed(120, 250).await {
4990        tracing::warn!("session part persister: skipped because runtime did not become ready");
4991        return;
4992    }
4993    let Some(mut rx) = state.event_bus.take_session_part_receiver() else {
4994        tracing::warn!("session part persister: skipped because receiver was already taken");
4995        return;
4996    };
4997    while let Some(event) = rx.recv().await {
4998        if event.event_type != "message.part.updated" {
4999            continue;
5000        }
5001        // Streaming tool-call previews are useful for the live UI, but persistence
5002        // should store the finalized invocation/result events to avoid duplicating
5003        // one tool part per streamed args delta.
5004        if event.properties.get("toolCallDelta").is_some() {
5005            continue;
5006        }
5007        let Some(session_id) = extract_event_session_id(&event.properties) else {
5008            continue;
5009        };
5010        let Some((message_id, part)) = extract_persistable_tool_part(&event.properties) else {
5011            continue;
5012        };
5013        if let Err(error) = state
5014            .storage
5015            .append_message_part(&session_id, &message_id, part)
5016            .await
5017        {
5018            tracing::warn!(
5019                "session part persister failed for session={} message={}: {error:#}",
5020                session_id,
5021                message_id
5022            );
5023        }
5024    }
5025}
5026
5027pub async fn run_status_indexer(state: AppState) {
5028    if !state.wait_until_ready_or_failed(120, 250).await {
5029        tracing::warn!("status indexer: skipped because runtime did not become ready");
5030        return;
5031    }
5032    let mut rx = state.event_bus.subscribe();
5033    loop {
5034        match rx.recv().await {
5035            Ok(event) => {
5036                if let Some(update) = derive_status_index_update(&event) {
5037                    if let Err(error) = state
5038                        .put_shared_resource(
5039                            update.key,
5040                            update.value,
5041                            None,
5042                            "system.status_indexer".to_string(),
5043                            None,
5044                        )
5045                        .await
5046                    {
5047                        tracing::warn!("status indexer failed to persist update: {error:?}");
5048                    }
5049                }
5050            }
5051            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5052            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5053        }
5054    }
5055}
5056
5057pub async fn run_agent_team_supervisor(state: AppState) {
5058    if !state.wait_until_ready_or_failed(120, 250).await {
5059        tracing::warn!("agent team supervisor: skipped because runtime did not become ready");
5060        return;
5061    }
5062    let mut rx = state.event_bus.subscribe();
5063    loop {
5064        match rx.recv().await {
5065            Ok(event) => {
5066                state.agent_teams.handle_engine_event(&state, &event).await;
5067            }
5068            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5069            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5070        }
5071    }
5072}
5073
5074pub async fn run_bug_monitor(state: AppState) {
5075    if !state.wait_until_ready_or_failed(120, 250).await {
5076        tracing::warn!("bug monitor: skipped because runtime did not become ready");
5077        return;
5078    }
5079    state
5080        .update_bug_monitor_runtime_status(|runtime| {
5081            runtime.monitoring_active = false;
5082            runtime.last_runtime_error = None;
5083        })
5084        .await;
5085    let mut rx = state.event_bus.subscribe();
5086    loop {
5087        match rx.recv().await {
5088            Ok(event) => {
5089                if !is_bug_monitor_candidate_event(&event) {
5090                    continue;
5091                }
5092                let status = state.bug_monitor_status().await;
5093                if !status.config.enabled || status.config.paused || !status.readiness.repo_valid {
5094                    state
5095                        .update_bug_monitor_runtime_status(|runtime| {
5096                            runtime.monitoring_active = status.config.enabled
5097                                && !status.config.paused
5098                                && status.readiness.repo_valid;
5099                            runtime.paused = status.config.paused;
5100                            runtime.last_runtime_error = status.last_error.clone();
5101                        })
5102                        .await;
5103                    continue;
5104                }
5105                match process_bug_monitor_event(&state, &event, &status.config).await {
5106                    Ok(incident) => {
5107                        state
5108                            .update_bug_monitor_runtime_status(|runtime| {
5109                                runtime.monitoring_active = true;
5110                                runtime.paused = status.config.paused;
5111                                runtime.last_processed_at_ms = Some(now_ms());
5112                                runtime.last_incident_event_type =
5113                                    Some(incident.event_type.clone());
5114                                runtime.last_runtime_error = None;
5115                            })
5116                            .await;
5117                    }
5118                    Err(error) => {
5119                        let detail = truncate_text(&error.to_string(), 500);
5120                        state
5121                            .update_bug_monitor_runtime_status(|runtime| {
5122                                runtime.monitoring_active = true;
5123                                runtime.paused = status.config.paused;
5124                                runtime.last_processed_at_ms = Some(now_ms());
5125                                runtime.last_incident_event_type = Some(event.event_type.clone());
5126                                runtime.last_runtime_error = Some(detail.clone());
5127                            })
5128                            .await;
5129                        state.event_bus.publish(EngineEvent::new(
5130                            "bug_monitor.error",
5131                            serde_json::json!({
5132                                "eventType": event.event_type,
5133                                "detail": detail,
5134                            }),
5135                        ));
5136                    }
5137                }
5138            }
5139            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5140            Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => {
5141                state
5142                    .update_bug_monitor_runtime_status(|runtime| {
5143                        runtime.last_runtime_error =
5144                            Some(format!("Bug monitor lagged and dropped {count} events."));
5145                    })
5146                    .await;
5147            }
5148        }
5149    }
5150}
5151
5152pub async fn run_usage_aggregator(state: AppState) {
5153    if !state.wait_until_ready_or_failed(120, 250).await {
5154        tracing::warn!("usage aggregator: skipped because runtime did not become ready");
5155        return;
5156    }
5157    let mut rx = state.event_bus.subscribe();
5158    loop {
5159        match rx.recv().await {
5160            Ok(event) => {
5161                if event.event_type != "provider.usage" {
5162                    continue;
5163                }
5164                let session_id = event
5165                    .properties
5166                    .get("sessionID")
5167                    .and_then(|v| v.as_str())
5168                    .unwrap_or("");
5169                if session_id.is_empty() {
5170                    continue;
5171                }
5172                let prompt_tokens = event
5173                    .properties
5174                    .get("promptTokens")
5175                    .and_then(|v| v.as_u64())
5176                    .unwrap_or(0);
5177                let completion_tokens = event
5178                    .properties
5179                    .get("completionTokens")
5180                    .and_then(|v| v.as_u64())
5181                    .unwrap_or(0);
5182                let total_tokens = event
5183                    .properties
5184                    .get("totalTokens")
5185                    .and_then(|v| v.as_u64())
5186                    .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
5187                state
5188                    .apply_provider_usage_to_runs(
5189                        session_id,
5190                        prompt_tokens,
5191                        completion_tokens,
5192                        total_tokens,
5193                    )
5194                    .await;
5195            }
5196            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5197            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5198        }
5199    }
5200}
5201
5202fn is_bug_monitor_candidate_event(event: &EngineEvent) -> bool {
5203    if event.event_type.starts_with("bug_monitor.") {
5204        return false;
5205    }
5206    matches!(
5207        event.event_type.as_str(),
5208        "context.task.failed" | "workflow.run.failed" | "routine.run.failed" | "session.error"
5209    )
5210}
5211
5212async fn process_bug_monitor_event(
5213    state: &AppState,
5214    event: &EngineEvent,
5215    config: &BugMonitorConfig,
5216) -> anyhow::Result<BugMonitorIncidentRecord> {
5217    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
5218    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
5219        state,
5220        submission.repo.as_deref().unwrap_or_default(),
5221        submission.fingerprint.as_deref().unwrap_or_default(),
5222        submission.title.as_deref(),
5223        submission.detail.as_deref(),
5224        &submission.excerpt,
5225        3,
5226    )
5227    .await;
5228    let fingerprint = submission
5229        .fingerprint
5230        .clone()
5231        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
5232    let default_workspace_root = state.workspace_index.snapshot().await.root;
5233    let workspace_root = config
5234        .workspace_root
5235        .clone()
5236        .unwrap_or(default_workspace_root);
5237    let now = now_ms();
5238
5239    let existing = state
5240        .bug_monitor_incidents
5241        .read()
5242        .await
5243        .values()
5244        .find(|row| row.fingerprint == fingerprint)
5245        .cloned();
5246
5247    let mut incident = if let Some(mut row) = existing {
5248        row.occurrence_count = row.occurrence_count.saturating_add(1);
5249        row.updated_at_ms = now;
5250        row.last_seen_at_ms = Some(now);
5251        if row.excerpt.is_empty() {
5252            row.excerpt = submission.excerpt.clone();
5253        }
5254        row
5255    } else {
5256        BugMonitorIncidentRecord {
5257            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
5258            fingerprint: fingerprint.clone(),
5259            event_type: event.event_type.clone(),
5260            status: "queued".to_string(),
5261            repo: submission.repo.clone().unwrap_or_default(),
5262            workspace_root,
5263            title: submission
5264                .title
5265                .clone()
5266                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
5267            detail: submission.detail.clone(),
5268            excerpt: submission.excerpt.clone(),
5269            source: submission.source.clone(),
5270            run_id: submission.run_id.clone(),
5271            session_id: submission.session_id.clone(),
5272            correlation_id: submission.correlation_id.clone(),
5273            component: submission.component.clone(),
5274            level: submission.level.clone(),
5275            occurrence_count: 1,
5276            created_at_ms: now,
5277            updated_at_ms: now,
5278            last_seen_at_ms: Some(now),
5279            draft_id: None,
5280            triage_run_id: None,
5281            last_error: None,
5282            duplicate_summary: None,
5283            duplicate_matches: None,
5284            event_payload: Some(event.properties.clone()),
5285        }
5286    };
5287    state.put_bug_monitor_incident(incident.clone()).await?;
5288
5289    if !duplicate_matches.is_empty() {
5290        incident.status = "duplicate_suppressed".to_string();
5291        let duplicate_summary =
5292            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
5293        incident.duplicate_summary = Some(duplicate_summary.clone());
5294        incident.duplicate_matches = Some(duplicate_matches.clone());
5295        incident.updated_at_ms = now_ms();
5296        state.put_bug_monitor_incident(incident.clone()).await?;
5297        state.event_bus.publish(EngineEvent::new(
5298            "bug_monitor.incident.duplicate_suppressed",
5299            serde_json::json!({
5300                "incident_id": incident.incident_id,
5301                "fingerprint": incident.fingerprint,
5302                "eventType": incident.event_type,
5303                "status": incident.status,
5304                "duplicate_summary": duplicate_summary,
5305                "duplicate_matches": duplicate_matches,
5306            }),
5307        ));
5308        return Ok(incident);
5309    }
5310
5311    let draft = match state.submit_bug_monitor_draft(submission).await {
5312        Ok(draft) => draft,
5313        Err(error) => {
5314            incident.status = "draft_failed".to_string();
5315            incident.last_error = Some(truncate_text(&error.to_string(), 500));
5316            incident.updated_at_ms = now_ms();
5317            state.put_bug_monitor_incident(incident.clone()).await?;
5318            state.event_bus.publish(EngineEvent::new(
5319                "bug_monitor.incident.detected",
5320                serde_json::json!({
5321                    "incident_id": incident.incident_id,
5322                    "fingerprint": incident.fingerprint,
5323                    "eventType": incident.event_type,
5324                    "draft_id": incident.draft_id,
5325                    "triage_run_id": incident.triage_run_id,
5326                    "status": incident.status,
5327                    "detail": incident.last_error,
5328                }),
5329            ));
5330            return Ok(incident);
5331        }
5332    };
5333    incident.draft_id = Some(draft.draft_id.clone());
5334    incident.status = "draft_created".to_string();
5335    state.put_bug_monitor_incident(incident.clone()).await?;
5336
5337    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
5338        state.clone(),
5339        &draft.draft_id,
5340        true,
5341    )
5342    .await
5343    {
5344        Ok((updated_draft, _run_id, _deduped)) => {
5345            incident.triage_run_id = updated_draft.triage_run_id.clone();
5346            if incident.triage_run_id.is_some() {
5347                incident.status = "triage_queued".to_string();
5348            }
5349            incident.last_error = None;
5350        }
5351        Err(error) => {
5352            incident.status = "draft_created".to_string();
5353            incident.last_error = Some(truncate_text(&error.to_string(), 500));
5354        }
5355    }
5356
5357    if let Some(draft_id) = incident.draft_id.clone() {
5358        let latest_draft = state
5359            .get_bug_monitor_draft(&draft_id)
5360            .await
5361            .unwrap_or(draft.clone());
5362        match crate::bug_monitor_github::publish_draft(
5363            state,
5364            &draft_id,
5365            Some(&incident.incident_id),
5366            crate::bug_monitor_github::PublishMode::Auto,
5367        )
5368        .await
5369        {
5370            Ok(outcome) => {
5371                incident.status = outcome.action;
5372                incident.last_error = None;
5373            }
5374            Err(error) => {
5375                let detail = truncate_text(&error.to_string(), 500);
5376                incident.last_error = Some(detail.clone());
5377                let mut failed_draft = latest_draft;
5378                failed_draft.status = "github_post_failed".to_string();
5379                failed_draft.github_status = Some("github_post_failed".to_string());
5380                failed_draft.last_post_error = Some(detail.clone());
5381                let evidence_digest = failed_draft.evidence_digest.clone();
5382                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
5383                let _ = crate::bug_monitor_github::record_post_failure(
5384                    state,
5385                    &failed_draft,
5386                    Some(&incident.incident_id),
5387                    "auto_post",
5388                    evidence_digest.as_deref(),
5389                    &detail,
5390                )
5391                .await;
5392            }
5393        }
5394    }
5395
5396    incident.updated_at_ms = now_ms();
5397    state.put_bug_monitor_incident(incident.clone()).await?;
5398    state.event_bus.publish(EngineEvent::new(
5399        "bug_monitor.incident.detected",
5400        serde_json::json!({
5401            "incident_id": incident.incident_id,
5402            "fingerprint": incident.fingerprint,
5403            "eventType": incident.event_type,
5404            "draft_id": incident.draft_id,
5405            "triage_run_id": incident.triage_run_id,
5406            "status": incident.status,
5407        }),
5408    ));
5409    Ok(incident)
5410}
5411
5412async fn build_bug_monitor_submission_from_event(
5413    state: &AppState,
5414    config: &BugMonitorConfig,
5415    event: &EngineEvent,
5416) -> anyhow::Result<BugMonitorSubmission> {
5417    let repo = config
5418        .repo
5419        .clone()
5420        .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
5421    let default_workspace_root = state.workspace_index.snapshot().await.root;
5422    let workspace_root = config
5423        .workspace_root
5424        .clone()
5425        .unwrap_or(default_workspace_root);
5426    let reason = first_string(
5427        &event.properties,
5428        &["reason", "error", "detail", "message", "summary"],
5429    );
5430    let run_id = first_string(&event.properties, &["runID", "run_id"]);
5431    let session_id = first_string(&event.properties, &["sessionID", "session_id"]);
5432    let correlation_id = first_string(
5433        &event.properties,
5434        &["correlationID", "correlation_id", "commandID", "command_id"],
5435    );
5436    let component = first_string(
5437        &event.properties,
5438        &[
5439            "component",
5440            "routineID",
5441            "routine_id",
5442            "workflowID",
5443            "workflow_id",
5444            "task",
5445            "title",
5446        ],
5447    );
5448    let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
5449    if excerpt.is_empty() {
5450        if let Some(reason) = reason.as_ref() {
5451            excerpt.push(reason.clone());
5452        }
5453    }
5454    let serialized = serde_json::to_string(&event.properties).unwrap_or_default();
5455    let fingerprint = sha256_hex(&[
5456        repo.as_str(),
5457        workspace_root.as_str(),
5458        event.event_type.as_str(),
5459        reason.as_deref().unwrap_or(""),
5460        run_id.as_deref().unwrap_or(""),
5461        session_id.as_deref().unwrap_or(""),
5462        correlation_id.as_deref().unwrap_or(""),
5463        component.as_deref().unwrap_or(""),
5464        serialized.as_str(),
5465    ]);
5466    let title = if let Some(component) = component.as_ref() {
5467        format!("{} failure in {}", event.event_type, component)
5468    } else {
5469        format!("{} detected", event.event_type)
5470    };
5471    let mut detail_lines = vec![
5472        format!("event_type: {}", event.event_type),
5473        format!("workspace_root: {}", workspace_root),
5474    ];
5475    if let Some(reason) = reason.as_ref() {
5476        detail_lines.push(format!("reason: {reason}"));
5477    }
5478    if let Some(run_id) = run_id.as_ref() {
5479        detail_lines.push(format!("run_id: {run_id}"));
5480    }
5481    if let Some(session_id) = session_id.as_ref() {
5482        detail_lines.push(format!("session_id: {session_id}"));
5483    }
5484    if let Some(correlation_id) = correlation_id.as_ref() {
5485        detail_lines.push(format!("correlation_id: {correlation_id}"));
5486    }
5487    if let Some(component) = component.as_ref() {
5488        detail_lines.push(format!("component: {component}"));
5489    }
5490    if !serialized.trim().is_empty() {
5491        detail_lines.push(String::new());
5492        detail_lines.push("payload:".to_string());
5493        detail_lines.push(truncate_text(&serialized, 2_000));
5494    }
5495
5496    Ok(BugMonitorSubmission {
5497        repo: Some(repo),
5498        title: Some(title),
5499        detail: Some(detail_lines.join("\n")),
5500        source: Some("tandem_events".to_string()),
5501        run_id,
5502        session_id,
5503        correlation_id,
5504        file_name: None,
5505        process: Some("tandem-engine".to_string()),
5506        component,
5507        event: Some(event.event_type.clone()),
5508        level: Some("error".to_string()),
5509        excerpt,
5510        fingerprint: Some(fingerprint),
5511    })
5512}
5513
5514async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
5515    let mut excerpt = Vec::new();
5516    if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
5517        excerpt.push(reason);
5518    }
5519    if let Some(title) = first_string(properties, &["title", "task"]) {
5520        if !excerpt.iter().any(|row| row == &title) {
5521            excerpt.push(title);
5522        }
5523    }
5524    let logs = state.logs.read().await;
5525    for entry in logs.iter().rev().take(3) {
5526        if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
5527            excerpt.push(truncate_text(message, 240));
5528        }
5529    }
5530    excerpt.truncate(8);
5531    excerpt
5532}
5533
5534fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
5535    for key in keys {
5536        if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
5537            let trimmed = value.trim();
5538            if !trimmed.is_empty() {
5539                return Some(trimmed.to_string());
5540            }
5541        }
5542    }
5543    None
5544}
5545
5546fn sha256_hex(parts: &[&str]) -> String {
5547    let mut hasher = Sha256::new();
5548    for part in parts {
5549        hasher.update(part.as_bytes());
5550        hasher.update([0u8]);
5551    }
5552    format!("{:x}", hasher.finalize())
5553}
5554
5555pub async fn run_routine_scheduler(state: AppState) {
5556    loop {
5557        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5558        let now = now_ms();
5559        let plans = state.evaluate_routine_misfires(now).await;
5560        for plan in plans {
5561            let Some(routine) = state.get_routine(&plan.routine_id).await else {
5562                continue;
5563            };
5564            match evaluate_routine_execution_policy(&routine, "scheduled") {
5565                RoutineExecutionDecision::Allowed => {
5566                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
5567                    let run = state
5568                        .create_routine_run(
5569                            &routine,
5570                            "scheduled",
5571                            plan.run_count,
5572                            RoutineRunStatus::Queued,
5573                            None,
5574                        )
5575                        .await;
5576                    state
5577                        .append_routine_history(RoutineHistoryEvent {
5578                            routine_id: plan.routine_id.clone(),
5579                            trigger_type: "scheduled".to_string(),
5580                            run_count: plan.run_count,
5581                            fired_at_ms: now,
5582                            status: "queued".to_string(),
5583                            detail: None,
5584                        })
5585                        .await;
5586                    state.event_bus.publish(EngineEvent::new(
5587                        "routine.fired",
5588                        serde_json::json!({
5589                            "routineID": plan.routine_id,
5590                            "runID": run.run_id,
5591                            "runCount": plan.run_count,
5592                            "scheduledAtMs": plan.scheduled_at_ms,
5593                            "nextFireAtMs": plan.next_fire_at_ms,
5594                        }),
5595                    ));
5596                    state.event_bus.publish(EngineEvent::new(
5597                        "routine.run.created",
5598                        serde_json::json!({
5599                            "run": run,
5600                        }),
5601                    ));
5602                }
5603                RoutineExecutionDecision::RequiresApproval { reason } => {
5604                    let run = state
5605                        .create_routine_run(
5606                            &routine,
5607                            "scheduled",
5608                            plan.run_count,
5609                            RoutineRunStatus::PendingApproval,
5610                            Some(reason.clone()),
5611                        )
5612                        .await;
5613                    state
5614                        .append_routine_history(RoutineHistoryEvent {
5615                            routine_id: plan.routine_id.clone(),
5616                            trigger_type: "scheduled".to_string(),
5617                            run_count: plan.run_count,
5618                            fired_at_ms: now,
5619                            status: "pending_approval".to_string(),
5620                            detail: Some(reason.clone()),
5621                        })
5622                        .await;
5623                    state.event_bus.publish(EngineEvent::new(
5624                        "routine.approval_required",
5625                        serde_json::json!({
5626                            "routineID": plan.routine_id,
5627                            "runID": run.run_id,
5628                            "runCount": plan.run_count,
5629                            "triggerType": "scheduled",
5630                            "reason": reason,
5631                        }),
5632                    ));
5633                    state.event_bus.publish(EngineEvent::new(
5634                        "routine.run.created",
5635                        serde_json::json!({
5636                            "run": run,
5637                        }),
5638                    ));
5639                }
5640                RoutineExecutionDecision::Blocked { reason } => {
5641                    let run = state
5642                        .create_routine_run(
5643                            &routine,
5644                            "scheduled",
5645                            plan.run_count,
5646                            RoutineRunStatus::BlockedPolicy,
5647                            Some(reason.clone()),
5648                        )
5649                        .await;
5650                    state
5651                        .append_routine_history(RoutineHistoryEvent {
5652                            routine_id: plan.routine_id.clone(),
5653                            trigger_type: "scheduled".to_string(),
5654                            run_count: plan.run_count,
5655                            fired_at_ms: now,
5656                            status: "blocked_policy".to_string(),
5657                            detail: Some(reason.clone()),
5658                        })
5659                        .await;
5660                    state.event_bus.publish(EngineEvent::new(
5661                        "routine.blocked",
5662                        serde_json::json!({
5663                            "routineID": plan.routine_id,
5664                            "runID": run.run_id,
5665                            "runCount": plan.run_count,
5666                            "triggerType": "scheduled",
5667                            "reason": reason,
5668                        }),
5669                    ));
5670                    state.event_bus.publish(EngineEvent::new(
5671                        "routine.run.created",
5672                        serde_json::json!({
5673                            "run": run,
5674                        }),
5675                    ));
5676                }
5677            }
5678        }
5679    }
5680}
5681
5682pub async fn run_routine_executor(state: AppState) {
5683    loop {
5684        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5685        let Some(run) = state.claim_next_queued_routine_run().await else {
5686            continue;
5687        };
5688
5689        state.event_bus.publish(EngineEvent::new(
5690            "routine.run.started",
5691            serde_json::json!({
5692                "runID": run.run_id,
5693                "routineID": run.routine_id,
5694                "triggerType": run.trigger_type,
5695                "startedAtMs": now_ms(),
5696            }),
5697        ));
5698
5699        let workspace_root = state.workspace_index.snapshot().await.root;
5700        let mut session = Session::new(
5701            Some(format!("Routine {}", run.routine_id)),
5702            Some(workspace_root.clone()),
5703        );
5704        let session_id = session.id.clone();
5705        session.workspace_root = Some(workspace_root);
5706
5707        if let Err(error) = state.storage.save_session(session).await {
5708            let detail = format!("failed to create routine session: {error}");
5709            let _ = state
5710                .update_routine_run_status(
5711                    &run.run_id,
5712                    RoutineRunStatus::Failed,
5713                    Some(detail.clone()),
5714                )
5715                .await;
5716            state.event_bus.publish(EngineEvent::new(
5717                "routine.run.failed",
5718                serde_json::json!({
5719                    "runID": run.run_id,
5720                    "routineID": run.routine_id,
5721                    "reason": detail,
5722                }),
5723            ));
5724            continue;
5725        }
5726
5727        state
5728            .set_routine_session_policy(
5729                session_id.clone(),
5730                run.run_id.clone(),
5731                run.routine_id.clone(),
5732                run.allowed_tools.clone(),
5733            )
5734            .await;
5735        state
5736            .add_active_session_id(&run.run_id, session_id.clone())
5737            .await;
5738        state
5739            .engine_loop
5740            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
5741            .await;
5742        state
5743            .engine_loop
5744            .set_session_auto_approve_permissions(&session_id, true)
5745            .await;
5746
5747        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
5748        if let Some(spec) = selected_model.as_ref() {
5749            state.event_bus.publish(EngineEvent::new(
5750                "routine.run.model_selected",
5751                serde_json::json!({
5752                    "runID": run.run_id,
5753                    "routineID": run.routine_id,
5754                    "providerID": spec.provider_id,
5755                    "modelID": spec.model_id,
5756                    "source": model_source,
5757                }),
5758            ));
5759        }
5760
5761        let request = SendMessageRequest {
5762            parts: vec![MessagePartInput::Text {
5763                text: build_routine_prompt(&state, &run).await,
5764            }],
5765            model: selected_model,
5766            agent: None,
5767            tool_mode: None,
5768            tool_allowlist: None,
5769            context_mode: None,
5770            write_required: None,
5771        };
5772
5773        let run_result = state
5774            .engine_loop
5775            .run_prompt_async_with_context(
5776                session_id.clone(),
5777                request,
5778                Some(format!("routine:{}", run.run_id)),
5779            )
5780            .await;
5781
5782        state.clear_routine_session_policy(&session_id).await;
5783        state
5784            .clear_active_session_id(&run.run_id, &session_id)
5785            .await;
5786        state
5787            .engine_loop
5788            .clear_session_allowed_tools(&session_id)
5789            .await;
5790        state
5791            .engine_loop
5792            .clear_session_auto_approve_permissions(&session_id)
5793            .await;
5794
5795        match run_result {
5796            Ok(()) => {
5797                append_configured_output_artifacts(&state, &run).await;
5798                let _ = state
5799                    .update_routine_run_status(
5800                        &run.run_id,
5801                        RoutineRunStatus::Completed,
5802                        Some("routine run completed".to_string()),
5803                    )
5804                    .await;
5805                state.event_bus.publish(EngineEvent::new(
5806                    "routine.run.completed",
5807                    serde_json::json!({
5808                        "runID": run.run_id,
5809                        "routineID": run.routine_id,
5810                        "sessionID": session_id,
5811                        "finishedAtMs": now_ms(),
5812                    }),
5813                ));
5814            }
5815            Err(error) => {
5816                if let Some(latest) = state.get_routine_run(&run.run_id).await {
5817                    if latest.status == RoutineRunStatus::Paused {
5818                        state.event_bus.publish(EngineEvent::new(
5819                            "routine.run.paused",
5820                            serde_json::json!({
5821                                "runID": run.run_id,
5822                                "routineID": run.routine_id,
5823                                "sessionID": session_id,
5824                                "finishedAtMs": now_ms(),
5825                            }),
5826                        ));
5827                        continue;
5828                    }
5829                }
5830                let detail = truncate_text(&error.to_string(), 500);
5831                let _ = state
5832                    .update_routine_run_status(
5833                        &run.run_id,
5834                        RoutineRunStatus::Failed,
5835                        Some(detail.clone()),
5836                    )
5837                    .await;
5838                state.event_bus.publish(EngineEvent::new(
5839                    "routine.run.failed",
5840                    serde_json::json!({
5841                        "runID": run.run_id,
5842                        "routineID": run.routine_id,
5843                        "sessionID": session_id,
5844                        "reason": detail,
5845                        "finishedAtMs": now_ms(),
5846                    }),
5847                ));
5848            }
5849        }
5850    }
5851}
5852
5853pub async fn run_automation_v2_scheduler(state: AppState) {
5854    loop {
5855        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5856        let startup = state.startup_snapshot().await;
5857        if !matches!(startup.status, StartupStatus::Ready) {
5858            continue;
5859        }
5860        let now = now_ms();
5861        let due = state.evaluate_automation_v2_misfires(now).await;
5862        for automation_id in due {
5863            let Some(automation) = state.get_automation_v2(&automation_id).await else {
5864                continue;
5865            };
5866            if let Ok(run) = state
5867                .create_automation_v2_run(&automation, "scheduled")
5868                .await
5869            {
5870                state.event_bus.publish(EngineEvent::new(
5871                    "automation.v2.run.created",
5872                    serde_json::json!({
5873                        "automationID": automation_id,
5874                        "run": run,
5875                        "triggerType": "scheduled",
5876                    }),
5877                ));
5878            }
5879        }
5880    }
5881}
5882
5883fn build_automation_v2_upstream_inputs(
5884    run: &AutomationV2RunRecord,
5885    node: &AutomationFlowNode,
5886) -> anyhow::Result<Vec<Value>> {
5887    let mut inputs = Vec::new();
5888    for input_ref in &node.input_refs {
5889        let Some(output) = run.checkpoint.node_outputs.get(&input_ref.from_step_id) else {
5890            anyhow::bail!(
5891                "missing upstream output for `{}` referenced by node `{}`",
5892                input_ref.from_step_id,
5893                node.node_id
5894            );
5895        };
5896        inputs.push(json!({
5897            "alias": input_ref.alias,
5898            "from_step_id": input_ref.from_step_id,
5899            "output": output,
5900        }));
5901    }
5902    Ok(inputs)
5903}
5904
5905fn render_automation_v2_prompt(
5906    automation: &AutomationV2Spec,
5907    run_id: &str,
5908    node: &AutomationFlowNode,
5909    agent: &AutomationAgentProfile,
5910    upstream_inputs: &[Value],
5911    template_system_prompt: Option<&str>,
5912    standup_report_path: Option<&str>,
5913    memory_project_id: Option<&str>,
5914) -> String {
5915    let contract_kind = node
5916        .output_contract
5917        .as_ref()
5918        .map(|contract| contract.kind.as_str())
5919        .unwrap_or("structured_json");
5920    let mut sections = Vec::new();
5921    if let Some(system_prompt) = template_system_prompt
5922        .map(str::trim)
5923        .filter(|value| !value.is_empty())
5924    {
5925        sections.push(format!("Template system prompt:\n{}", system_prompt));
5926    }
5927    sections.push(format!(
5928        "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}\nOutput contract kind: {}",
5929        automation.automation_id, run_id, node.node_id, agent.display_name, node.objective, contract_kind
5930    ));
5931    let mut prompt = sections.join("\n\n");
5932    if !upstream_inputs.is_empty() {
5933        prompt.push_str("\n\nUpstream Inputs:");
5934        for input in upstream_inputs {
5935            let alias = input
5936                .get("alias")
5937                .and_then(Value::as_str)
5938                .unwrap_or("input");
5939            let from_step_id = input
5940                .get("from_step_id")
5941                .and_then(Value::as_str)
5942                .unwrap_or("unknown");
5943            let output = input.get("output").cloned().unwrap_or(Value::Null);
5944            let rendered =
5945                serde_json::to_string_pretty(&output).unwrap_or_else(|_| output.to_string());
5946            prompt.push_str(&format!(
5947                "\n- {}\n  from_step_id: {}\n  output:\n{}",
5948                alias,
5949                from_step_id,
5950                rendered
5951                    .lines()
5952                    .map(|line| format!("    {}", line))
5953                    .collect::<Vec<_>>()
5954                    .join("\n")
5955            ));
5956        }
5957    }
5958    if node.node_id == "notify_user" || node.objective.to_ascii_lowercase().contains("email") {
5959        prompt.push_str(
5960            "\n\nDelivery rules:\n- Prefer inline email body delivery by default.\n- Only include an email attachment when upstream inputs contain a concrete attachment artifact with a non-empty s3key or upload result.\n- Never send an attachment parameter with an empty or null s3key.\n- If no attachment artifact exists, omit the attachment parameter entirely.",
5961        );
5962    }
5963    if let Some(report_path) = standup_report_path
5964        .map(str::trim)
5965        .filter(|value| !value.is_empty())
5966    {
5967        prompt.push_str(&format!(
5968            "\n\nStandup report path:\n- Write the final markdown report to `{}` relative to the workspace root.\n- Use the `write` tool for the report.\n- The report must remain inside the workspace.",
5969            report_path
5970        ));
5971    }
5972    if let Some(project_id) = memory_project_id
5973        .map(str::trim)
5974        .filter(|value| !value.is_empty())
5975    {
5976        prompt.push_str(&format!(
5977            "\n\nMemory search scope:\n- `memory_search` defaults to the current session, current project, and global memory.\n- Current project_id: `{}`.\n- Use `tier: \"project\"` when you need recall limited to this workspace.\n- Use workspace files via `glob`, `grep`, and `read` when memory is sparse or stale.",
5978            project_id
5979        ));
5980    }
5981    prompt.push_str(
5982        "\n\nReturn a concise completion. If you produce structured content, keep it valid JSON inside the response body.",
5983    );
5984    prompt
5985}
5986
5987fn is_agent_standup_automation(automation: &AutomationV2Spec) -> bool {
5988    automation
5989        .metadata
5990        .as_ref()
5991        .and_then(|value| value.get("feature"))
5992        .and_then(Value::as_str)
5993        .map(|value| value == "agent_standup")
5994        .unwrap_or(false)
5995}
5996
5997fn resolve_standup_report_path_template(automation: &AutomationV2Spec) -> Option<String> {
5998    automation
5999        .metadata
6000        .as_ref()
6001        .and_then(|value| value.get("standup"))
6002        .and_then(|value| value.get("report_path_template"))
6003        .and_then(Value::as_str)
6004        .map(|value| value.trim().to_string())
6005        .filter(|value| !value.is_empty())
6006}
6007
6008fn resolve_standup_report_path_for_run(
6009    automation: &AutomationV2Spec,
6010    started_at_ms: u64,
6011) -> Option<String> {
6012    let template = resolve_standup_report_path_template(automation)?;
6013    if !template.contains("{{date}}") {
6014        return Some(template);
6015    }
6016    let date = chrono::DateTime::<chrono::Utc>::from_timestamp_millis(started_at_ms as i64)
6017        .unwrap_or_else(chrono::Utc::now)
6018        .format("%Y-%m-%d")
6019        .to_string();
6020    Some(template.replace("{{date}}", &date))
6021}
6022
6023fn automation_workspace_project_id(workspace_root: &str) -> String {
6024    tandem_core::workspace_project_id(workspace_root)
6025        .unwrap_or_else(|| "workspace-unknown".to_string())
6026}
6027
6028fn merge_automation_agent_allowlist(
6029    agent: &AutomationAgentProfile,
6030    template: Option<&tandem_orchestrator::AgentTemplate>,
6031) -> Vec<String> {
6032    let mut allowlist = if agent.tool_policy.allowlist.is_empty() {
6033        template
6034            .map(|value| value.capabilities.tool_allowlist.clone())
6035            .unwrap_or_default()
6036    } else {
6037        agent.tool_policy.allowlist.clone()
6038    };
6039    allowlist.sort();
6040    allowlist.dedup();
6041    allowlist
6042}
6043
6044fn resolve_automation_agent_model(
6045    agent: &AutomationAgentProfile,
6046    template: Option<&tandem_orchestrator::AgentTemplate>,
6047) -> Option<ModelSpec> {
6048    if let Some(model) = agent
6049        .model_policy
6050        .as_ref()
6051        .and_then(|policy| policy.get("default_model"))
6052        .and_then(parse_model_spec)
6053    {
6054        return Some(model);
6055    }
6056    template
6057        .and_then(|value| value.default_model.as_ref())
6058        .and_then(parse_model_spec)
6059}
6060
6061fn extract_session_text_output(session: &Session) -> String {
6062    session
6063        .messages
6064        .iter()
6065        .rev()
6066        .find(|message| matches!(message.role, MessageRole::Assistant))
6067        .map(|message| {
6068            message
6069                .parts
6070                .iter()
6071                .filter_map(|part| match part {
6072                    MessagePart::Text { text } | MessagePart::Reasoning { text } => {
6073                        Some(text.as_str())
6074                    }
6075                    MessagePart::ToolInvocation { .. } => None,
6076                })
6077                .collect::<Vec<_>>()
6078                .join("\n")
6079        })
6080        .unwrap_or_default()
6081}
6082
6083fn wrap_automation_node_output(
6084    node: &AutomationFlowNode,
6085    session_id: &str,
6086    session_text: &str,
6087) -> Value {
6088    let contract_kind = node
6089        .output_contract
6090        .as_ref()
6091        .map(|contract| contract.kind.clone())
6092        .unwrap_or_else(|| "structured_json".to_string());
6093    let summary = if session_text.trim().is_empty() {
6094        format!("Node `{}` completed successfully.", node.node_id)
6095    } else {
6096        truncate_text(session_text.trim(), 240)
6097    };
6098    let content = match contract_kind.as_str() {
6099        "report_markdown" | "text_summary" => {
6100            json!({ "text": session_text.trim(), "session_id": session_id })
6101        }
6102        "urls" => json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id }),
6103        "citations" => {
6104            json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id })
6105        }
6106        _ => json!({ "text": session_text.trim(), "session_id": session_id }),
6107    };
6108    json!(AutomationNodeOutput {
6109        contract_kind,
6110        summary,
6111        content,
6112        created_at_ms: now_ms(),
6113        node_id: node.node_id.clone(),
6114    })
6115}
6116
6117fn automation_node_max_attempts(node: &AutomationFlowNode) -> u32 {
6118    node.retry_policy
6119        .as_ref()
6120        .and_then(|value| value.get("max_attempts"))
6121        .and_then(Value::as_u64)
6122        .map(|value| value.clamp(1, 10) as u32)
6123        .unwrap_or(3)
6124}
6125
6126async fn resolve_automation_v2_workspace_root(
6127    state: &AppState,
6128    automation: &AutomationV2Spec,
6129) -> String {
6130    if let Some(workspace_root) = automation
6131        .workspace_root
6132        .as_deref()
6133        .map(str::trim)
6134        .filter(|value| !value.is_empty())
6135        .map(str::to_string)
6136    {
6137        return workspace_root;
6138    }
6139    if let Some(workspace_root) = automation
6140        .metadata
6141        .as_ref()
6142        .and_then(|row| row.get("workspace_root"))
6143        .and_then(Value::as_str)
6144        .map(str::trim)
6145        .filter(|value| !value.is_empty())
6146        .map(str::to_string)
6147    {
6148        return workspace_root;
6149    }
6150    state.workspace_index.snapshot().await.root
6151}
6152
6153async fn execute_automation_v2_node(
6154    state: &AppState,
6155    run_id: &str,
6156    automation: &AutomationV2Spec,
6157    node: &AutomationFlowNode,
6158    agent: &AutomationAgentProfile,
6159) -> anyhow::Result<Value> {
6160    let run = state
6161        .get_automation_v2_run(run_id)
6162        .await
6163        .ok_or_else(|| anyhow::anyhow!("automation run `{}` not found", run_id))?;
6164    let upstream_inputs = build_automation_v2_upstream_inputs(&run, node)?;
6165    let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
6166    let workspace_path = PathBuf::from(&workspace_root);
6167    if !workspace_path.exists() {
6168        anyhow::bail!(
6169            "workspace_root `{}` for automation `{}` does not exist",
6170            workspace_root,
6171            automation.automation_id
6172        );
6173    }
6174    if !workspace_path.is_dir() {
6175        anyhow::bail!(
6176            "workspace_root `{}` for automation `{}` is not a directory",
6177            workspace_root,
6178            automation.automation_id
6179        );
6180    }
6181    let template = if let Some(template_id) = agent.template_id.as_deref().map(str::trim) {
6182        if template_id.is_empty() {
6183            None
6184        } else {
6185            state
6186                .agent_teams
6187                .get_template_for_workspace(&workspace_root, template_id)
6188                .await?
6189                .ok_or_else(|| anyhow::anyhow!("agent template `{}` not found", template_id))
6190                .map(Some)?
6191        }
6192    } else {
6193        None
6194    };
6195    let mut session = Session::new(
6196        Some(format!(
6197            "Automation {} / {}",
6198            automation.automation_id, node.node_id
6199        )),
6200        Some(workspace_root.clone()),
6201    );
6202    let session_id = session.id.clone();
6203    let project_id = automation_workspace_project_id(&workspace_root);
6204    session.project_id = Some(project_id.clone());
6205    session.workspace_root = Some(workspace_root);
6206    state.storage.save_session(session).await?;
6207
6208    state.add_automation_v2_session(run_id, &session_id).await;
6209
6210    let mut allowlist = merge_automation_agent_allowlist(agent, template.as_ref());
6211    if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
6212        allowlist.extend(mcp_tools.clone());
6213    }
6214    state
6215        .engine_loop
6216        .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
6217        .await;
6218    state
6219        .engine_loop
6220        .set_session_auto_approve_permissions(&session_id, true)
6221        .await;
6222
6223    let model = resolve_automation_agent_model(agent, template.as_ref());
6224    let standup_report_path = if is_agent_standup_automation(automation)
6225        && node.node_id == "standup_synthesis"
6226    {
6227        resolve_standup_report_path_for_run(automation, run.started_at_ms.unwrap_or_else(now_ms))
6228    } else {
6229        None
6230    };
6231    let prompt = render_automation_v2_prompt(
6232        automation,
6233        run_id,
6234        node,
6235        agent,
6236        &upstream_inputs,
6237        template
6238            .as_ref()
6239            .and_then(|value| value.system_prompt.as_deref()),
6240        standup_report_path.as_deref(),
6241        if is_agent_standup_automation(automation) {
6242            Some(project_id.as_str())
6243        } else {
6244            None
6245        },
6246    );
6247    let req = SendMessageRequest {
6248        parts: vec![MessagePartInput::Text { text: prompt }],
6249        model,
6250        agent: None,
6251        tool_mode: None,
6252        tool_allowlist: None,
6253        context_mode: None,
6254        write_required: None,
6255    };
6256    let result = state
6257        .engine_loop
6258        .run_prompt_async_with_context(
6259            session_id.clone(),
6260            req,
6261            Some(format!("automation-v2:{run_id}")),
6262        )
6263        .await;
6264
6265    state
6266        .engine_loop
6267        .clear_session_allowed_tools(&session_id)
6268        .await;
6269    state
6270        .engine_loop
6271        .clear_session_auto_approve_permissions(&session_id)
6272        .await;
6273    state.clear_automation_v2_session(run_id, &session_id).await;
6274
6275    result?;
6276    let session = state
6277        .storage
6278        .get_session(&session_id)
6279        .await
6280        .ok_or_else(|| anyhow::anyhow!("automation session `{}` missing after run", session_id))?;
6281    let session_text = extract_session_text_output(&session);
6282    Ok(wrap_automation_node_output(
6283        node,
6284        &session_id,
6285        &session_text,
6286    ))
6287}
6288
6289pub async fn run_automation_v2_executor(state: AppState) {
6290    loop {
6291        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
6292        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
6293            continue;
6294        };
6295        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
6296            let _ = state
6297                .update_automation_v2_run(&run.run_id, |row| {
6298                    row.status = AutomationRunStatus::Failed;
6299                    row.detail = Some("automation not found".to_string());
6300                })
6301                .await;
6302            continue;
6303        };
6304        let max_parallel = automation
6305            .execution
6306            .max_parallel_agents
6307            .unwrap_or(1)
6308            .clamp(1, 16) as usize;
6309
6310        loop {
6311            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
6312                break;
6313            };
6314            if matches!(
6315                latest.status,
6316                AutomationRunStatus::Paused
6317                    | AutomationRunStatus::Pausing
6318                    | AutomationRunStatus::Cancelled
6319                    | AutomationRunStatus::Failed
6320                    | AutomationRunStatus::Completed
6321            ) {
6322                break;
6323            }
6324            if latest.checkpoint.pending_nodes.is_empty() {
6325                let _ = state
6326                    .update_automation_v2_run(&run.run_id, |row| {
6327                        row.status = AutomationRunStatus::Completed;
6328                        row.detail = Some("automation run completed".to_string());
6329                    })
6330                    .await;
6331                break;
6332            }
6333
6334            let completed = latest
6335                .checkpoint
6336                .completed_nodes
6337                .iter()
6338                .cloned()
6339                .collect::<std::collections::HashSet<_>>();
6340            let pending = latest.checkpoint.pending_nodes.clone();
6341            let runnable = pending
6342                .iter()
6343                .filter_map(|node_id| {
6344                    let node = automation
6345                        .flow
6346                        .nodes
6347                        .iter()
6348                        .find(|n| n.node_id == *node_id)?;
6349                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
6350                        Some(node.clone())
6351                    } else {
6352                        None
6353                    }
6354                })
6355                .take(max_parallel)
6356                .collect::<Vec<_>>();
6357
6358            if runnable.is_empty() {
6359                let _ = state
6360                    .update_automation_v2_run(&run.run_id, |row| {
6361                        row.status = AutomationRunStatus::Failed;
6362                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
6363                    })
6364                    .await;
6365                break;
6366            }
6367
6368            let runnable_node_ids = runnable
6369                .iter()
6370                .map(|node| node.node_id.clone())
6371                .collect::<Vec<_>>();
6372            let _ = state
6373                .update_automation_v2_run(&run.run_id, |row| {
6374                    for node_id in &runnable_node_ids {
6375                        let attempts = row
6376                            .checkpoint
6377                            .node_attempts
6378                            .entry(node_id.clone())
6379                            .or_insert(0);
6380                        *attempts += 1;
6381                    }
6382                })
6383                .await;
6384
6385            let tasks = runnable
6386                .iter()
6387                .map(|node| {
6388                    let Some(agent) = automation
6389                        .agents
6390                        .iter()
6391                        .find(|a| a.agent_id == node.agent_id)
6392                        .cloned()
6393                    else {
6394                        return futures::future::ready((
6395                            node.node_id.clone(),
6396                            Err(anyhow::anyhow!("agent not found")),
6397                        ))
6398                        .boxed();
6399                    };
6400                    let state = state.clone();
6401                    let run_id = run.run_id.clone();
6402                    let automation = automation.clone();
6403                    let node = node.clone();
6404                    async move {
6405                        let result =
6406                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
6407                                .await;
6408                        (node.node_id, result)
6409                    }
6410                    .boxed()
6411                })
6412                .collect::<Vec<_>>();
6413            let outcomes = join_all(tasks).await;
6414
6415            let mut terminal_failure = None::<String>;
6416            let latest_attempts = state
6417                .get_automation_v2_run(&run.run_id)
6418                .await
6419                .map(|row| row.checkpoint.node_attempts)
6420                .unwrap_or_default();
6421            for (node_id, result) in outcomes {
6422                match result {
6423                    Ok(output) => {
6424                        let _ = state
6425                            .update_automation_v2_run(&run.run_id, |row| {
6426                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
6427                                if !row
6428                                    .checkpoint
6429                                    .completed_nodes
6430                                    .iter()
6431                                    .any(|id| id == &node_id)
6432                                {
6433                                    row.checkpoint.completed_nodes.push(node_id.clone());
6434                                }
6435                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
6436                            })
6437                            .await;
6438                    }
6439                    Err(error) => {
6440                        let is_paused = state
6441                            .get_automation_v2_run(&run.run_id)
6442                            .await
6443                            .map(|row| row.status == AutomationRunStatus::Paused)
6444                            .unwrap_or(false);
6445                        if is_paused {
6446                            break;
6447                        }
6448                        let detail = truncate_text(&error.to_string(), 500);
6449                        let attempts = latest_attempts.get(&node_id).copied().unwrap_or(1);
6450                        let max_attempts = automation
6451                            .flow
6452                            .nodes
6453                            .iter()
6454                            .find(|row| row.node_id == node_id)
6455                            .map(automation_node_max_attempts)
6456                            .unwrap_or(1);
6457                        if attempts >= max_attempts {
6458                            terminal_failure = Some(format!(
6459                                "node `{}` failed after {}/{} attempts: {}",
6460                                node_id, attempts, max_attempts, detail
6461                            ));
6462                            break;
6463                        }
6464                        let _ = state
6465                            .update_automation_v2_run(&run.run_id, |row| {
6466                                row.detail = Some(format!(
6467                                    "retrying node `{}` after attempt {}/{} failed: {}",
6468                                    node_id, attempts, max_attempts, detail
6469                                ));
6470                            })
6471                            .await;
6472                    }
6473                }
6474            }
6475            if let Some(detail) = terminal_failure {
6476                let _ = state
6477                    .update_automation_v2_run(&run.run_id, |row| {
6478                        row.status = AutomationRunStatus::Failed;
6479                        row.detail = Some(detail);
6480                    })
6481                    .await;
6482                break;
6483            }
6484        }
6485    }
6486}
6487
6488async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
6489    let normalized_entrypoint = run.entrypoint.trim();
6490    let known_tool = state
6491        .tools
6492        .list()
6493        .await
6494        .into_iter()
6495        .any(|schema| schema.name == normalized_entrypoint);
6496    if known_tool {
6497        let args = if run.args.is_object() {
6498            run.args.clone()
6499        } else {
6500            serde_json::json!({})
6501        };
6502        return format!("/tool {} {}", normalized_entrypoint, args);
6503    }
6504
6505    if let Some(objective) = routine_objective_from_args(run) {
6506        return build_routine_mission_prompt(run, &objective);
6507    }
6508
6509    format!(
6510        "Execute routine '{}' using entrypoint '{}' with args: {}",
6511        run.routine_id, run.entrypoint, run.args
6512    )
6513}
6514
6515fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
6516    run.args
6517        .get("prompt")
6518        .and_then(|v| v.as_str())
6519        .map(str::trim)
6520        .filter(|v| !v.is_empty())
6521        .map(ToString::to_string)
6522}
6523
6524fn routine_mode_from_args(args: &Value) -> &str {
6525    args.get("mode")
6526        .and_then(|v| v.as_str())
6527        .map(str::trim)
6528        .filter(|v| !v.is_empty())
6529        .unwrap_or("standalone")
6530}
6531
6532fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
6533    args.get("success_criteria")
6534        .and_then(|v| v.as_array())
6535        .map(|rows| {
6536            rows.iter()
6537                .filter_map(|row| row.as_str())
6538                .map(str::trim)
6539                .filter(|row| !row.is_empty())
6540                .map(ToString::to_string)
6541                .collect::<Vec<_>>()
6542        })
6543        .unwrap_or_default()
6544}
6545
6546fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
6547    let mode = routine_mode_from_args(&run.args);
6548    let success_criteria = routine_success_criteria_from_args(&run.args);
6549    let orchestrator_only_tool_calls = run
6550        .args
6551        .get("orchestrator_only_tool_calls")
6552        .and_then(|v| v.as_bool())
6553        .unwrap_or(false);
6554
6555    let mut lines = vec![
6556        format!("Automation ID: {}", run.routine_id),
6557        format!("Run ID: {}", run.run_id),
6558        format!("Mode: {}", mode),
6559        format!("Mission Objective: {}", objective),
6560    ];
6561
6562    if !success_criteria.is_empty() {
6563        lines.push("Success Criteria:".to_string());
6564        for criterion in success_criteria {
6565            lines.push(format!("- {}", criterion));
6566        }
6567    }
6568
6569    if run.allowed_tools.is_empty() {
6570        lines.push("Allowed Tools: all available by current policy".to_string());
6571    } else {
6572        lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
6573    }
6574
6575    if run.output_targets.is_empty() {
6576        lines.push("Output Targets: none configured".to_string());
6577    } else {
6578        lines.push("Output Targets:".to_string());
6579        for target in &run.output_targets {
6580            lines.push(format!("- {}", target));
6581        }
6582    }
6583
6584    if mode.eq_ignore_ascii_case("orchestrated") {
6585        lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
6586        lines
6587            .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
6588        if orchestrator_only_tool_calls {
6589            lines.push(
6590                "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
6591                    .to_string(),
6592            );
6593        }
6594    } else {
6595        lines.push("Execution Pattern: Standalone mission run".to_string());
6596    }
6597
6598    lines.push(
6599        "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
6600            .to_string(),
6601    );
6602
6603    lines.join("\n")
6604}
6605
/// Shortens `input` to at most `max_len` bytes and appends `...<truncated>`.
///
/// Inputs that already fit within `max_len` bytes are returned unchanged.
/// When `max_len` falls inside a multi-byte UTF-8 character, the cut point is
/// moved back to the previous character boundary so that slicing never panics
/// (a plain `input[..max_len]` would panic on such input).
fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }

    // `max_len < input.len()` here, and index 0 is always a boundary, so this
    // loop terminates without underflow.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }

    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
6614
6615async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
6616    if run.output_targets.is_empty() {
6617        return;
6618    }
6619    for target in &run.output_targets {
6620        let artifact = RoutineRunArtifact {
6621            artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
6622            uri: target.clone(),
6623            kind: "output_target".to_string(),
6624            label: Some("configured output target".to_string()),
6625            created_at_ms: now_ms(),
6626            metadata: Some(serde_json::json!({
6627                "source": "routine.output_targets",
6628                "runID": run.run_id,
6629                "routineID": run.routine_id,
6630            })),
6631        };
6632        let _ = state
6633            .append_routine_run_artifact(&run.run_id, artifact.clone())
6634            .await;
6635        state.event_bus.publish(EngineEvent::new(
6636            "routine.run.artifact_added",
6637            serde_json::json!({
6638                "runID": run.run_id,
6639                "routineID": run.routine_id,
6640                "artifact": artifact,
6641            }),
6642        ));
6643    }
6644}
6645
6646fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
6647    let obj = value.as_object()?;
6648    let provider_id = obj.get("provider_id")?.as_str()?.trim();
6649    let model_id = obj.get("model_id")?.as_str()?.trim();
6650    if provider_id.is_empty() || model_id.is_empty() {
6651        return None;
6652    }
6653    Some(ModelSpec {
6654        provider_id: provider_id.to_string(),
6655        model_id: model_id.to_string(),
6656    })
6657}
6658
6659fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
6660    args.get("model_policy")
6661        .and_then(|v| v.get("role_models"))
6662        .and_then(|v| v.get(role))
6663        .and_then(parse_model_spec)
6664}
6665
6666fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
6667    args.get("model_policy")
6668        .and_then(|v| v.get("default_model"))
6669        .and_then(parse_model_spec)
6670}
6671
6672fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6673    let provider_id = config
6674        .get("default_provider")
6675        .and_then(|v| v.as_str())
6676        .map(str::trim)
6677        .filter(|v| !v.is_empty())?;
6678    let model_id = config
6679        .get("providers")
6680        .and_then(|v| v.get(provider_id))
6681        .and_then(|v| v.get("default_model"))
6682        .and_then(|v| v.as_str())
6683        .map(str::trim)
6684        .filter(|v| !v.is_empty())?;
6685    Some(ModelSpec {
6686        provider_id: provider_id.to_string(),
6687        model_id: model_id.to_string(),
6688    })
6689}
6690
6691fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
6692    providers.iter().any(|provider| {
6693        provider.id == spec.provider_id
6694            && provider
6695                .models
6696                .iter()
6697                .any(|model| model.id == spec.model_id)
6698    })
6699}
6700
6701async fn resolve_routine_model_spec_for_run(
6702    state: &AppState,
6703    run: &RoutineRunRecord,
6704) -> (Option<ModelSpec>, String) {
6705    let providers = state.providers.list().await;
6706    let mode = routine_mode_from_args(&run.args);
6707    let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
6708
6709    if mode.eq_ignore_ascii_case("orchestrated") {
6710        if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
6711            requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
6712        }
6713    }
6714    if let Some(default_model) = default_model_spec_from_args(&run.args) {
6715        requested.push((default_model, "args.model_policy.default_model"));
6716    }
6717    let effective_config = state.config.get_effective_value().await;
6718    if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
6719        requested.push((config_default, "config.default_provider"));
6720    }
6721
6722    for (candidate, source) in requested {
6723        if provider_catalog_has_model(&providers, &candidate) {
6724            return (Some(candidate), source.to_string());
6725        }
6726    }
6727
6728    let fallback = providers
6729        .into_iter()
6730        .find(|provider| !provider.models.is_empty())
6731        .and_then(|provider| {
6732            let model = provider.models.first()?;
6733            Some(ModelSpec {
6734                provider_id: provider.id,
6735                model_id: model.id.clone(),
6736            })
6737        });
6738
6739    (fallback, "provider_catalog_fallback".to_string())
6740}
6741
6742#[cfg(test)]
6743mod tests {
6744    use super::*;
6745
6746    fn test_state_with_path(path: PathBuf) -> AppState {
6747        let mut state = AppState::new_starting("test-attempt".to_string(), true);
6748        state.shared_resources_path = path;
6749        state.routines_path = tmp_routines_file("shared-state");
6750        state.routine_history_path = tmp_routines_file("routine-history");
6751        state.routine_runs_path = tmp_routines_file("routine-runs");
6752        state
6753    }
6754
6755    fn tmp_resource_file(name: &str) -> PathBuf {
6756        std::env::temp_dir().join(format!(
6757            "tandem-server-{name}-{}.json",
6758            uuid::Uuid::new_v4()
6759        ))
6760    }
6761
6762    fn tmp_routines_file(name: &str) -> PathBuf {
6763        std::env::temp_dir().join(format!(
6764            "tandem-server-routines-{name}-{}.json",
6765            uuid::Uuid::new_v4()
6766        ))
6767    }
6768
6769    #[test]
6770    fn default_model_spec_from_effective_config_reads_default_route() {
6771        let cfg = serde_json::json!({
6772            "default_provider": "openrouter",
6773            "providers": {
6774                "openrouter": {
6775                    "default_model": "google/gemini-3-flash-preview"
6776                }
6777            }
6778        });
6779        let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
6780        assert_eq!(spec.provider_id, "openrouter");
6781        assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
6782    }
6783
6784    #[test]
6785    fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
6786        let missing_provider = serde_json::json!({
6787            "providers": {
6788                "openrouter": {
6789                    "default_model": "google/gemini-3-flash-preview"
6790                }
6791            }
6792        });
6793        assert!(default_model_spec_from_effective_config(&missing_provider).is_none());
6794
6795        let missing_model = serde_json::json!({
6796            "default_provider": "openrouter",
6797            "providers": {
6798                "openrouter": {}
6799            }
6800        });
6801        assert!(default_model_spec_from_effective_config(&missing_model).is_none());
6802    }
6803
6804    #[tokio::test]
6805    async fn shared_resource_put_increments_revision() {
6806        let path = tmp_resource_file("shared-resource-put");
6807        let state = test_state_with_path(path.clone());
6808
6809        let first = state
6810            .put_shared_resource(
6811                "project/demo/board".to_string(),
6812                serde_json::json!({"status":"todo"}),
6813                None,
6814                "agent-1".to_string(),
6815                None,
6816            )
6817            .await
6818            .expect("first put");
6819        assert_eq!(first.rev, 1);
6820
6821        let second = state
6822            .put_shared_resource(
6823                "project/demo/board".to_string(),
6824                serde_json::json!({"status":"doing"}),
6825                Some(1),
6826                "agent-2".to_string(),
6827                Some(60_000),
6828            )
6829            .await
6830            .expect("second put");
6831        assert_eq!(second.rev, 2);
6832        assert_eq!(second.updated_by, "agent-2");
6833        assert_eq!(second.ttl_ms, Some(60_000));
6834
6835        let raw = tokio::fs::read_to_string(path.clone())
6836            .await
6837            .expect("persisted");
6838        assert!(raw.contains("\"rev\": 2"));
6839        let _ = tokio::fs::remove_file(path).await;
6840    }
6841
6842    #[tokio::test]
6843    async fn shared_resource_put_detects_revision_conflict() {
6844        let path = tmp_resource_file("shared-resource-conflict");
6845        let state = test_state_with_path(path.clone());
6846
6847        let _ = state
6848            .put_shared_resource(
6849                "mission/demo/card-1".to_string(),
6850                serde_json::json!({"title":"Card 1"}),
6851                None,
6852                "agent-1".to_string(),
6853                None,
6854            )
6855            .await
6856            .expect("seed put");
6857
6858        let conflict = state
6859            .put_shared_resource(
6860                "mission/demo/card-1".to_string(),
6861                serde_json::json!({"title":"Card 1 edited"}),
6862                Some(99),
6863                "agent-2".to_string(),
6864                None,
6865            )
6866            .await
6867            .expect_err("expected conflict");
6868
6869        match conflict {
6870            ResourceStoreError::RevisionConflict(conflict) => {
6871                assert_eq!(conflict.expected_rev, Some(99));
6872                assert_eq!(conflict.current_rev, Some(1));
6873            }
6874            other => panic!("unexpected error: {other:?}"),
6875        }
6876
6877        let _ = tokio::fs::remove_file(path).await;
6878    }
6879
6880    #[tokio::test]
6881    async fn shared_resource_rejects_invalid_namespace_key() {
6882        let path = tmp_resource_file("shared-resource-invalid-key");
6883        let state = test_state_with_path(path.clone());
6884
6885        let error = state
6886            .put_shared_resource(
6887                "global/demo/key".to_string(),
6888                serde_json::json!({"x":1}),
6889                None,
6890                "agent-1".to_string(),
6891                None,
6892            )
6893            .await
6894            .expect_err("invalid key should fail");
6895
6896        match error {
6897            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
6898            other => panic!("unexpected error: {other:?}"),
6899        }
6900
6901        assert!(!path.exists());
6902    }
6903
6904    #[test]
6905    fn derive_status_index_update_for_run_started() {
6906        let event = EngineEvent::new(
6907            "session.run.started",
6908            serde_json::json!({
6909                "sessionID": "s-1",
6910                "runID": "r-1"
6911            }),
6912        );
6913        let update = derive_status_index_update(&event).expect("update");
6914        assert_eq!(update.key, "run/s-1/status");
6915        assert_eq!(
6916            update.value.get("state").and_then(|v| v.as_str()),
6917            Some("running")
6918        );
6919        assert_eq!(
6920            update.value.get("phase").and_then(|v| v.as_str()),
6921            Some("run")
6922        );
6923    }
6924
6925    #[test]
6926    fn derive_status_index_update_for_tool_invocation() {
6927        let event = EngineEvent::new(
6928            "message.part.updated",
6929            serde_json::json!({
6930                "sessionID": "s-2",
6931                "runID": "r-2",
6932                "part": { "type": "tool-invocation", "tool": "todo_write" }
6933            }),
6934        );
6935        let update = derive_status_index_update(&event).expect("update");
6936        assert_eq!(update.key, "run/s-2/status");
6937        assert_eq!(
6938            update.value.get("phase").and_then(|v| v.as_str()),
6939            Some("tool")
6940        );
6941        assert_eq!(
6942            update.value.get("toolActive").and_then(|v| v.as_bool()),
6943            Some(true)
6944        );
6945        assert_eq!(
6946            update.value.get("tool").and_then(|v| v.as_str()),
6947            Some("todo_write")
6948        );
6949    }
6950
6951    #[test]
6952    fn misfire_skip_drops_runs_and_advances_next_fire() {
6953        let (count, next_fire) =
6954            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
6955        assert_eq!(count, 0);
6956        assert_eq!(next_fire, 11_000);
6957    }
6958
6959    #[test]
6960    fn misfire_run_once_emits_single_trigger() {
6961        let (count, next_fire) =
6962            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
6963        assert_eq!(count, 1);
6964        assert_eq!(next_fire, 11_000);
6965    }
6966
6967    #[test]
6968    fn misfire_catch_up_caps_trigger_count() {
6969        let (count, next_fire) = compute_misfire_plan(
6970            25_000,
6971            5_000,
6972            1_000,
6973            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
6974        );
6975        assert_eq!(count, 3);
6976        assert_eq!(next_fire, 26_000);
6977    }
6978
6979    #[tokio::test]
6980    async fn routine_put_persists_and_loads() {
6981        let routines_path = tmp_routines_file("persist-load");
6982        let mut state = AppState::new_starting("routines-put".to_string(), true);
6983        state.routines_path = routines_path.clone();
6984
6985        let routine = RoutineSpec {
6986            routine_id: "routine-1".to_string(),
6987            name: "Digest".to_string(),
6988            status: RoutineStatus::Active,
6989            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
6990            timezone: "UTC".to_string(),
6991            misfire_policy: RoutineMisfirePolicy::RunOnce,
6992            entrypoint: "mission.default".to_string(),
6993            args: serde_json::json!({"topic":"status"}),
6994            allowed_tools: vec![],
6995            output_targets: vec![],
6996            creator_type: "user".to_string(),
6997            creator_id: "user-1".to_string(),
6998            requires_approval: true,
6999            external_integrations_allowed: false,
7000            next_fire_at_ms: Some(5_000),
7001            last_fired_at_ms: None,
7002        };
7003
7004        state.put_routine(routine).await.expect("store routine");
7005
7006        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
7007        reloaded.routines_path = routines_path.clone();
7008        reloaded.load_routines().await.expect("load routines");
7009        let list = reloaded.list_routines().await;
7010        assert_eq!(list.len(), 1);
7011        assert_eq!(list[0].routine_id, "routine-1");
7012
7013        let _ = tokio::fs::remove_file(routines_path).await;
7014    }
7015
7016    #[tokio::test]
7017    async fn persist_routines_does_not_clobber_existing_store_with_empty_state() {
7018        let routines_path = tmp_routines_file("persist-guard");
7019        let mut writer = AppState::new_starting("routines-writer".to_string(), true);
7020        writer.routines_path = routines_path.clone();
7021        writer
7022            .put_routine(RoutineSpec {
7023                routine_id: "automation-guarded".to_string(),
7024                name: "Guarded Automation".to_string(),
7025                status: RoutineStatus::Active,
7026                schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
7027                timezone: "UTC".to_string(),
7028                misfire_policy: RoutineMisfirePolicy::RunOnce,
7029                entrypoint: "mission.default".to_string(),
7030                args: serde_json::json!({
7031                    "prompt": "Keep this saved across restart"
7032                }),
7033                allowed_tools: vec!["read".to_string()],
7034                output_targets: vec![],
7035                creator_type: "user".to_string(),
7036                creator_id: "user-1".to_string(),
7037                requires_approval: false,
7038                external_integrations_allowed: false,
7039                next_fire_at_ms: Some(5_000),
7040                last_fired_at_ms: None,
7041            })
7042            .await
7043            .expect("persist baseline routine");
7044
7045        let mut empty_state = AppState::new_starting("routines-empty".to_string(), true);
7046        empty_state.routines_path = routines_path.clone();
7047        let persist = empty_state.persist_routines().await;
7048        assert!(
7049            persist.is_err(),
7050            "empty state should not overwrite existing routines store"
7051        );
7052
7053        let raw = tokio::fs::read_to_string(&routines_path)
7054            .await
7055            .expect("read guarded routines file");
7056        let parsed: std::collections::HashMap<String, RoutineSpec> =
7057            serde_json::from_str(&raw).expect("parse guarded routines file");
7058        assert!(parsed.contains_key("automation-guarded"));
7059
7060        let _ = tokio::fs::remove_file(routines_path.clone()).await;
7061        let _ = tokio::fs::remove_file(sibling_backup_path(&routines_path)).await;
7062    }
7063
7064    #[tokio::test]
7065    async fn load_routines_recovers_from_backup_when_primary_corrupt() {
7066        let routines_path = tmp_routines_file("backup-recovery");
7067        let backup_path = sibling_backup_path(&routines_path);
7068        let mut state = AppState::new_starting("routines-backup-recovery".to_string(), true);
7069        state.routines_path = routines_path.clone();
7070
7071        let primary = "{ not valid json";
7072        tokio::fs::write(&routines_path, primary)
7073            .await
7074            .expect("write corrupt primary");
7075        let backup = serde_json::json!({
7076            "routine-1": {
7077                "routine_id": "routine-1",
7078                "name": "Recovered",
7079                "status": "active",
7080                "schedule": { "interval_seconds": { "seconds": 60 } },
7081                "timezone": "UTC",
7082                "misfire_policy": { "type": "run_once" },
7083                "entrypoint": "mission.default",
7084                "args": {},
7085                "allowed_tools": [],
7086                "output_targets": [],
7087                "creator_type": "user",
7088                "creator_id": "u-1",
7089                "requires_approval": true,
7090                "external_integrations_allowed": false,
7091                "next_fire_at_ms": null,
7092                "last_fired_at_ms": null
7093            }
7094        });
7095        tokio::fs::write(&backup_path, serde_json::to_string_pretty(&backup).unwrap())
7096            .await
7097            .expect("write backup");
7098
7099        state.load_routines().await.expect("load from backup");
7100        let list = state.list_routines().await;
7101        assert_eq!(list.len(), 1);
7102        assert_eq!(list[0].routine_id, "routine-1");
7103
7104        let _ = tokio::fs::remove_file(routines_path).await;
7105        let _ = tokio::fs::remove_file(backup_path).await;
7106    }
7107
7108    #[tokio::test]
7109    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
7110        let routines_path = tmp_routines_file("misfire-eval");
7111        let mut state = AppState::new_starting("routines-eval".to_string(), true);
7112        state.routines_path = routines_path.clone();
7113
7114        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
7115            routine_id: id.to_string(),
7116            name: id.to_string(),
7117            status: RoutineStatus::Active,
7118            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
7119            timezone: "UTC".to_string(),
7120            misfire_policy: policy,
7121            entrypoint: "mission.default".to_string(),
7122            args: serde_json::json!({}),
7123            allowed_tools: vec![],
7124            output_targets: vec![],
7125            creator_type: "user".to_string(),
7126            creator_id: "u-1".to_string(),
7127            requires_approval: false,
7128            external_integrations_allowed: false,
7129            next_fire_at_ms: Some(5_000),
7130            last_fired_at_ms: None,
7131        };
7132
7133        state
7134            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
7135            .await
7136            .expect("put skip");
7137        state
7138            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
7139            .await
7140            .expect("put once");
7141        state
7142            .put_routine(base(
7143                "routine-catch",
7144                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
7145            ))
7146            .await
7147            .expect("put catch");
7148
7149        let plans = state.evaluate_routine_misfires(10_500).await;
7150        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
7151        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
7152        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
7153
7154        assert!(plan_skip.is_none());
7155        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
7156        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
7157
7158        let stored = state.list_routines().await;
7159        let skip_next = stored
7160            .iter()
7161            .find(|r| r.routine_id == "routine-skip")
7162            .and_then(|r| r.next_fire_at_ms)
7163            .expect("skip next");
7164        assert!(skip_next > 10_500);
7165
7166        let _ = tokio::fs::remove_file(routines_path).await;
7167    }
7168
7169    #[test]
7170    fn routine_policy_blocks_external_side_effects_by_default() {
7171        let routine = RoutineSpec {
7172            routine_id: "routine-policy-1".to_string(),
7173            name: "Connector routine".to_string(),
7174            status: RoutineStatus::Active,
7175            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7176            timezone: "UTC".to_string(),
7177            misfire_policy: RoutineMisfirePolicy::RunOnce,
7178            entrypoint: "connector.email.reply".to_string(),
7179            args: serde_json::json!({}),
7180            allowed_tools: vec![],
7181            output_targets: vec![],
7182            creator_type: "user".to_string(),
7183            creator_id: "u-1".to_string(),
7184            requires_approval: true,
7185            external_integrations_allowed: false,
7186            next_fire_at_ms: None,
7187            last_fired_at_ms: None,
7188        };
7189
7190        let decision = evaluate_routine_execution_policy(&routine, "manual");
7191        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
7192    }
7193
7194    #[test]
7195    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
7196        let routine = RoutineSpec {
7197            routine_id: "routine-policy-2".to_string(),
7198            name: "Connector routine".to_string(),
7199            status: RoutineStatus::Active,
7200            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7201            timezone: "UTC".to_string(),
7202            misfire_policy: RoutineMisfirePolicy::RunOnce,
7203            entrypoint: "connector.email.reply".to_string(),
7204            args: serde_json::json!({}),
7205            allowed_tools: vec![],
7206            output_targets: vec![],
7207            creator_type: "user".to_string(),
7208            creator_id: "u-1".to_string(),
7209            requires_approval: true,
7210            external_integrations_allowed: true,
7211            next_fire_at_ms: None,
7212            last_fired_at_ms: None,
7213        };
7214
7215        let decision = evaluate_routine_execution_policy(&routine, "manual");
7216        assert!(matches!(
7217            decision,
7218            RoutineExecutionDecision::RequiresApproval { .. }
7219        ));
7220    }
7221
7222    #[test]
7223    fn routine_policy_allows_non_external_entrypoints() {
7224        let routine = RoutineSpec {
7225            routine_id: "routine-policy-3".to_string(),
7226            name: "Internal mission routine".to_string(),
7227            status: RoutineStatus::Active,
7228            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7229            timezone: "UTC".to_string(),
7230            misfire_policy: RoutineMisfirePolicy::RunOnce,
7231            entrypoint: "mission.default".to_string(),
7232            args: serde_json::json!({}),
7233            allowed_tools: vec![],
7234            output_targets: vec![],
7235            creator_type: "user".to_string(),
7236            creator_id: "u-1".to_string(),
7237            requires_approval: true,
7238            external_integrations_allowed: false,
7239            next_fire_at_ms: None,
7240            last_fired_at_ms: None,
7241        };
7242
7243        let decision = evaluate_routine_execution_policy(&routine, "manual");
7244        assert_eq!(decision, RoutineExecutionDecision::Allowed);
7245    }
7246
7247    #[tokio::test]
7248    async fn claim_next_queued_routine_run_marks_oldest_running() {
7249        let mut state = AppState::new_starting("routine-claim".to_string(), true);
7250        state.routine_runs_path = tmp_routines_file("routine-claim-runs");
7251
7252        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
7253            run_id: run_id.to_string(),
7254            routine_id: "routine-claim".to_string(),
7255            trigger_type: "manual".to_string(),
7256            run_count: 1,
7257            status: RoutineRunStatus::Queued,
7258            created_at_ms,
7259            updated_at_ms: created_at_ms,
7260            fired_at_ms: Some(created_at_ms),
7261            started_at_ms: None,
7262            finished_at_ms: None,
7263            requires_approval: false,
7264            approval_reason: None,
7265            denial_reason: None,
7266            paused_reason: None,
7267            detail: None,
7268            entrypoint: "mission.default".to_string(),
7269            args: serde_json::json!({}),
7270            allowed_tools: vec![],
7271            output_targets: vec![],
7272            artifacts: vec![],
7273            active_session_ids: vec![],
7274            latest_session_id: None,
7275            prompt_tokens: 0,
7276            completion_tokens: 0,
7277            total_tokens: 0,
7278            estimated_cost_usd: 0.0,
7279        };
7280
7281        {
7282            let mut guard = state.routine_runs.write().await;
7283            guard.insert("run-late".to_string(), mk("run-late", 2_000));
7284            guard.insert("run-early".to_string(), mk("run-early", 1_000));
7285        }
7286        state.persist_routine_runs().await.expect("persist");
7287
7288        let claimed = state
7289            .claim_next_queued_routine_run()
7290            .await
7291            .expect("claimed run");
7292        assert_eq!(claimed.run_id, "run-early");
7293        assert_eq!(claimed.status, RoutineRunStatus::Running);
7294        assert!(claimed.started_at_ms.is_some());
7295    }
7296
7297    #[tokio::test]
7298    async fn routine_session_policy_roundtrip_normalizes_tools() {
7299        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
7300        state
7301            .set_routine_session_policy(
7302                "session-routine-1".to_string(),
7303                "run-1".to_string(),
7304                "routine-1".to_string(),
7305                vec![
7306                    "read".to_string(),
7307                    " mcp.arcade.search ".to_string(),
7308                    "read".to_string(),
7309                    "".to_string(),
7310                ],
7311            )
7312            .await;
7313
7314        let policy = state
7315            .routine_session_policy("session-routine-1")
7316            .await
7317            .expect("policy");
7318        assert_eq!(
7319            policy.allowed_tools,
7320            vec!["read".to_string(), "mcp.arcade.search".to_string()]
7321        );
7322    }
7323
7324    #[tokio::test]
7325    async fn routine_run_preserves_latest_session_id_after_session_clears() {
7326        let state = AppState::new_starting("routine-latest-session".to_string(), true);
7327        let routine = RoutineSpec {
7328            routine_id: "routine-session-link".to_string(),
7329            name: "Routine Session Link".to_string(),
7330            status: RoutineStatus::Active,
7331            schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
7332            timezone: "UTC".to_string(),
7333            misfire_policy: RoutineMisfirePolicy::Skip,
7334            entrypoint: "mission.default".to_string(),
7335            args: serde_json::json!({}),
7336            allowed_tools: vec![],
7337            output_targets: vec![],
7338            creator_type: "user".to_string(),
7339            creator_id: "test".to_string(),
7340            requires_approval: false,
7341            external_integrations_allowed: false,
7342            next_fire_at_ms: None,
7343            last_fired_at_ms: None,
7344        };
7345
7346        let run = state
7347            .create_routine_run(&routine, "manual", 1, RoutineRunStatus::Queued, None)
7348            .await;
7349        state
7350            .add_active_session_id(&run.run_id, "session-123".to_string())
7351            .await
7352            .expect("active session added");
7353        state
7354            .clear_active_session_id(&run.run_id, "session-123")
7355            .await
7356            .expect("active session cleared");
7357
7358        let updated = state
7359            .get_routine_run(&run.run_id)
7360            .await
7361            .expect("run exists");
7362        assert!(updated.active_session_ids.is_empty());
7363        assert_eq!(updated.latest_session_id.as_deref(), Some("session-123"));
7364    }
7365
7366    #[test]
7367    fn routine_mission_prompt_includes_orchestrated_contract() {
7368        let run = RoutineRunRecord {
7369            run_id: "run-orchestrated-1".to_string(),
7370            routine_id: "automation-orchestrated".to_string(),
7371            trigger_type: "manual".to_string(),
7372            run_count: 1,
7373            status: RoutineRunStatus::Queued,
7374            created_at_ms: 1_000,
7375            updated_at_ms: 1_000,
7376            fired_at_ms: Some(1_000),
7377            started_at_ms: None,
7378            finished_at_ms: None,
7379            requires_approval: true,
7380            approval_reason: None,
7381            denial_reason: None,
7382            paused_reason: None,
7383            detail: None,
7384            entrypoint: "mission.default".to_string(),
7385            args: serde_json::json!({
7386                "prompt": "Coordinate a multi-step release readiness check.",
7387                "mode": "orchestrated",
7388                "success_criteria": ["All blockers listed", "Output artifact written"],
7389                "orchestrator_only_tool_calls": true
7390            }),
7391            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
7392            output_targets: vec!["file://reports/release-readiness.md".to_string()],
7393            artifacts: vec![],
7394            active_session_ids: vec![],
7395            latest_session_id: None,
7396            prompt_tokens: 0,
7397            completion_tokens: 0,
7398            total_tokens: 0,
7399            estimated_cost_usd: 0.0,
7400        };
7401
7402        let objective = routine_objective_from_args(&run).expect("objective");
7403        let prompt = build_routine_mission_prompt(&run, &objective);
7404
7405        assert!(prompt.contains("Mode: orchestrated"));
7406        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
7407        assert!(prompt.contains("only the orchestrator may execute tools"));
7408        assert!(prompt.contains("Allowed Tools: read, webfetch"));
7409        assert!(prompt.contains("file://reports/release-readiness.md"));
7410    }
7411
7412    #[test]
7413    fn routine_mission_prompt_includes_standalone_defaults() {
7414        let run = RoutineRunRecord {
7415            run_id: "run-standalone-1".to_string(),
7416            routine_id: "automation-standalone".to_string(),
7417            trigger_type: "manual".to_string(),
7418            run_count: 1,
7419            status: RoutineRunStatus::Queued,
7420            created_at_ms: 2_000,
7421            updated_at_ms: 2_000,
7422            fired_at_ms: Some(2_000),
7423            started_at_ms: None,
7424            finished_at_ms: None,
7425            requires_approval: false,
7426            approval_reason: None,
7427            denial_reason: None,
7428            paused_reason: None,
7429            detail: None,
7430            entrypoint: "mission.default".to_string(),
7431            args: serde_json::json!({
7432                "prompt": "Summarize top engineering updates.",
7433                "success_criteria": ["Three bullet summary"]
7434            }),
7435            allowed_tools: vec![],
7436            output_targets: vec![],
7437            artifacts: vec![],
7438            active_session_ids: vec![],
7439            latest_session_id: None,
7440            prompt_tokens: 0,
7441            completion_tokens: 0,
7442            total_tokens: 0,
7443            estimated_cost_usd: 0.0,
7444        };
7445
7446        let objective = routine_objective_from_args(&run).expect("objective");
7447        let prompt = build_routine_mission_prompt(&run, &objective);
7448
7449        assert!(prompt.contains("Mode: standalone"));
7450        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
7451        assert!(prompt.contains("Allowed Tools: all available by current policy"));
7452        assert!(prompt.contains("Output Targets: none configured"));
7453    }
7454
7455    #[test]
7456    fn shared_resource_key_validator_accepts_swarm_active_tasks() {
7457        assert!(is_valid_resource_key("swarm.active_tasks"));
7458        assert!(is_valid_resource_key("project/demo"));
7459        assert!(!is_valid_resource_key("swarm//active_tasks"));
7460        assert!(!is_valid_resource_key("misc/demo"));
7461    }
7462}