Skip to main content

tandem_server/
lib.rs

1#![recursion_limit = "512"]
2
3use std::ops::Deref;
4use std::path::PathBuf;
5use std::str::FromStr;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, OnceLock};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use chrono::{TimeZone, Utc};
11use chrono_tz::Tz;
12use cron::Schedule;
13use futures::future::{join_all, BoxFuture};
14use futures::FutureExt;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use sha2::{Digest, Sha256};
18use tandem_memory::types::MemoryTier;
19use tandem_memory::{GovernedMemoryTier, MemoryClassification, MemoryContentKind, MemoryPartition};
20use tandem_orchestrator::MissionState;
21use tandem_types::{
22    EngineEvent, HostOs, HostRuntimeContext, MessagePart, MessagePartInput, MessageRole, ModelSpec,
23    PathStyle, SendMessageRequest, Session, ShellFamily,
24};
25use tokio::fs;
26use tokio::sync::RwLock;
27
28use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
29use tandem_core::{
30    resolve_shared_paths, AgentRegistry, CancellationRegistry, ConfigStore, EngineLoop, EventBus,
31    PermissionManager, PluginRegistry, PromptContextHook, PromptContextHookContext, Storage,
32};
33use tandem_memory::db::MemoryDatabase;
34use tandem_providers::ChatMessage;
35use tandem_providers::ProviderRegistry;
36use tandem_runtime::{LspManager, McpRegistry, PtyManager, WorkspaceIndex};
37use tandem_tools::ToolRegistry;
38use tandem_workflows::{
39    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
40    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
41    WorkflowRunStatus, WorkflowSourceKind, WorkflowSourceRef, WorkflowSpec,
42    WorkflowValidationMessage,
43};
44
45mod agent_teams;
46mod browser;
47mod bug_monitor_github;
48mod capability_resolver;
49mod http;
50mod mcp_catalog;
51mod pack_builder;
52mod pack_manager;
53mod preset_composer;
54mod preset_registry;
55mod preset_summary;
56pub mod webui;
57mod workflows;
58
59pub use agent_teams::AgentTeamRuntime;
60pub use browser::{
61    install_browser_sidecar, BrowserHealthSummary, BrowserSidecarInstallResult,
62    BrowserSmokeTestResult, BrowserSubsystem,
63};
64pub use capability_resolver::CapabilityResolver;
65pub use http::serve;
66pub use pack_manager::PackManager;
67pub use preset_composer::PromptComposeInput;
68pub use preset_registry::PresetRegistry;
69pub use workflows::{
70    canonical_workflow_event_names, dispatch_workflow_event, execute_hook_binding,
71    execute_workflow, parse_workflow_action, run_workflow_dispatcher, simulate_workflow_event,
72};
73
74pub(crate) fn normalize_absolute_workspace_root(raw: &str) -> Result<String, String> {
75    let trimmed = raw.trim();
76    if trimmed.is_empty() {
77        return Err("workspace_root is required".to_string());
78    }
79    let as_path = PathBuf::from(trimmed);
80    if !as_path.is_absolute() {
81        return Err("workspace_root must be an absolute path".to_string());
82    }
83    tandem_core::normalize_workspace_path(trimmed)
84        .ok_or_else(|| "workspace_root is invalid".to_string())
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize, Default)]
88pub struct ChannelStatus {
89    pub enabled: bool,
90    pub connected: bool,
91    pub last_error: Option<String>,
92    pub active_sessions: u64,
93    pub meta: Value,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize, Default)]
97pub struct WebUiConfig {
98    #[serde(default)]
99    pub enabled: bool,
100    #[serde(default = "default_web_ui_prefix")]
101    pub path_prefix: String,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize, Default)]
105pub struct ChannelsConfigFile {
106    pub telegram: Option<TelegramConfigFile>,
107    pub discord: Option<DiscordConfigFile>,
108    pub slack: Option<SlackConfigFile>,
109    #[serde(default)]
110    pub tool_policy: tandem_channels::config::ChannelToolPolicy,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct TelegramConfigFile {
115    pub bot_token: String,
116    #[serde(default = "default_allow_all")]
117    pub allowed_users: Vec<String>,
118    #[serde(default)]
119    pub mention_only: bool,
120    #[serde(default)]
121    pub style_profile: tandem_channels::config::TelegramStyleProfile,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct DiscordConfigFile {
126    pub bot_token: String,
127    #[serde(default)]
128    pub guild_id: Option<String>,
129    #[serde(default = "default_allow_all")]
130    pub allowed_users: Vec<String>,
131    #[serde(default = "default_discord_mention_only")]
132    pub mention_only: bool,
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct SlackConfigFile {
137    pub bot_token: String,
138    pub channel_id: String,
139    #[serde(default = "default_allow_all")]
140    pub allowed_users: Vec<String>,
141    #[serde(default)]
142    pub mention_only: bool,
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize, Default)]
146struct EffectiveAppConfig {
147    #[serde(default)]
148    pub channels: ChannelsConfigFile,
149    #[serde(default)]
150    pub web_ui: WebUiConfig,
151    #[serde(default)]
152    pub browser: tandem_core::BrowserConfig,
153    #[serde(default)]
154    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
155}
156
157#[derive(Default)]
158pub struct ChannelRuntime {
159    pub listeners: Option<tokio::task::JoinSet<()>>,
160    pub statuses: std::collections::HashMap<String, ChannelStatus>,
161}
162
163#[derive(Debug, Clone)]
164pub struct EngineLease {
165    pub lease_id: String,
166    pub client_id: String,
167    pub client_type: String,
168    pub acquired_at_ms: u64,
169    pub last_renewed_at_ms: u64,
170    pub ttl_ms: u64,
171}
172
173impl EngineLease {
174    pub fn is_expired(&self, now_ms: u64) -> bool {
175        now_ms.saturating_sub(self.last_renewed_at_ms) > self.ttl_ms
176    }
177}
178
179#[derive(Debug, Clone, Serialize)]
180pub struct ActiveRun {
181    #[serde(rename = "runID")]
182    pub run_id: String,
183    #[serde(rename = "startedAtMs")]
184    pub started_at_ms: u64,
185    #[serde(rename = "lastActivityAtMs")]
186    pub last_activity_at_ms: u64,
187    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
188    pub client_id: Option<String>,
189    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
190    pub agent_id: Option<String>,
191    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
192    pub agent_profile: Option<String>,
193}
194
195#[derive(Clone, Default)]
196pub struct RunRegistry {
197    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
198}
199
200impl RunRegistry {
201    pub fn new() -> Self {
202        Self::default()
203    }
204
205    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
206        self.active.read().await.get(session_id).cloned()
207    }
208
209    pub async fn acquire(
210        &self,
211        session_id: &str,
212        run_id: String,
213        client_id: Option<String>,
214        agent_id: Option<String>,
215        agent_profile: Option<String>,
216    ) -> std::result::Result<ActiveRun, ActiveRun> {
217        let mut guard = self.active.write().await;
218        if let Some(existing) = guard.get(session_id).cloned() {
219            return Err(existing);
220        }
221        let now = now_ms();
222        let run = ActiveRun {
223            run_id,
224            started_at_ms: now,
225            last_activity_at_ms: now,
226            client_id,
227            agent_id,
228            agent_profile,
229        };
230        guard.insert(session_id.to_string(), run.clone());
231        Ok(run)
232    }
233
234    pub async fn touch(&self, session_id: &str, run_id: &str) {
235        let mut guard = self.active.write().await;
236        if let Some(run) = guard.get_mut(session_id) {
237            if run.run_id == run_id {
238                run.last_activity_at_ms = now_ms();
239            }
240        }
241    }
242
243    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
244        let mut guard = self.active.write().await;
245        if let Some(run) = guard.get(session_id) {
246            if run.run_id == run_id {
247                return guard.remove(session_id);
248            }
249        }
250        None
251    }
252
253    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
254        self.active.write().await.remove(session_id)
255    }
256
257    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
258        let now = now_ms();
259        let mut guard = self.active.write().await;
260        let stale_ids = guard
261            .iter()
262            .filter_map(|(session_id, run)| {
263                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
264                    Some(session_id.clone())
265                } else {
266                    None
267                }
268            })
269            .collect::<Vec<_>>();
270        let mut out = Vec::with_capacity(stale_ids.len());
271        for session_id in stale_ids {
272            if let Some(run) = guard.remove(&session_id) {
273                out.push((session_id, run));
274            }
275        }
276        out
277    }
278}
279
280pub fn now_ms() -> u64 {
281    SystemTime::now()
282        .duration_since(UNIX_EPOCH)
283        .map(|d| d.as_millis() as u64)
284        .unwrap_or(0)
285}
286
287pub fn build_id() -> String {
288    if let Some(explicit) = option_env!("TANDEM_BUILD_ID") {
289        let trimmed = explicit.trim();
290        if !trimmed.is_empty() {
291            return trimmed.to_string();
292        }
293    }
294    if let Some(git_sha) = option_env!("VERGEN_GIT_SHA") {
295        let trimmed = git_sha.trim();
296        if !trimmed.is_empty() {
297            return format!("{}+{}", env!("CARGO_PKG_VERSION"), trimmed);
298        }
299    }
300    env!("CARGO_PKG_VERSION").to_string()
301}
302
303pub fn detect_host_runtime_context() -> HostRuntimeContext {
304    let os = if cfg!(target_os = "windows") {
305        HostOs::Windows
306    } else if cfg!(target_os = "macos") {
307        HostOs::Macos
308    } else {
309        HostOs::Linux
310    };
311    let (shell_family, path_style) = match os {
312        HostOs::Windows => (ShellFamily::Powershell, PathStyle::Windows),
313        HostOs::Linux | HostOs::Macos => (ShellFamily::Posix, PathStyle::Posix),
314    };
315    HostRuntimeContext {
316        os,
317        arch: std::env::consts::ARCH.to_string(),
318        shell_family,
319        path_style,
320    }
321}
322
323pub fn binary_path_for_health() -> Option<String> {
324    #[cfg(debug_assertions)]
325    {
326        std::env::current_exe()
327            .ok()
328            .map(|p| p.to_string_lossy().to_string())
329    }
330    #[cfg(not(debug_assertions))]
331    {
332        None
333    }
334}
335
336#[derive(Clone)]
337pub struct RuntimeState {
338    pub storage: Arc<Storage>,
339    pub config: ConfigStore,
340    pub event_bus: EventBus,
341    pub providers: ProviderRegistry,
342    pub plugins: PluginRegistry,
343    pub agents: AgentRegistry,
344    pub tools: ToolRegistry,
345    pub permissions: PermissionManager,
346    pub mcp: McpRegistry,
347    pub pty: PtyManager,
348    pub lsp: LspManager,
349    pub auth: Arc<RwLock<std::collections::HashMap<String, String>>>,
350    pub logs: Arc<RwLock<Vec<Value>>>,
351    pub workspace_index: WorkspaceIndex,
352    pub cancellations: CancellationRegistry,
353    pub engine_loop: EngineLoop,
354    pub host_runtime_context: HostRuntimeContext,
355    pub browser: BrowserSubsystem,
356}
357
358#[derive(Debug, Clone)]
359pub struct GovernedMemoryRecord {
360    pub id: String,
361    pub run_id: String,
362    pub partition: MemoryPartition,
363    pub kind: MemoryContentKind,
364    pub content: String,
365    pub artifact_refs: Vec<String>,
366    pub classification: MemoryClassification,
367    pub metadata: Option<Value>,
368    pub source_memory_id: Option<String>,
369    pub created_at_ms: u64,
370}
371
372#[derive(Debug, Clone, Serialize)]
373pub struct MemoryAuditEvent {
374    pub audit_id: String,
375    pub action: String,
376    pub run_id: String,
377    pub memory_id: Option<String>,
378    pub source_memory_id: Option<String>,
379    pub to_tier: Option<GovernedMemoryTier>,
380    pub partition_key: String,
381    pub actor: String,
382    pub status: String,
383    #[serde(skip_serializing_if = "Option::is_none")]
384    pub detail: Option<String>,
385    pub created_at_ms: u64,
386}
387
388#[derive(Debug, Clone, Serialize, Deserialize)]
389pub struct SharedResourceRecord {
390    pub key: String,
391    pub value: Value,
392    pub rev: u64,
393    pub updated_at_ms: u64,
394    pub updated_by: String,
395    #[serde(skip_serializing_if = "Option::is_none")]
396    pub ttl_ms: Option<u64>,
397}
398
399#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
400#[serde(rename_all = "snake_case")]
401pub enum RoutineSchedule {
402    IntervalSeconds { seconds: u64 },
403    Cron { expression: String },
404}
405
406#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
407#[serde(rename_all = "snake_case", tag = "type")]
408pub enum RoutineMisfirePolicy {
409    Skip,
410    RunOnce,
411    CatchUp { max_runs: u32 },
412}
413
414#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
415#[serde(rename_all = "snake_case")]
416pub enum RoutineStatus {
417    Active,
418    Paused,
419}
420
421#[derive(Debug, Clone, Serialize, Deserialize)]
422pub struct RoutineSpec {
423    pub routine_id: String,
424    pub name: String,
425    pub status: RoutineStatus,
426    pub schedule: RoutineSchedule,
427    pub timezone: String,
428    pub misfire_policy: RoutineMisfirePolicy,
429    pub entrypoint: String,
430    #[serde(default)]
431    pub args: Value,
432    #[serde(default)]
433    pub allowed_tools: Vec<String>,
434    #[serde(default)]
435    pub output_targets: Vec<String>,
436    pub creator_type: String,
437    pub creator_id: String,
438    pub requires_approval: bool,
439    pub external_integrations_allowed: bool,
440    #[serde(default, skip_serializing_if = "Option::is_none")]
441    pub next_fire_at_ms: Option<u64>,
442    #[serde(default, skip_serializing_if = "Option::is_none")]
443    pub last_fired_at_ms: Option<u64>,
444}
445
446#[derive(Debug, Clone, Serialize, Deserialize)]
447pub struct RoutineHistoryEvent {
448    pub routine_id: String,
449    pub trigger_type: String,
450    pub run_count: u32,
451    pub fired_at_ms: u64,
452    pub status: String,
453    #[serde(default, skip_serializing_if = "Option::is_none")]
454    pub detail: Option<String>,
455}
456
457#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
458#[serde(rename_all = "snake_case")]
459pub enum RoutineRunStatus {
460    Queued,
461    PendingApproval,
462    Running,
463    Paused,
464    BlockedPolicy,
465    Denied,
466    Completed,
467    Failed,
468    Cancelled,
469}
470
471#[derive(Debug, Clone, Serialize, Deserialize)]
472pub struct RoutineRunArtifact {
473    pub artifact_id: String,
474    pub uri: String,
475    pub kind: String,
476    #[serde(default, skip_serializing_if = "Option::is_none")]
477    pub label: Option<String>,
478    pub created_at_ms: u64,
479    #[serde(default, skip_serializing_if = "Option::is_none")]
480    pub metadata: Option<Value>,
481}
482
483#[derive(Debug, Clone, Serialize, Deserialize)]
484pub struct RoutineRunRecord {
485    pub run_id: String,
486    pub routine_id: String,
487    pub trigger_type: String,
488    pub run_count: u32,
489    pub status: RoutineRunStatus,
490    pub created_at_ms: u64,
491    pub updated_at_ms: u64,
492    #[serde(default, skip_serializing_if = "Option::is_none")]
493    pub fired_at_ms: Option<u64>,
494    #[serde(default, skip_serializing_if = "Option::is_none")]
495    pub started_at_ms: Option<u64>,
496    #[serde(default, skip_serializing_if = "Option::is_none")]
497    pub finished_at_ms: Option<u64>,
498    pub requires_approval: bool,
499    #[serde(default, skip_serializing_if = "Option::is_none")]
500    pub approval_reason: Option<String>,
501    #[serde(default, skip_serializing_if = "Option::is_none")]
502    pub denial_reason: Option<String>,
503    #[serde(default, skip_serializing_if = "Option::is_none")]
504    pub paused_reason: Option<String>,
505    #[serde(default, skip_serializing_if = "Option::is_none")]
506    pub detail: Option<String>,
507    pub entrypoint: String,
508    #[serde(default)]
509    pub args: Value,
510    #[serde(default)]
511    pub allowed_tools: Vec<String>,
512    #[serde(default)]
513    pub output_targets: Vec<String>,
514    #[serde(default)]
515    pub artifacts: Vec<RoutineRunArtifact>,
516    #[serde(default)]
517    pub active_session_ids: Vec<String>,
518    #[serde(default, skip_serializing_if = "Option::is_none")]
519    pub latest_session_id: Option<String>,
520    #[serde(default)]
521    pub prompt_tokens: u64,
522    #[serde(default)]
523    pub completion_tokens: u64,
524    #[serde(default)]
525    pub total_tokens: u64,
526    #[serde(default)]
527    pub estimated_cost_usd: f64,
528}
529
530#[derive(Debug, Clone)]
531pub struct RoutineSessionPolicy {
532    pub session_id: String,
533    pub run_id: String,
534    pub routine_id: String,
535    pub allowed_tools: Vec<String>,
536}
537
538#[derive(Debug, Clone, Serialize)]
539pub struct RoutineTriggerPlan {
540    pub routine_id: String,
541    pub run_count: u32,
542    pub scheduled_at_ms: u64,
543    pub next_fire_at_ms: u64,
544}
545
546#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
547#[serde(rename_all = "snake_case")]
548pub enum AutomationV2Status {
549    Active,
550    Paused,
551    Draft,
552}
553
554#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
555#[serde(rename_all = "snake_case")]
556pub enum AutomationV2ScheduleType {
557    Cron,
558    Interval,
559    Manual,
560}
561
562#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
563pub struct AutomationV2Schedule {
564    #[serde(rename = "type")]
565    pub schedule_type: AutomationV2ScheduleType,
566    #[serde(default, skip_serializing_if = "Option::is_none")]
567    pub cron_expression: Option<String>,
568    #[serde(default, skip_serializing_if = "Option::is_none")]
569    pub interval_seconds: Option<u64>,
570    pub timezone: String,
571    pub misfire_policy: RoutineMisfirePolicy,
572}
573
574#[derive(Debug, Clone, Serialize, Deserialize)]
575pub struct AutomationAgentToolPolicy {
576    #[serde(default)]
577    pub allowlist: Vec<String>,
578    #[serde(default)]
579    pub denylist: Vec<String>,
580}
581
582#[derive(Debug, Clone, Serialize, Deserialize)]
583pub struct AutomationAgentMcpPolicy {
584    #[serde(default)]
585    pub allowed_servers: Vec<String>,
586    #[serde(default, skip_serializing_if = "Option::is_none")]
587    pub allowed_tools: Option<Vec<String>>,
588}
589
590#[derive(Debug, Clone, Serialize, Deserialize)]
591pub struct AutomationAgentProfile {
592    pub agent_id: String,
593    #[serde(default, skip_serializing_if = "Option::is_none")]
594    pub template_id: Option<String>,
595    pub display_name: String,
596    #[serde(default, skip_serializing_if = "Option::is_none")]
597    pub avatar_url: Option<String>,
598    #[serde(default, skip_serializing_if = "Option::is_none")]
599    pub model_policy: Option<Value>,
600    #[serde(default)]
601    pub skills: Vec<String>,
602    pub tool_policy: AutomationAgentToolPolicy,
603    pub mcp_policy: AutomationAgentMcpPolicy,
604    #[serde(default, skip_serializing_if = "Option::is_none")]
605    pub approval_policy: Option<String>,
606}
607
608#[derive(Debug, Clone, Serialize, Deserialize)]
609pub struct AutomationFlowNode {
610    pub node_id: String,
611    pub agent_id: String,
612    pub objective: String,
613    #[serde(default)]
614    pub depends_on: Vec<String>,
615    #[serde(default)]
616    pub input_refs: Vec<AutomationFlowInputRef>,
617    #[serde(default, skip_serializing_if = "Option::is_none")]
618    pub output_contract: Option<AutomationFlowOutputContract>,
619    #[serde(default, skip_serializing_if = "Option::is_none")]
620    pub retry_policy: Option<Value>,
621    #[serde(default, skip_serializing_if = "Option::is_none")]
622    pub timeout_ms: Option<u64>,
623}
624
625#[derive(Debug, Clone, Serialize, Deserialize)]
626pub struct AutomationFlowInputRef {
627    pub from_step_id: String,
628    pub alias: String,
629}
630
631#[derive(Debug, Clone, Serialize, Deserialize)]
632pub struct AutomationFlowOutputContract {
633    pub kind: String,
634}
635
636#[derive(Debug, Clone, Serialize, Deserialize)]
637pub struct AutomationFlowSpec {
638    #[serde(default)]
639    pub nodes: Vec<AutomationFlowNode>,
640}
641
642#[derive(Debug, Clone, Serialize, Deserialize)]
643pub struct AutomationExecutionPolicy {
644    #[serde(default, skip_serializing_if = "Option::is_none")]
645    pub max_parallel_agents: Option<u32>,
646    #[serde(default, skip_serializing_if = "Option::is_none")]
647    pub max_total_runtime_ms: Option<u64>,
648    #[serde(default, skip_serializing_if = "Option::is_none")]
649    pub max_total_tool_calls: Option<u32>,
650}
651
652#[derive(Debug, Clone, Serialize, Deserialize)]
653pub struct AutomationV2Spec {
654    pub automation_id: String,
655    pub name: String,
656    #[serde(default, skip_serializing_if = "Option::is_none")]
657    pub description: Option<String>,
658    pub status: AutomationV2Status,
659    pub schedule: AutomationV2Schedule,
660    #[serde(default)]
661    pub agents: Vec<AutomationAgentProfile>,
662    pub flow: AutomationFlowSpec,
663    pub execution: AutomationExecutionPolicy,
664    #[serde(default)]
665    pub output_targets: Vec<String>,
666    pub created_at_ms: u64,
667    pub updated_at_ms: u64,
668    pub creator_id: String,
669    #[serde(default, skip_serializing_if = "Option::is_none")]
670    pub workspace_root: Option<String>,
671    #[serde(default, skip_serializing_if = "Option::is_none")]
672    pub metadata: Option<Value>,
673    #[serde(default, skip_serializing_if = "Option::is_none")]
674    pub next_fire_at_ms: Option<u64>,
675    #[serde(default, skip_serializing_if = "Option::is_none")]
676    pub last_fired_at_ms: Option<u64>,
677}
678
679#[derive(Debug, Clone, Serialize, Deserialize)]
680pub struct WorkflowPlanStep {
681    pub step_id: String,
682    pub kind: String,
683    pub objective: String,
684    #[serde(default)]
685    pub depends_on: Vec<String>,
686    pub agent_role: String,
687    #[serde(default)]
688    pub input_refs: Vec<AutomationFlowInputRef>,
689    #[serde(default, skip_serializing_if = "Option::is_none")]
690    pub output_contract: Option<AutomationFlowOutputContract>,
691}
692
693#[derive(Debug, Clone, Serialize, Deserialize)]
694pub struct WorkflowPlan {
695    pub plan_id: String,
696    pub planner_version: String,
697    pub plan_source: String,
698    pub original_prompt: String,
699    pub normalized_prompt: String,
700    pub confidence: String,
701    pub title: String,
702    #[serde(default, skip_serializing_if = "Option::is_none")]
703    pub description: Option<String>,
704    pub schedule: AutomationV2Schedule,
705    pub execution_target: String,
706    pub workspace_root: String,
707    #[serde(default)]
708    pub steps: Vec<WorkflowPlanStep>,
709    #[serde(default)]
710    pub requires_integrations: Vec<String>,
711    #[serde(default)]
712    pub allowed_mcp_servers: Vec<String>,
713    #[serde(default, skip_serializing_if = "Option::is_none")]
714    pub operator_preferences: Option<Value>,
715    pub save_options: Value,
716}
717
718#[derive(Debug, Clone, Serialize, Deserialize)]
719pub struct WorkflowPlanChatMessage {
720    pub role: String,
721    pub text: String,
722    pub created_at_ms: u64,
723}
724
725#[derive(Debug, Clone, Serialize, Deserialize)]
726pub struct WorkflowPlanConversation {
727    pub conversation_id: String,
728    pub plan_id: String,
729    pub created_at_ms: u64,
730    pub updated_at_ms: u64,
731    #[serde(default)]
732    pub messages: Vec<WorkflowPlanChatMessage>,
733}
734
735#[derive(Debug, Clone, Serialize, Deserialize)]
736pub struct WorkflowPlanDraftRecord {
737    pub initial_plan: WorkflowPlan,
738    pub current_plan: WorkflowPlan,
739    pub conversation: WorkflowPlanConversation,
740    #[serde(default, skip_serializing_if = "Option::is_none")]
741    pub planner_diagnostics: Option<Value>,
742}
743
744#[derive(Debug, Clone, Serialize, Deserialize)]
745pub struct AutomationNodeOutput {
746    pub contract_kind: String,
747    pub summary: String,
748    pub content: Value,
749    pub created_at_ms: u64,
750    pub node_id: String,
751}
752
753#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
754#[serde(rename_all = "snake_case")]
755pub enum AutomationRunStatus {
756    Queued,
757    Running,
758    Pausing,
759    Paused,
760    Completed,
761    Failed,
762    Cancelled,
763}
764
765#[derive(Debug, Clone, Serialize, Deserialize)]
766pub struct AutomationRunCheckpoint {
767    #[serde(default)]
768    pub completed_nodes: Vec<String>,
769    #[serde(default)]
770    pub pending_nodes: Vec<String>,
771    #[serde(default)]
772    pub node_outputs: std::collections::HashMap<String, Value>,
773    #[serde(default)]
774    pub node_attempts: std::collections::HashMap<String, u32>,
775}
776
777#[derive(Debug, Clone, Serialize, Deserialize)]
778pub struct AutomationV2RunRecord {
779    pub run_id: String,
780    pub automation_id: String,
781    pub trigger_type: String,
782    pub status: AutomationRunStatus,
783    pub created_at_ms: u64,
784    pub updated_at_ms: u64,
785    #[serde(default, skip_serializing_if = "Option::is_none")]
786    pub started_at_ms: Option<u64>,
787    #[serde(default, skip_serializing_if = "Option::is_none")]
788    pub finished_at_ms: Option<u64>,
789    #[serde(default)]
790    pub active_session_ids: Vec<String>,
791    #[serde(default)]
792    pub active_instance_ids: Vec<String>,
793    pub checkpoint: AutomationRunCheckpoint,
794    #[serde(default, skip_serializing_if = "Option::is_none")]
795    pub automation_snapshot: Option<AutomationV2Spec>,
796    #[serde(default, skip_serializing_if = "Option::is_none")]
797    pub pause_reason: Option<String>,
798    #[serde(default, skip_serializing_if = "Option::is_none")]
799    pub resume_reason: Option<String>,
800    #[serde(default, skip_serializing_if = "Option::is_none")]
801    pub detail: Option<String>,
802    #[serde(default)]
803    pub prompt_tokens: u64,
804    #[serde(default)]
805    pub completion_tokens: u64,
806    #[serde(default)]
807    pub total_tokens: u64,
808    #[serde(default)]
809    pub estimated_cost_usd: f64,
810}
811
812#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
813#[serde(rename_all = "snake_case")]
814pub enum BugMonitorProviderPreference {
815    Auto,
816    OfficialGithub,
817    Composio,
818    Arcade,
819}
820
821#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
822#[serde(rename_all = "snake_case")]
823pub enum BugMonitorLabelMode {
824    ReporterOnly,
825}
826
827impl Default for BugMonitorLabelMode {
828    fn default() -> Self {
829        Self::ReporterOnly
830    }
831}
832
833impl Default for BugMonitorProviderPreference {
834    fn default() -> Self {
835        Self::Auto
836    }
837}
838
839#[derive(Debug, Clone, Serialize, Deserialize)]
840pub struct BugMonitorConfig {
841    #[serde(default)]
842    pub enabled: bool,
843    #[serde(default)]
844    pub paused: bool,
845    #[serde(default, skip_serializing_if = "Option::is_none")]
846    pub workspace_root: Option<String>,
847    #[serde(default, skip_serializing_if = "Option::is_none")]
848    pub repo: Option<String>,
849    #[serde(default, skip_serializing_if = "Option::is_none")]
850    pub mcp_server: Option<String>,
851    #[serde(default)]
852    pub provider_preference: BugMonitorProviderPreference,
853    #[serde(default, skip_serializing_if = "Option::is_none")]
854    pub model_policy: Option<Value>,
855    #[serde(default = "default_true")]
856    pub auto_create_new_issues: bool,
857    #[serde(default)]
858    pub require_approval_for_new_issues: bool,
859    #[serde(default = "default_true")]
860    pub auto_comment_on_matched_open_issues: bool,
861    #[serde(default)]
862    pub label_mode: BugMonitorLabelMode,
863    #[serde(default)]
864    pub updated_at_ms: u64,
865}
866
867impl Default for BugMonitorConfig {
868    fn default() -> Self {
869        Self {
870            enabled: false,
871            paused: false,
872            workspace_root: None,
873            repo: None,
874            mcp_server: None,
875            provider_preference: BugMonitorProviderPreference::Auto,
876            model_policy: None,
877            auto_create_new_issues: true,
878            require_approval_for_new_issues: false,
879            auto_comment_on_matched_open_issues: true,
880            label_mode: BugMonitorLabelMode::ReporterOnly,
881            updated_at_ms: 0,
882        }
883    }
884}
885
886#[derive(Debug, Clone, Serialize, Deserialize, Default)]
887pub struct BugMonitorDraftRecord {
888    pub draft_id: String,
889    pub fingerprint: String,
890    pub repo: String,
891    pub status: String,
892    pub created_at_ms: u64,
893    #[serde(default, skip_serializing_if = "Option::is_none")]
894    pub triage_run_id: Option<String>,
895    #[serde(default, skip_serializing_if = "Option::is_none")]
896    pub issue_number: Option<u64>,
897    #[serde(default, skip_serializing_if = "Option::is_none")]
898    pub title: Option<String>,
899    #[serde(default, skip_serializing_if = "Option::is_none")]
900    pub detail: Option<String>,
901    #[serde(default, skip_serializing_if = "Option::is_none")]
902    pub github_status: Option<String>,
903    #[serde(default, skip_serializing_if = "Option::is_none")]
904    pub github_issue_url: Option<String>,
905    #[serde(default, skip_serializing_if = "Option::is_none")]
906    pub github_comment_url: Option<String>,
907    #[serde(default, skip_serializing_if = "Option::is_none")]
908    pub github_posted_at_ms: Option<u64>,
909    #[serde(default, skip_serializing_if = "Option::is_none")]
910    pub matched_issue_number: Option<u64>,
911    #[serde(default, skip_serializing_if = "Option::is_none")]
912    pub matched_issue_state: Option<String>,
913    #[serde(default, skip_serializing_if = "Option::is_none")]
914    pub evidence_digest: Option<String>,
915    #[serde(default, skip_serializing_if = "Option::is_none")]
916    pub last_post_error: Option<String>,
917}
918
919#[derive(Debug, Clone, Serialize, Deserialize, Default)]
920pub struct BugMonitorPostRecord {
921    pub post_id: String,
922    pub draft_id: String,
923    #[serde(default, skip_serializing_if = "Option::is_none")]
924    pub incident_id: Option<String>,
925    pub fingerprint: String,
926    pub repo: String,
927    pub operation: String,
928    pub status: String,
929    #[serde(default, skip_serializing_if = "Option::is_none")]
930    pub issue_number: Option<u64>,
931    #[serde(default, skip_serializing_if = "Option::is_none")]
932    pub issue_url: Option<String>,
933    #[serde(default, skip_serializing_if = "Option::is_none")]
934    pub comment_id: Option<String>,
935    #[serde(default, skip_serializing_if = "Option::is_none")]
936    pub comment_url: Option<String>,
937    #[serde(default, skip_serializing_if = "Option::is_none")]
938    pub evidence_digest: Option<String>,
939    pub idempotency_key: String,
940    #[serde(default, skip_serializing_if = "Option::is_none")]
941    pub response_excerpt: Option<String>,
942    #[serde(default, skip_serializing_if = "Option::is_none")]
943    pub error: Option<String>,
944    pub created_at_ms: u64,
945    pub updated_at_ms: u64,
946}
947
948#[derive(Debug, Clone, Serialize, Deserialize, Default)]
949pub struct BugMonitorIncidentRecord {
950    pub incident_id: String,
951    pub fingerprint: String,
952    pub event_type: String,
953    pub status: String,
954    pub repo: String,
955    pub workspace_root: String,
956    pub title: String,
957    #[serde(default, skip_serializing_if = "Option::is_none")]
958    pub detail: Option<String>,
959    #[serde(default)]
960    pub excerpt: Vec<String>,
961    #[serde(default, skip_serializing_if = "Option::is_none")]
962    pub source: Option<String>,
963    #[serde(default, skip_serializing_if = "Option::is_none")]
964    pub run_id: Option<String>,
965    #[serde(default, skip_serializing_if = "Option::is_none")]
966    pub session_id: Option<String>,
967    #[serde(default, skip_serializing_if = "Option::is_none")]
968    pub correlation_id: Option<String>,
969    #[serde(default, skip_serializing_if = "Option::is_none")]
970    pub component: Option<String>,
971    #[serde(default, skip_serializing_if = "Option::is_none")]
972    pub level: Option<String>,
973    #[serde(default)]
974    pub occurrence_count: u64,
975    pub created_at_ms: u64,
976    pub updated_at_ms: u64,
977    #[serde(default, skip_serializing_if = "Option::is_none")]
978    pub last_seen_at_ms: Option<u64>,
979    #[serde(default, skip_serializing_if = "Option::is_none")]
980    pub draft_id: Option<String>,
981    #[serde(default, skip_serializing_if = "Option::is_none")]
982    pub triage_run_id: Option<String>,
983    #[serde(default, skip_serializing_if = "Option::is_none")]
984    pub last_error: Option<String>,
985    #[serde(default, skip_serializing_if = "Option::is_none")]
986    pub duplicate_summary: Option<Value>,
987    #[serde(default, skip_serializing_if = "Option::is_none")]
988    pub duplicate_matches: Option<Vec<Value>>,
989    #[serde(default, skip_serializing_if = "Option::is_none")]
990    pub event_payload: Option<Value>,
991}
992
993#[derive(Debug, Clone, Serialize, Deserialize, Default)]
994pub struct BugMonitorRuntimeStatus {
995    #[serde(default)]
996    pub monitoring_active: bool,
997    #[serde(default)]
998    pub paused: bool,
999    #[serde(default)]
1000    pub pending_incidents: usize,
1001    #[serde(default)]
1002    pub total_incidents: usize,
1003    #[serde(default, skip_serializing_if = "Option::is_none")]
1004    pub last_processed_at_ms: Option<u64>,
1005    #[serde(default, skip_serializing_if = "Option::is_none")]
1006    pub last_incident_event_type: Option<String>,
1007    #[serde(default, skip_serializing_if = "Option::is_none")]
1008    pub last_runtime_error: Option<String>,
1009    #[serde(default, skip_serializing_if = "Option::is_none")]
1010    pub last_post_result: Option<String>,
1011    #[serde(default)]
1012    pub pending_posts: usize,
1013}
1014
1015#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1016pub struct BugMonitorSubmission {
1017    #[serde(default, skip_serializing_if = "Option::is_none")]
1018    pub repo: Option<String>,
1019    #[serde(default, skip_serializing_if = "Option::is_none")]
1020    pub title: Option<String>,
1021    #[serde(default, skip_serializing_if = "Option::is_none")]
1022    pub detail: Option<String>,
1023    #[serde(default, skip_serializing_if = "Option::is_none")]
1024    pub source: Option<String>,
1025    #[serde(default, skip_serializing_if = "Option::is_none")]
1026    pub run_id: Option<String>,
1027    #[serde(default, skip_serializing_if = "Option::is_none")]
1028    pub session_id: Option<String>,
1029    #[serde(default, skip_serializing_if = "Option::is_none")]
1030    pub correlation_id: Option<String>,
1031    #[serde(default, skip_serializing_if = "Option::is_none")]
1032    pub file_name: Option<String>,
1033    #[serde(default, skip_serializing_if = "Option::is_none")]
1034    pub process: Option<String>,
1035    #[serde(default, skip_serializing_if = "Option::is_none")]
1036    pub component: Option<String>,
1037    #[serde(default, skip_serializing_if = "Option::is_none")]
1038    pub event: Option<String>,
1039    #[serde(default, skip_serializing_if = "Option::is_none")]
1040    pub level: Option<String>,
1041    #[serde(default)]
1042    pub excerpt: Vec<String>,
1043    #[serde(default, skip_serializing_if = "Option::is_none")]
1044    pub fingerprint: Option<String>,
1045}
1046
1047#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1048pub struct BugMonitorCapabilityReadiness {
1049    #[serde(default)]
1050    pub github_list_issues: bool,
1051    #[serde(default)]
1052    pub github_get_issue: bool,
1053    #[serde(default)]
1054    pub github_create_issue: bool,
1055    #[serde(default)]
1056    pub github_comment_on_issue: bool,
1057}
1058
1059#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1060pub struct BugMonitorCapabilityMatch {
1061    pub capability_id: String,
1062    pub provider: String,
1063    pub tool_name: String,
1064    pub binding_index: usize,
1065}
1066
1067#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1068pub struct BugMonitorBindingCandidate {
1069    pub capability_id: String,
1070    pub binding_tool_name: String,
1071    #[serde(default)]
1072    pub aliases: Vec<String>,
1073    #[serde(default)]
1074    pub matched: bool,
1075}
1076
1077#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1078pub struct BugMonitorReadiness {
1079    #[serde(default)]
1080    pub config_valid: bool,
1081    #[serde(default)]
1082    pub repo_valid: bool,
1083    #[serde(default)]
1084    pub mcp_server_present: bool,
1085    #[serde(default)]
1086    pub mcp_connected: bool,
1087    #[serde(default)]
1088    pub github_read_ready: bool,
1089    #[serde(default)]
1090    pub github_write_ready: bool,
1091    #[serde(default)]
1092    pub selected_model_ready: bool,
1093    #[serde(default)]
1094    pub ingest_ready: bool,
1095    #[serde(default)]
1096    pub publish_ready: bool,
1097    #[serde(default)]
1098    pub runtime_ready: bool,
1099}
1100
1101#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1102pub struct BugMonitorStatus {
1103    pub config: BugMonitorConfig,
1104    pub readiness: BugMonitorReadiness,
1105    #[serde(default)]
1106    pub runtime: BugMonitorRuntimeStatus,
1107    pub required_capabilities: BugMonitorCapabilityReadiness,
1108    #[serde(default)]
1109    pub missing_required_capabilities: Vec<String>,
1110    #[serde(default)]
1111    pub resolved_capabilities: Vec<BugMonitorCapabilityMatch>,
1112    #[serde(default)]
1113    pub discovered_mcp_tools: Vec<String>,
1114    #[serde(default)]
1115    pub selected_server_binding_candidates: Vec<BugMonitorBindingCandidate>,
1116    #[serde(default, skip_serializing_if = "Option::is_none")]
1117    pub binding_source_version: Option<String>,
1118    #[serde(default, skip_serializing_if = "Option::is_none")]
1119    pub bindings_last_merged_at_ms: Option<u64>,
1120    #[serde(default, skip_serializing_if = "Option::is_none")]
1121    pub selected_model: Option<ModelSpec>,
1122    #[serde(default)]
1123    pub pending_drafts: usize,
1124    #[serde(default)]
1125    pub pending_posts: usize,
1126    #[serde(default, skip_serializing_if = "Option::is_none")]
1127    pub last_activity_at_ms: Option<u64>,
1128    #[serde(default, skip_serializing_if = "Option::is_none")]
1129    pub last_error: Option<String>,
1130}
1131
1132#[derive(Debug, Clone, Serialize)]
1133pub struct ResourceConflict {
1134    pub key: String,
1135    pub expected_rev: Option<u64>,
1136    pub current_rev: Option<u64>,
1137}
1138
1139#[derive(Debug, Clone, Serialize)]
1140#[serde(tag = "type", rename_all = "snake_case")]
1141pub enum ResourceStoreError {
1142    InvalidKey { key: String },
1143    RevisionConflict(ResourceConflict),
1144    PersistFailed { message: String },
1145}
1146
1147#[derive(Debug, Clone, Serialize)]
1148#[serde(tag = "type", rename_all = "snake_case")]
1149pub enum RoutineStoreError {
1150    InvalidRoutineId { routine_id: String },
1151    InvalidSchedule { detail: String },
1152    PersistFailed { message: String },
1153}
1154
1155#[derive(Debug, Clone)]
1156pub enum StartupStatus {
1157    Starting,
1158    Ready,
1159    Failed,
1160}
1161
1162#[derive(Debug, Clone)]
1163pub struct StartupState {
1164    pub status: StartupStatus,
1165    pub phase: String,
1166    pub started_at_ms: u64,
1167    pub attempt_id: String,
1168    pub last_error: Option<String>,
1169}
1170
1171#[derive(Debug, Clone)]
1172pub struct StartupSnapshot {
1173    pub status: StartupStatus,
1174    pub phase: String,
1175    pub started_at_ms: u64,
1176    pub attempt_id: String,
1177    pub last_error: Option<String>,
1178    pub elapsed_ms: u64,
1179}
1180
1181#[derive(Clone)]
1182pub struct AppState {
1183    pub runtime: Arc<OnceLock<RuntimeState>>,
1184    pub startup: Arc<RwLock<StartupState>>,
1185    pub in_process_mode: Arc<AtomicBool>,
1186    pub api_token: Arc<RwLock<Option<String>>>,
1187    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
1188    pub run_registry: RunRegistry,
1189    pub run_stale_ms: u64,
1190    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
1191    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
1192    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
1193    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
1194    pub shared_resources_path: PathBuf,
1195    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
1196    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
1197    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
1198    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
1199    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
1200    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
1201    pub workflow_plan_drafts:
1202        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
1203    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
1204    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
1205    pub bug_monitor_incidents:
1206        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
1207    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
1208    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
1209    pub workflows: Arc<RwLock<WorkflowRegistry>>,
1210    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
1211    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
1212    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
1213    pub routine_session_policies:
1214        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
1215    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
1216    pub token_cost_per_1k_usd: f64,
1217    pub routines_path: PathBuf,
1218    pub routine_history_path: PathBuf,
1219    pub routine_runs_path: PathBuf,
1220    pub automations_v2_path: PathBuf,
1221    pub automation_v2_runs_path: PathBuf,
1222    pub bug_monitor_config_path: PathBuf,
1223    pub bug_monitor_drafts_path: PathBuf,
1224    pub bug_monitor_incidents_path: PathBuf,
1225    pub bug_monitor_posts_path: PathBuf,
1226    pub workflow_runs_path: PathBuf,
1227    pub workflow_hook_overrides_path: PathBuf,
1228    pub agent_teams: AgentTeamRuntime,
1229    pub web_ui_enabled: Arc<AtomicBool>,
1230    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
1231    pub server_base_url: Arc<std::sync::RwLock<String>>,
1232    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
1233    pub host_runtime_context: HostRuntimeContext,
1234    pub pack_manager: Arc<PackManager>,
1235    pub capability_resolver: Arc<CapabilityResolver>,
1236    pub preset_registry: Arc<PresetRegistry>,
1237}
1238
1239#[derive(Debug, Clone)]
1240struct StatusIndexUpdate {
1241    key: String,
1242    value: Value,
1243}
1244
1245impl AppState {
1246    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
1247        Self {
1248            runtime: Arc::new(OnceLock::new()),
1249            startup: Arc::new(RwLock::new(StartupState {
1250                status: StartupStatus::Starting,
1251                phase: "boot".to_string(),
1252                started_at_ms: now_ms(),
1253                attempt_id,
1254                last_error: None,
1255            })),
1256            in_process_mode: Arc::new(AtomicBool::new(in_process)),
1257            api_token: Arc::new(RwLock::new(None)),
1258            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
1259            run_registry: RunRegistry::new(),
1260            run_stale_ms: resolve_run_stale_ms(),
1261            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
1262            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
1263            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
1264            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
1265            shared_resources_path: resolve_shared_resources_path(),
1266            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
1267            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
1268            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1269            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
1270            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1271            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
1272            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1273            bug_monitor_config: Arc::new(RwLock::new(resolve_bug_monitor_env_config())),
1274            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1275            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
1276            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
1277            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
1278            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
1279            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1280            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
1281            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
1282            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
1283            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
1284            routines_path: resolve_routines_path(),
1285            routine_history_path: resolve_routine_history_path(),
1286            routine_runs_path: resolve_routine_runs_path(),
1287            automations_v2_path: resolve_automations_v2_path(),
1288            automation_v2_runs_path: resolve_automation_v2_runs_path(),
1289            bug_monitor_config_path: resolve_bug_monitor_config_path(),
1290            bug_monitor_drafts_path: resolve_bug_monitor_drafts_path(),
1291            bug_monitor_incidents_path: resolve_bug_monitor_incidents_path(),
1292            bug_monitor_posts_path: resolve_bug_monitor_posts_path(),
1293            workflow_runs_path: resolve_workflow_runs_path(),
1294            workflow_hook_overrides_path: resolve_workflow_hook_overrides_path(),
1295            agent_teams: AgentTeamRuntime::new(resolve_agent_team_audit_path()),
1296            web_ui_enabled: Arc::new(AtomicBool::new(false)),
1297            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
1298            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
1299            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
1300            host_runtime_context: detect_host_runtime_context(),
1301            token_cost_per_1k_usd: resolve_token_cost_per_1k_usd(),
1302            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
1303            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
1304            preset_registry: Arc::new(PresetRegistry::new(
1305                PackManager::default_root(),
1306                resolve_shared_paths()
1307                    .map(|paths| paths.canonical_root)
1308                    .unwrap_or_else(|_| {
1309                        dirs::home_dir()
1310                            .unwrap_or_else(|| PathBuf::from("."))
1311                            .join(".tandem")
1312                    }),
1313            )),
1314        }
1315    }
1316
1317    pub fn is_ready(&self) -> bool {
1318        self.runtime.get().is_some()
1319    }
1320
1321    pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
1322        for _ in 0..attempts {
1323            if self.is_ready() {
1324                return true;
1325            }
1326            let startup = self.startup_snapshot().await;
1327            if matches!(startup.status, StartupStatus::Failed) {
1328                return false;
1329            }
1330            tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
1331        }
1332        self.is_ready()
1333    }
1334
1335    pub fn mode_label(&self) -> &'static str {
1336        if self.in_process_mode.load(Ordering::Relaxed) {
1337            "in-process"
1338        } else {
1339            "sidecar"
1340        }
1341    }
1342
1343    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
1344        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
1345        if let Ok(mut guard) = self.web_ui_prefix.write() {
1346            *guard = normalize_web_ui_prefix(&prefix);
1347        }
1348    }
1349
1350    pub fn web_ui_enabled(&self) -> bool {
1351        self.web_ui_enabled.load(Ordering::Relaxed)
1352    }
1353
1354    pub fn web_ui_prefix(&self) -> String {
1355        self.web_ui_prefix
1356            .read()
1357            .map(|v| v.clone())
1358            .unwrap_or_else(|_| "/admin".to_string())
1359    }
1360
1361    pub fn set_server_base_url(&self, base_url: String) {
1362        if let Ok(mut guard) = self.server_base_url.write() {
1363            *guard = base_url;
1364        }
1365    }
1366
1367    pub fn server_base_url(&self) -> String {
1368        self.server_base_url
1369            .read()
1370            .map(|v| v.clone())
1371            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
1372    }
1373
1374    pub async fn api_token(&self) -> Option<String> {
1375        self.api_token.read().await.clone()
1376    }
1377
1378    pub async fn set_api_token(&self, token: Option<String>) {
1379        *self.api_token.write().await = token;
1380    }
1381
1382    pub async fn startup_snapshot(&self) -> StartupSnapshot {
1383        let state = self.startup.read().await.clone();
1384        StartupSnapshot {
1385            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
1386            status: state.status,
1387            phase: state.phase,
1388            started_at_ms: state.started_at_ms,
1389            attempt_id: state.attempt_id,
1390            last_error: state.last_error,
1391        }
1392    }
1393
1394    pub fn host_runtime_context(&self) -> HostRuntimeContext {
1395        self.runtime
1396            .get()
1397            .map(|runtime| runtime.host_runtime_context.clone())
1398            .unwrap_or_else(|| self.host_runtime_context.clone())
1399    }
1400
1401    pub async fn set_phase(&self, phase: impl Into<String>) {
1402        let mut startup = self.startup.write().await;
1403        startup.phase = phase.into();
1404    }
1405
1406    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
1407        self.runtime
1408            .set(runtime)
1409            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
1410        self.register_browser_tools().await?;
1411        self.tools
1412            .register_tool(
1413                "pack_builder".to_string(),
1414                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
1415            )
1416            .await;
1417        self.engine_loop
1418            .set_spawn_agent_hook(std::sync::Arc::new(
1419                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
1420            ))
1421            .await;
1422        self.engine_loop
1423            .set_tool_policy_hook(std::sync::Arc::new(
1424                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
1425            ))
1426            .await;
1427        self.engine_loop
1428            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
1429                self.clone(),
1430            )))
1431            .await;
1432        let _ = self.load_shared_resources().await;
1433        self.load_routines().await?;
1434        let _ = self.load_routine_history().await;
1435        let _ = self.load_routine_runs().await;
1436        self.load_automations_v2().await?;
1437        let _ = self.load_automation_v2_runs().await;
1438        let _ = self.load_bug_monitor_config().await;
1439        let _ = self.load_bug_monitor_drafts().await;
1440        let _ = self.load_bug_monitor_incidents().await;
1441        let _ = self.load_bug_monitor_posts().await;
1442        let _ = self.load_workflow_runs().await;
1443        let _ = self.load_workflow_hook_overrides().await;
1444        let _ = self.reload_workflows().await;
1445        let workspace_root = self.workspace_index.snapshot().await.root;
1446        let _ = self
1447            .agent_teams
1448            .ensure_loaded_for_workspace(&workspace_root)
1449            .await;
1450        let mut startup = self.startup.write().await;
1451        startup.status = StartupStatus::Ready;
1452        startup.phase = "ready".to_string();
1453        startup.last_error = None;
1454        Ok(())
1455    }
1456
1457    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
1458        let mut startup = self.startup.write().await;
1459        startup.status = StartupStatus::Failed;
1460        startup.phase = phase.into();
1461        startup.last_error = Some(error.into());
1462    }
1463
1464    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
1465        let runtime = self.channels_runtime.lock().await;
1466        runtime.statuses.clone()
1467    }
1468
1469    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
1470        let effective = self.config.get_effective_value().await;
1471        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
1472        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
1473
1474        let mut runtime = self.channels_runtime.lock().await;
1475        if let Some(listeners) = runtime.listeners.as_mut() {
1476            listeners.abort_all();
1477        }
1478        runtime.listeners = None;
1479        runtime.statuses.clear();
1480
1481        let mut status_map = std::collections::HashMap::new();
1482        status_map.insert(
1483            "telegram".to_string(),
1484            ChannelStatus {
1485                enabled: parsed.channels.telegram.is_some(),
1486                connected: false,
1487                last_error: None,
1488                active_sessions: 0,
1489                meta: serde_json::json!({}),
1490            },
1491        );
1492        status_map.insert(
1493            "discord".to_string(),
1494            ChannelStatus {
1495                enabled: parsed.channels.discord.is_some(),
1496                connected: false,
1497                last_error: None,
1498                active_sessions: 0,
1499                meta: serde_json::json!({}),
1500            },
1501        );
1502        status_map.insert(
1503            "slack".to_string(),
1504            ChannelStatus {
1505                enabled: parsed.channels.slack.is_some(),
1506                connected: false,
1507                last_error: None,
1508                active_sessions: 0,
1509                meta: serde_json::json!({}),
1510            },
1511        );
1512
1513        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
1514            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
1515            runtime.listeners = Some(listeners);
1516            for status in status_map.values_mut() {
1517                if status.enabled {
1518                    status.connected = true;
1519                }
1520            }
1521        }
1522
1523        runtime.statuses = status_map.clone();
1524        drop(runtime);
1525
1526        self.event_bus.publish(EngineEvent::new(
1527            "channel.status.changed",
1528            serde_json::json!({ "channels": status_map }),
1529        ));
1530        Ok(())
1531    }
1532
1533    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
1534        if !self.shared_resources_path.exists() {
1535            return Ok(());
1536        }
1537        let raw = fs::read_to_string(&self.shared_resources_path).await?;
1538        let parsed =
1539            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
1540                .unwrap_or_default();
1541        let mut guard = self.shared_resources.write().await;
1542        *guard = parsed;
1543        Ok(())
1544    }
1545
1546    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
1547        if let Some(parent) = self.shared_resources_path.parent() {
1548            fs::create_dir_all(parent).await?;
1549        }
1550        let payload = {
1551            let guard = self.shared_resources.read().await;
1552            serde_json::to_string_pretty(&*guard)?
1553        };
1554        fs::write(&self.shared_resources_path, payload).await?;
1555        Ok(())
1556    }
1557
1558    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
1559        self.shared_resources.read().await.get(key).cloned()
1560    }
1561
1562    pub async fn list_shared_resources(
1563        &self,
1564        prefix: Option<&str>,
1565        limit: usize,
1566    ) -> Vec<SharedResourceRecord> {
1567        let limit = limit.clamp(1, 500);
1568        let mut rows = self
1569            .shared_resources
1570            .read()
1571            .await
1572            .values()
1573            .filter(|record| {
1574                if let Some(prefix) = prefix {
1575                    record.key.starts_with(prefix)
1576                } else {
1577                    true
1578                }
1579            })
1580            .cloned()
1581            .collect::<Vec<_>>();
1582        rows.sort_by(|a, b| a.key.cmp(&b.key));
1583        rows.truncate(limit);
1584        rows
1585    }
1586
1587    pub async fn put_shared_resource(
1588        &self,
1589        key: String,
1590        value: Value,
1591        if_match_rev: Option<u64>,
1592        updated_by: String,
1593        ttl_ms: Option<u64>,
1594    ) -> Result<SharedResourceRecord, ResourceStoreError> {
1595        if !is_valid_resource_key(&key) {
1596            return Err(ResourceStoreError::InvalidKey { key });
1597        }
1598
1599        let now = now_ms();
1600        let mut guard = self.shared_resources.write().await;
1601        let existing = guard.get(&key).cloned();
1602
1603        if let Some(expected) = if_match_rev {
1604            let current = existing.as_ref().map(|row| row.rev);
1605            if current != Some(expected) {
1606                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1607                    key,
1608                    expected_rev: Some(expected),
1609                    current_rev: current,
1610                }));
1611            }
1612        }
1613
1614        let next_rev = existing
1615            .as_ref()
1616            .map(|row| row.rev.saturating_add(1))
1617            .unwrap_or(1);
1618
1619        let record = SharedResourceRecord {
1620            key: key.clone(),
1621            value,
1622            rev: next_rev,
1623            updated_at_ms: now,
1624            updated_by,
1625            ttl_ms,
1626        };
1627
1628        let previous = guard.insert(key.clone(), record.clone());
1629        drop(guard);
1630
1631        if let Err(error) = self.persist_shared_resources().await {
1632            let mut rollback = self.shared_resources.write().await;
1633            if let Some(previous) = previous {
1634                rollback.insert(key, previous);
1635            } else {
1636                rollback.remove(&key);
1637            }
1638            return Err(ResourceStoreError::PersistFailed {
1639                message: error.to_string(),
1640            });
1641        }
1642
1643        Ok(record)
1644    }
1645
1646    pub async fn delete_shared_resource(
1647        &self,
1648        key: &str,
1649        if_match_rev: Option<u64>,
1650    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
1651        if !is_valid_resource_key(key) {
1652            return Err(ResourceStoreError::InvalidKey {
1653                key: key.to_string(),
1654            });
1655        }
1656
1657        let mut guard = self.shared_resources.write().await;
1658        let current = guard.get(key).cloned();
1659        if let Some(expected) = if_match_rev {
1660            let current_rev = current.as_ref().map(|row| row.rev);
1661            if current_rev != Some(expected) {
1662                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
1663                    key: key.to_string(),
1664                    expected_rev: Some(expected),
1665                    current_rev,
1666                }));
1667            }
1668        }
1669
1670        let removed = guard.remove(key);
1671        drop(guard);
1672
1673        if let Err(error) = self.persist_shared_resources().await {
1674            if let Some(record) = removed.clone() {
1675                self.shared_resources
1676                    .write()
1677                    .await
1678                    .insert(record.key.clone(), record);
1679            }
1680            return Err(ResourceStoreError::PersistFailed {
1681                message: error.to_string(),
1682            });
1683        }
1684
1685        Ok(removed)
1686    }
1687
1688    pub async fn load_routines(&self) -> anyhow::Result<()> {
1689        if !self.routines_path.exists() {
1690            return Ok(());
1691        }
1692        let raw = fs::read_to_string(&self.routines_path).await?;
1693        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
1694            Ok(parsed) => {
1695                let mut guard = self.routines.write().await;
1696                *guard = parsed;
1697                Ok(())
1698            }
1699            Err(primary_err) => {
1700                let backup_path = sibling_backup_path(&self.routines_path);
1701                if backup_path.exists() {
1702                    let backup_raw = fs::read_to_string(&backup_path).await?;
1703                    if let Ok(parsed_backup) = serde_json::from_str::<
1704                        std::collections::HashMap<String, RoutineSpec>,
1705                    >(&backup_raw)
1706                    {
1707                        let mut guard = self.routines.write().await;
1708                        *guard = parsed_backup;
1709                        return Ok(());
1710                    }
1711                }
1712                Err(anyhow::anyhow!(
1713                    "failed to parse routines store {}: {primary_err}",
1714                    self.routines_path.display()
1715                ))
1716            }
1717        }
1718    }
1719
1720    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
1721        if !self.routine_history_path.exists() {
1722            return Ok(());
1723        }
1724        let raw = fs::read_to_string(&self.routine_history_path).await?;
1725        let parsed = serde_json::from_str::<
1726            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
1727        >(&raw)
1728        .unwrap_or_default();
1729        let mut guard = self.routine_history.write().await;
1730        *guard = parsed;
1731        Ok(())
1732    }
1733
1734    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
1735        if !self.routine_runs_path.exists() {
1736            return Ok(());
1737        }
1738        let raw = fs::read_to_string(&self.routine_runs_path).await?;
1739        let parsed =
1740            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
1741                .unwrap_or_default();
1742        let mut guard = self.routine_runs.write().await;
1743        *guard = parsed;
1744        Ok(())
1745    }
1746
1747    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
1748        if let Some(parent) = self.routines_path.parent() {
1749            fs::create_dir_all(parent).await?;
1750        }
1751        let (payload, is_empty) = {
1752            let guard = self.routines.read().await;
1753            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
1754        };
1755        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
1756            let existing_raw = fs::read_to_string(&self.routines_path)
1757                .await
1758                .unwrap_or_default();
1759            let existing_has_rows = serde_json::from_str::<
1760                std::collections::HashMap<String, RoutineSpec>,
1761            >(&existing_raw)
1762            .map(|rows| !rows.is_empty())
1763            .unwrap_or(true);
1764            if existing_has_rows {
1765                return Err(anyhow::anyhow!(
1766                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
1767                    self.routines_path.display()
1768                ));
1769            }
1770        }
1771        let backup_path = sibling_backup_path(&self.routines_path);
1772        if self.routines_path.exists() {
1773            let _ = fs::copy(&self.routines_path, &backup_path).await;
1774        }
1775        let tmp_path = sibling_tmp_path(&self.routines_path);
1776        fs::write(&tmp_path, payload).await?;
1777        fs::rename(&tmp_path, &self.routines_path).await?;
1778        Ok(())
1779    }
1780
1781    pub async fn persist_routines(&self) -> anyhow::Result<()> {
1782        self.persist_routines_inner(false).await
1783    }
1784
1785    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
1786        if let Some(parent) = self.routine_history_path.parent() {
1787            fs::create_dir_all(parent).await?;
1788        }
1789        let payload = {
1790            let guard = self.routine_history.read().await;
1791            serde_json::to_string_pretty(&*guard)?
1792        };
1793        fs::write(&self.routine_history_path, payload).await?;
1794        Ok(())
1795    }
1796
1797    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
1798        if let Some(parent) = self.routine_runs_path.parent() {
1799            fs::create_dir_all(parent).await?;
1800        }
1801        let payload = {
1802            let guard = self.routine_runs.read().await;
1803            serde_json::to_string_pretty(&*guard)?
1804        };
1805        fs::write(&self.routine_runs_path, payload).await?;
1806        Ok(())
1807    }
1808
1809    pub async fn put_routine(
1810        &self,
1811        mut routine: RoutineSpec,
1812    ) -> Result<RoutineSpec, RoutineStoreError> {
1813        if routine.routine_id.trim().is_empty() {
1814            return Err(RoutineStoreError::InvalidRoutineId {
1815                routine_id: routine.routine_id,
1816            });
1817        }
1818
1819        routine.allowed_tools = normalize_allowed_tools(routine.allowed_tools);
1820        routine.output_targets = normalize_non_empty_list(routine.output_targets);
1821
1822        let now = now_ms();
1823        let next_schedule_fire =
1824            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
1825                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
1826                    detail: "invalid schedule or timezone".to_string(),
1827                })?;
1828        match routine.schedule {
1829            RoutineSchedule::IntervalSeconds { seconds } => {
1830                if seconds == 0 {
1831                    return Err(RoutineStoreError::InvalidSchedule {
1832                        detail: "interval_seconds must be > 0".to_string(),
1833                    });
1834                }
1835                let _ = seconds;
1836            }
1837            RoutineSchedule::Cron { .. } => {}
1838        }
1839        if routine.next_fire_at_ms.is_none() {
1840            routine.next_fire_at_ms = Some(next_schedule_fire);
1841        }
1842
1843        let mut guard = self.routines.write().await;
1844        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
1845        drop(guard);
1846
1847        if let Err(error) = self.persist_routines().await {
1848            let mut rollback = self.routines.write().await;
1849            if let Some(previous) = previous {
1850                rollback.insert(previous.routine_id.clone(), previous);
1851            } else {
1852                rollback.remove(&routine.routine_id);
1853            }
1854            return Err(RoutineStoreError::PersistFailed {
1855                message: error.to_string(),
1856            });
1857        }
1858
1859        Ok(routine)
1860    }
1861
1862    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
1863        let mut rows = self
1864            .routines
1865            .read()
1866            .await
1867            .values()
1868            .cloned()
1869            .collect::<Vec<_>>();
1870        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
1871        rows
1872    }
1873
1874    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
1875        self.routines.read().await.get(routine_id).cloned()
1876    }
1877
1878    pub async fn delete_routine(
1879        &self,
1880        routine_id: &str,
1881    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
1882        let mut guard = self.routines.write().await;
1883        let removed = guard.remove(routine_id);
1884        drop(guard);
1885
1886        let allow_empty_overwrite = self.routines.read().await.is_empty();
1887        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
1888            if let Some(removed) = removed.clone() {
1889                self.routines
1890                    .write()
1891                    .await
1892                    .insert(removed.routine_id.clone(), removed);
1893            }
1894            return Err(RoutineStoreError::PersistFailed {
1895                message: error.to_string(),
1896            });
1897        }
1898        Ok(removed)
1899    }
1900
1901    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
1902        let mut plans = Vec::new();
1903        let mut guard = self.routines.write().await;
1904        for routine in guard.values_mut() {
1905            if routine.status != RoutineStatus::Active {
1906                continue;
1907            }
1908            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
1909                continue;
1910            };
1911            if now_ms < next_fire_at_ms {
1912                continue;
1913            }
1914            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
1915                now_ms,
1916                next_fire_at_ms,
1917                &routine.schedule,
1918                &routine.timezone,
1919                &routine.misfire_policy,
1920            );
1921            routine.next_fire_at_ms = Some(next_fire_at_ms);
1922            if run_count == 0 {
1923                continue;
1924            }
1925            plans.push(RoutineTriggerPlan {
1926                routine_id: routine.routine_id.clone(),
1927                run_count,
1928                scheduled_at_ms: now_ms,
1929                next_fire_at_ms,
1930            });
1931        }
1932        drop(guard);
1933        let _ = self.persist_routines().await;
1934        plans
1935    }
1936
1937    pub async fn mark_routine_fired(
1938        &self,
1939        routine_id: &str,
1940        fired_at_ms: u64,
1941    ) -> Option<RoutineSpec> {
1942        let mut guard = self.routines.write().await;
1943        let routine = guard.get_mut(routine_id)?;
1944        routine.last_fired_at_ms = Some(fired_at_ms);
1945        let updated = routine.clone();
1946        drop(guard);
1947        let _ = self.persist_routines().await;
1948        Some(updated)
1949    }
1950
1951    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
1952        let mut history = self.routine_history.write().await;
1953        history
1954            .entry(event.routine_id.clone())
1955            .or_default()
1956            .push(event);
1957        drop(history);
1958        let _ = self.persist_routine_history().await;
1959    }
1960
1961    pub async fn list_routine_history(
1962        &self,
1963        routine_id: &str,
1964        limit: usize,
1965    ) -> Vec<RoutineHistoryEvent> {
1966        let limit = limit.clamp(1, 500);
1967        let mut rows = self
1968            .routine_history
1969            .read()
1970            .await
1971            .get(routine_id)
1972            .cloned()
1973            .unwrap_or_default();
1974        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1975        rows.truncate(limit);
1976        rows
1977    }
1978
1979    pub async fn create_routine_run(
1980        &self,
1981        routine: &RoutineSpec,
1982        trigger_type: &str,
1983        run_count: u32,
1984        status: RoutineRunStatus,
1985        detail: Option<String>,
1986    ) -> RoutineRunRecord {
1987        let now = now_ms();
1988        let record = RoutineRunRecord {
1989            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1990            routine_id: routine.routine_id.clone(),
1991            trigger_type: trigger_type.to_string(),
1992            run_count,
1993            status,
1994            created_at_ms: now,
1995            updated_at_ms: now,
1996            fired_at_ms: Some(now),
1997            started_at_ms: None,
1998            finished_at_ms: None,
1999            requires_approval: routine.requires_approval,
2000            approval_reason: None,
2001            denial_reason: None,
2002            paused_reason: None,
2003            detail,
2004            entrypoint: routine.entrypoint.clone(),
2005            args: routine.args.clone(),
2006            allowed_tools: routine.allowed_tools.clone(),
2007            output_targets: routine.output_targets.clone(),
2008            artifacts: Vec::new(),
2009            active_session_ids: Vec::new(),
2010            latest_session_id: None,
2011            prompt_tokens: 0,
2012            completion_tokens: 0,
2013            total_tokens: 0,
2014            estimated_cost_usd: 0.0,
2015        };
2016        self.routine_runs
2017            .write()
2018            .await
2019            .insert(record.run_id.clone(), record.clone());
2020        let _ = self.persist_routine_runs().await;
2021        record
2022    }
2023
2024    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
2025        self.routine_runs.read().await.get(run_id).cloned()
2026    }
2027
2028    pub async fn list_routine_runs(
2029        &self,
2030        routine_id: Option<&str>,
2031        limit: usize,
2032    ) -> Vec<RoutineRunRecord> {
2033        let mut rows = self
2034            .routine_runs
2035            .read()
2036            .await
2037            .values()
2038            .filter(|row| {
2039                if let Some(id) = routine_id {
2040                    row.routine_id == id
2041                } else {
2042                    true
2043                }
2044            })
2045            .cloned()
2046            .collect::<Vec<_>>();
2047        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2048        rows.truncate(limit.clamp(1, 500));
2049        rows
2050    }
2051
2052    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
2053        let mut guard = self.routine_runs.write().await;
2054        let next_run_id = guard
2055            .values()
2056            .filter(|row| row.status == RoutineRunStatus::Queued)
2057            .min_by(|a, b| {
2058                a.created_at_ms
2059                    .cmp(&b.created_at_ms)
2060                    .then_with(|| a.run_id.cmp(&b.run_id))
2061            })
2062            .map(|row| row.run_id.clone())?;
2063        let now = now_ms();
2064        let row = guard.get_mut(&next_run_id)?;
2065        row.status = RoutineRunStatus::Running;
2066        row.updated_at_ms = now;
2067        row.started_at_ms = Some(now);
2068        let claimed = row.clone();
2069        drop(guard);
2070        let _ = self.persist_routine_runs().await;
2071        Some(claimed)
2072    }
2073
2074    pub async fn set_routine_session_policy(
2075        &self,
2076        session_id: String,
2077        run_id: String,
2078        routine_id: String,
2079        allowed_tools: Vec<String>,
2080    ) {
2081        let policy = RoutineSessionPolicy {
2082            session_id: session_id.clone(),
2083            run_id,
2084            routine_id,
2085            allowed_tools: normalize_allowed_tools(allowed_tools),
2086        };
2087        self.routine_session_policies
2088            .write()
2089            .await
2090            .insert(session_id, policy);
2091    }
2092
2093    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
2094        self.routine_session_policies
2095            .read()
2096            .await
2097            .get(session_id)
2098            .cloned()
2099    }
2100
2101    pub async fn clear_routine_session_policy(&self, session_id: &str) {
2102        self.routine_session_policies
2103            .write()
2104            .await
2105            .remove(session_id);
2106    }
2107
2108    pub async fn update_routine_run_status(
2109        &self,
2110        run_id: &str,
2111        status: RoutineRunStatus,
2112        reason: Option<String>,
2113    ) -> Option<RoutineRunRecord> {
2114        let mut guard = self.routine_runs.write().await;
2115        let row = guard.get_mut(run_id)?;
2116        row.status = status.clone();
2117        row.updated_at_ms = now_ms();
2118        match status {
2119            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
2120            RoutineRunStatus::Running => {
2121                row.started_at_ms.get_or_insert_with(now_ms);
2122                if let Some(detail) = reason {
2123                    row.detail = Some(detail);
2124                }
2125            }
2126            RoutineRunStatus::Denied => row.denial_reason = reason,
2127            RoutineRunStatus::Paused => row.paused_reason = reason,
2128            RoutineRunStatus::Completed
2129            | RoutineRunStatus::Failed
2130            | RoutineRunStatus::Cancelled => {
2131                row.finished_at_ms = Some(now_ms());
2132                if let Some(detail) = reason {
2133                    row.detail = Some(detail);
2134                }
2135            }
2136            _ => {
2137                if let Some(detail) = reason {
2138                    row.detail = Some(detail);
2139                }
2140            }
2141        }
2142        let updated = row.clone();
2143        drop(guard);
2144        let _ = self.persist_routine_runs().await;
2145        Some(updated)
2146    }
2147
2148    pub async fn append_routine_run_artifact(
2149        &self,
2150        run_id: &str,
2151        artifact: RoutineRunArtifact,
2152    ) -> Option<RoutineRunRecord> {
2153        let mut guard = self.routine_runs.write().await;
2154        let row = guard.get_mut(run_id)?;
2155        row.updated_at_ms = now_ms();
2156        row.artifacts.push(artifact);
2157        let updated = row.clone();
2158        drop(guard);
2159        let _ = self.persist_routine_runs().await;
2160        Some(updated)
2161    }
2162
2163    pub async fn add_active_session_id(
2164        &self,
2165        run_id: &str,
2166        session_id: String,
2167    ) -> Option<RoutineRunRecord> {
2168        let mut guard = self.routine_runs.write().await;
2169        let row = guard.get_mut(run_id)?;
2170        if !row.active_session_ids.iter().any(|id| id == &session_id) {
2171            row.active_session_ids.push(session_id);
2172        }
2173        row.latest_session_id = row.active_session_ids.last().cloned();
2174        row.updated_at_ms = now_ms();
2175        let updated = row.clone();
2176        drop(guard);
2177        let _ = self.persist_routine_runs().await;
2178        Some(updated)
2179    }
2180
2181    pub async fn clear_active_session_id(
2182        &self,
2183        run_id: &str,
2184        session_id: &str,
2185    ) -> Option<RoutineRunRecord> {
2186        let mut guard = self.routine_runs.write().await;
2187        let row = guard.get_mut(run_id)?;
2188        row.active_session_ids.retain(|id| id != session_id);
2189        row.updated_at_ms = now_ms();
2190        let updated = row.clone();
2191        drop(guard);
2192        let _ = self.persist_routine_runs().await;
2193        Some(updated)
2194    }
2195
2196    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
2197        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
2198        let mut loaded_from_alternate = false;
2199        let mut path_counts = Vec::new();
2200        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
2201            if !path.exists() {
2202                path_counts.push((path, 0usize));
2203                continue;
2204            }
2205            let raw = fs::read_to_string(&path).await?;
2206            if raw.trim().is_empty() || raw.trim() == "{}" {
2207                path_counts.push((path, 0usize));
2208                continue;
2209            }
2210            let parsed = parse_automation_v2_file(&raw);
2211            path_counts.push((path.clone(), parsed.len()));
2212            if path != self.automations_v2_path {
2213                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
2214            }
2215            for (automation_id, automation) in parsed {
2216                match merged.get(&automation_id) {
2217                    Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
2218                    _ => {
2219                        merged.insert(automation_id, automation);
2220                    }
2221                }
2222            }
2223        }
2224        let active_path = self.automations_v2_path.display().to_string();
2225        let path_count_summary = path_counts
2226            .iter()
2227            .map(|(path, count)| format!("{}={count}", path.display()))
2228            .collect::<Vec<_>>();
2229        tracing::info!(
2230            active_path,
2231            path_counts = ?path_count_summary,
2232            merged_count = merged.len(),
2233            "loaded automation v2 definitions"
2234        );
2235        *self.automations_v2.write().await = merged;
2236        if loaded_from_alternate {
2237            let _ = self.persist_automations_v2().await;
2238        }
2239        Ok(())
2240    }
2241
2242    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
2243        let payload = {
2244            let guard = self.automations_v2.read().await;
2245            serde_json::to_string_pretty(&*guard)?
2246        };
2247        if let Some(parent) = self.automations_v2_path.parent() {
2248            fs::create_dir_all(parent).await?;
2249        }
2250        fs::write(&self.automations_v2_path, &payload).await?;
2251        Ok(())
2252    }
2253
2254    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
2255        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
2256        let mut loaded_from_alternate = false;
2257        let mut path_counts = Vec::new();
2258        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
2259            if !path.exists() {
2260                path_counts.push((path, 0usize));
2261                continue;
2262            }
2263            let raw = fs::read_to_string(&path).await?;
2264            if raw.trim().is_empty() || raw.trim() == "{}" {
2265                path_counts.push((path, 0usize));
2266                continue;
2267            }
2268            let parsed = parse_automation_v2_runs_file(&raw);
2269            path_counts.push((path.clone(), parsed.len()));
2270            if path != self.automation_v2_runs_path {
2271                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
2272            }
2273            for (run_id, run) in parsed {
2274                match merged.get(&run_id) {
2275                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
2276                    _ => {
2277                        merged.insert(run_id, run);
2278                    }
2279                }
2280            }
2281        }
2282        let active_runs_path = self.automation_v2_runs_path.display().to_string();
2283        let run_path_count_summary = path_counts
2284            .iter()
2285            .map(|(path, count)| format!("{}={count}", path.display()))
2286            .collect::<Vec<_>>();
2287        tracing::info!(
2288            active_path = active_runs_path,
2289            path_counts = ?run_path_count_summary,
2290            merged_count = merged.len(),
2291            "loaded automation v2 runs"
2292        );
2293        *self.automation_v2_runs.write().await = merged;
2294        let recovered = self
2295            .recover_automation_definitions_from_run_snapshots()
2296            .await?;
2297        let automation_count = self.automations_v2.read().await.len();
2298        let run_count = self.automation_v2_runs.read().await.len();
2299        if automation_count == 0 && run_count > 0 {
2300            let active_automations_path = self.automations_v2_path.display().to_string();
2301            let active_runs_path = self.automation_v2_runs_path.display().to_string();
2302            tracing::warn!(
2303                active_automations_path,
2304                active_runs_path,
2305                run_count,
2306                "automation v2 definitions are empty while run history exists"
2307            );
2308        }
2309        if loaded_from_alternate || recovered > 0 {
2310            let _ = self.persist_automation_v2_runs().await;
2311        }
2312        Ok(())
2313    }
2314
2315    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
2316        let payload = {
2317            let guard = self.automation_v2_runs.read().await;
2318            serde_json::to_string_pretty(&*guard)?
2319        };
2320        if let Some(parent) = self.automation_v2_runs_path.parent() {
2321            fs::create_dir_all(parent).await?;
2322        }
2323        fs::write(&self.automation_v2_runs_path, &payload).await?;
2324        Ok(())
2325    }
2326
2327    async fn verify_automation_v2_persisted(
2328        &self,
2329        automation_id: &str,
2330        expected_present: bool,
2331    ) -> anyhow::Result<()> {
2332        let candidate_paths = candidate_automations_v2_paths(&self.automations_v2_path);
2333        let mut mismatches = Vec::new();
2334        for path in candidate_paths {
2335            let raw = if path.exists() {
2336                fs::read_to_string(&path).await?
2337            } else {
2338                String::new()
2339            };
2340            let parsed = parse_automation_v2_file(&raw);
2341            let present = parsed.contains_key(automation_id);
2342            if present != expected_present {
2343                mismatches.push(format!(
2344                    "{} expected_present={} actual_present={} count={}",
2345                    path.display(),
2346                    expected_present,
2347                    present,
2348                    parsed.len()
2349                ));
2350            }
2351        }
2352        if !mismatches.is_empty() {
2353            let active_path = self.automations_v2_path.display().to_string();
2354            tracing::error!(
2355                automation_id,
2356                expected_present,
2357                mismatches = ?mismatches,
2358                active_path,
2359                "automation v2 persistence verification failed"
2360            );
2361            anyhow::bail!(
2362                "automation v2 persistence verification failed for `{}`",
2363                automation_id
2364            );
2365        }
2366        Ok(())
2367    }
2368
2369    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
2370        let runs = self
2371            .automation_v2_runs
2372            .read()
2373            .await
2374            .values()
2375            .cloned()
2376            .collect::<Vec<_>>();
2377        let mut guard = self.automations_v2.write().await;
2378        let mut recovered = 0usize;
2379        for run in runs {
2380            let Some(snapshot) = run.automation_snapshot.clone() else {
2381                continue;
2382            };
2383            let should_replace = match guard.get(&run.automation_id) {
2384                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
2385                None => true,
2386            };
2387            if should_replace {
2388                if !guard.contains_key(&run.automation_id) {
2389                    recovered += 1;
2390                }
2391                guard.insert(run.automation_id.clone(), snapshot);
2392            }
2393        }
2394        drop(guard);
2395        if recovered > 0 {
2396            let active_path = self.automations_v2_path.display().to_string();
2397            tracing::warn!(
2398                recovered,
2399                active_path,
2400                "recovered automation v2 definitions from run snapshots"
2401            );
2402            self.persist_automations_v2().await?;
2403        }
2404        Ok(recovered)
2405    }
2406
2407    pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
2408        let path = if self.bug_monitor_config_path.exists() {
2409            self.bug_monitor_config_path.clone()
2410        } else if legacy_failure_reporter_path("failure_reporter_config.json").exists() {
2411            legacy_failure_reporter_path("failure_reporter_config.json")
2412        } else {
2413            return Ok(());
2414        };
2415        let raw = fs::read_to_string(path).await?;
2416        let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
2417            .unwrap_or_else(|_| resolve_bug_monitor_env_config());
2418        *self.bug_monitor_config.write().await = parsed;
2419        Ok(())
2420    }
2421
2422    pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
2423        if let Some(parent) = self.bug_monitor_config_path.parent() {
2424            fs::create_dir_all(parent).await?;
2425        }
2426        let payload = {
2427            let guard = self.bug_monitor_config.read().await;
2428            serde_json::to_string_pretty(&*guard)?
2429        };
2430        fs::write(&self.bug_monitor_config_path, payload).await?;
2431        Ok(())
2432    }
2433
2434    pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
2435        self.bug_monitor_config.read().await.clone()
2436    }
2437
2438    pub async fn put_bug_monitor_config(
2439        &self,
2440        mut config: BugMonitorConfig,
2441    ) -> anyhow::Result<BugMonitorConfig> {
2442        config.workspace_root = config
2443            .workspace_root
2444            .as_ref()
2445            .map(|v| v.trim().to_string())
2446            .filter(|v| !v.is_empty());
2447        if let Some(repo) = config.repo.as_ref() {
2448            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
2449                anyhow::bail!("repo must be in owner/repo format");
2450            }
2451        }
2452        if let Some(server) = config.mcp_server.as_ref() {
2453            let servers = self.mcp.list().await;
2454            if !servers.contains_key(server) {
2455                anyhow::bail!("unknown mcp server `{server}`");
2456            }
2457        }
2458        if let Some(model_policy) = config.model_policy.as_ref() {
2459            crate::http::routines_automations::validate_model_policy(model_policy)
2460                .map_err(anyhow::Error::msg)?;
2461        }
2462        config.updated_at_ms = now_ms();
2463        *self.bug_monitor_config.write().await = config.clone();
2464        self.persist_bug_monitor_config().await?;
2465        Ok(config)
2466    }
2467
2468    pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2469        let path = if self.bug_monitor_drafts_path.exists() {
2470            self.bug_monitor_drafts_path.clone()
2471        } else if legacy_failure_reporter_path("failure_reporter_drafts.json").exists() {
2472            legacy_failure_reporter_path("failure_reporter_drafts.json")
2473        } else {
2474            return Ok(());
2475        };
2476        let raw = fs::read_to_string(path).await?;
2477        let parsed =
2478            serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
2479                .unwrap_or_default();
2480        *self.bug_monitor_drafts.write().await = parsed;
2481        Ok(())
2482    }
2483
2484    pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
2485        if let Some(parent) = self.bug_monitor_drafts_path.parent() {
2486            fs::create_dir_all(parent).await?;
2487        }
2488        let payload = {
2489            let guard = self.bug_monitor_drafts.read().await;
2490            serde_json::to_string_pretty(&*guard)?
2491        };
2492        fs::write(&self.bug_monitor_drafts_path, payload).await?;
2493        Ok(())
2494    }
2495
2496    pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2497        let path = if self.bug_monitor_incidents_path.exists() {
2498            self.bug_monitor_incidents_path.clone()
2499        } else if legacy_failure_reporter_path("failure_reporter_incidents.json").exists() {
2500            legacy_failure_reporter_path("failure_reporter_incidents.json")
2501        } else {
2502            return Ok(());
2503        };
2504        let raw = fs::read_to_string(path).await?;
2505        let parsed = serde_json::from_str::<
2506            std::collections::HashMap<String, BugMonitorIncidentRecord>,
2507        >(&raw)
2508        .unwrap_or_default();
2509        *self.bug_monitor_incidents.write().await = parsed;
2510        Ok(())
2511    }
2512
2513    pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
2514        if let Some(parent) = self.bug_monitor_incidents_path.parent() {
2515            fs::create_dir_all(parent).await?;
2516        }
2517        let payload = {
2518            let guard = self.bug_monitor_incidents.read().await;
2519            serde_json::to_string_pretty(&*guard)?
2520        };
2521        fs::write(&self.bug_monitor_incidents_path, payload).await?;
2522        Ok(())
2523    }
2524
2525    pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
2526        let path = if self.bug_monitor_posts_path.exists() {
2527            self.bug_monitor_posts_path.clone()
2528        } else if legacy_failure_reporter_path("failure_reporter_posts.json").exists() {
2529            legacy_failure_reporter_path("failure_reporter_posts.json")
2530        } else {
2531            return Ok(());
2532        };
2533        let raw = fs::read_to_string(path).await?;
2534        let parsed =
2535            serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
2536                .unwrap_or_default();
2537        *self.bug_monitor_posts.write().await = parsed;
2538        Ok(())
2539    }
2540
2541    pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
2542        if let Some(parent) = self.bug_monitor_posts_path.parent() {
2543            fs::create_dir_all(parent).await?;
2544        }
2545        let payload = {
2546            let guard = self.bug_monitor_posts.read().await;
2547            serde_json::to_string_pretty(&*guard)?
2548        };
2549        fs::write(&self.bug_monitor_posts_path, payload).await?;
2550        Ok(())
2551    }
2552
2553    pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
2554        let mut rows = self
2555            .bug_monitor_incidents
2556            .read()
2557            .await
2558            .values()
2559            .cloned()
2560            .collect::<Vec<_>>();
2561        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2562        rows.truncate(limit.clamp(1, 200));
2563        rows
2564    }
2565
2566    pub async fn get_bug_monitor_incident(
2567        &self,
2568        incident_id: &str,
2569    ) -> Option<BugMonitorIncidentRecord> {
2570        self.bug_monitor_incidents
2571            .read()
2572            .await
2573            .get(incident_id)
2574            .cloned()
2575    }
2576
2577    pub async fn put_bug_monitor_incident(
2578        &self,
2579        incident: BugMonitorIncidentRecord,
2580    ) -> anyhow::Result<BugMonitorIncidentRecord> {
2581        self.bug_monitor_incidents
2582            .write()
2583            .await
2584            .insert(incident.incident_id.clone(), incident.clone());
2585        self.persist_bug_monitor_incidents().await?;
2586        Ok(incident)
2587    }
2588
2589    pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
2590        let mut rows = self
2591            .bug_monitor_posts
2592            .read()
2593            .await
2594            .values()
2595            .cloned()
2596            .collect::<Vec<_>>();
2597        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2598        rows.truncate(limit.clamp(1, 200));
2599        rows
2600    }
2601
2602    pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
2603        self.bug_monitor_posts.read().await.get(post_id).cloned()
2604    }
2605
2606    pub async fn put_bug_monitor_post(
2607        &self,
2608        post: BugMonitorPostRecord,
2609    ) -> anyhow::Result<BugMonitorPostRecord> {
2610        self.bug_monitor_posts
2611            .write()
2612            .await
2613            .insert(post.post_id.clone(), post.clone());
2614        self.persist_bug_monitor_posts().await?;
2615        Ok(post)
2616    }
2617
2618    pub async fn update_bug_monitor_runtime_status(
2619        &self,
2620        update: impl FnOnce(&mut BugMonitorRuntimeStatus),
2621    ) -> BugMonitorRuntimeStatus {
2622        let mut guard = self.bug_monitor_runtime_status.write().await;
2623        update(&mut guard);
2624        guard.clone()
2625    }
2626
2627    pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
2628        let mut rows = self
2629            .bug_monitor_drafts
2630            .read()
2631            .await
2632            .values()
2633            .cloned()
2634            .collect::<Vec<_>>();
2635        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2636        rows.truncate(limit.clamp(1, 200));
2637        rows
2638    }
2639
2640    pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
2641        self.bug_monitor_drafts.read().await.get(draft_id).cloned()
2642    }
2643
2644    pub async fn put_bug_monitor_draft(
2645        &self,
2646        draft: BugMonitorDraftRecord,
2647    ) -> anyhow::Result<BugMonitorDraftRecord> {
2648        self.bug_monitor_drafts
2649            .write()
2650            .await
2651            .insert(draft.draft_id.clone(), draft.clone());
2652        self.persist_bug_monitor_drafts().await?;
2653        Ok(draft)
2654    }
2655
2656    pub async fn submit_bug_monitor_draft(
2657        &self,
2658        mut submission: BugMonitorSubmission,
2659    ) -> anyhow::Result<BugMonitorDraftRecord> {
2660        fn normalize_optional(value: Option<String>) -> Option<String> {
2661            value
2662                .map(|v| v.trim().to_string())
2663                .filter(|v| !v.is_empty())
2664        }
2665
2666        fn compute_fingerprint(parts: &[&str]) -> String {
2667            use std::hash::{Hash, Hasher};
2668
2669            let mut hasher = std::collections::hash_map::DefaultHasher::new();
2670            for part in parts {
2671                part.hash(&mut hasher);
2672            }
2673            format!("{:016x}", hasher.finish())
2674        }
2675
2676        submission.repo = normalize_optional(submission.repo);
2677        submission.title = normalize_optional(submission.title);
2678        submission.detail = normalize_optional(submission.detail);
2679        submission.source = normalize_optional(submission.source);
2680        submission.run_id = normalize_optional(submission.run_id);
2681        submission.session_id = normalize_optional(submission.session_id);
2682        submission.correlation_id = normalize_optional(submission.correlation_id);
2683        submission.file_name = normalize_optional(submission.file_name);
2684        submission.process = normalize_optional(submission.process);
2685        submission.component = normalize_optional(submission.component);
2686        submission.event = normalize_optional(submission.event);
2687        submission.level = normalize_optional(submission.level);
2688        submission.fingerprint = normalize_optional(submission.fingerprint);
2689        submission.excerpt = submission
2690            .excerpt
2691            .into_iter()
2692            .map(|line| line.trim_end().to_string())
2693            .filter(|line| !line.is_empty())
2694            .take(50)
2695            .collect();
2696
2697        let config = self.bug_monitor_config().await;
2698        let repo = submission
2699            .repo
2700            .clone()
2701            .or(config.repo.clone())
2702            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
2703        if !is_valid_owner_repo_slug(&repo) {
2704            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
2705        }
2706
2707        let title = submission.title.clone().unwrap_or_else(|| {
2708            if let Some(event) = submission.event.as_ref() {
2709                format!("Failure detected in {event}")
2710            } else if let Some(component) = submission.component.as_ref() {
2711                format!("Failure detected in {component}")
2712            } else if let Some(process) = submission.process.as_ref() {
2713                format!("Failure detected in {process}")
2714            } else if let Some(source) = submission.source.as_ref() {
2715                format!("Failure report from {source}")
2716            } else {
2717                "Failure report".to_string()
2718            }
2719        });
2720
2721        let mut detail_lines = Vec::new();
2722        if let Some(source) = submission.source.as_ref() {
2723            detail_lines.push(format!("source: {source}"));
2724        }
2725        if let Some(file_name) = submission.file_name.as_ref() {
2726            detail_lines.push(format!("file: {file_name}"));
2727        }
2728        if let Some(level) = submission.level.as_ref() {
2729            detail_lines.push(format!("level: {level}"));
2730        }
2731        if let Some(process) = submission.process.as_ref() {
2732            detail_lines.push(format!("process: {process}"));
2733        }
2734        if let Some(component) = submission.component.as_ref() {
2735            detail_lines.push(format!("component: {component}"));
2736        }
2737        if let Some(event) = submission.event.as_ref() {
2738            detail_lines.push(format!("event: {event}"));
2739        }
2740        if let Some(run_id) = submission.run_id.as_ref() {
2741            detail_lines.push(format!("run_id: {run_id}"));
2742        }
2743        if let Some(session_id) = submission.session_id.as_ref() {
2744            detail_lines.push(format!("session_id: {session_id}"));
2745        }
2746        if let Some(correlation_id) = submission.correlation_id.as_ref() {
2747            detail_lines.push(format!("correlation_id: {correlation_id}"));
2748        }
2749        if let Some(detail) = submission.detail.as_ref() {
2750            detail_lines.push(String::new());
2751            detail_lines.push(detail.clone());
2752        }
2753        if !submission.excerpt.is_empty() {
2754            if !detail_lines.is_empty() {
2755                detail_lines.push(String::new());
2756            }
2757            detail_lines.push("excerpt:".to_string());
2758            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
2759        }
2760        let detail = if detail_lines.is_empty() {
2761            None
2762        } else {
2763            Some(detail_lines.join("\n"))
2764        };
2765
2766        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
2767            compute_fingerprint(&[
2768                repo.as_str(),
2769                title.as_str(),
2770                detail.as_deref().unwrap_or(""),
2771                submission.source.as_deref().unwrap_or(""),
2772                submission.run_id.as_deref().unwrap_or(""),
2773                submission.session_id.as_deref().unwrap_or(""),
2774                submission.correlation_id.as_deref().unwrap_or(""),
2775            ])
2776        });
2777
2778        let mut drafts = self.bug_monitor_drafts.write().await;
2779        if let Some(existing) = drafts
2780            .values()
2781            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
2782            .cloned()
2783        {
2784            return Ok(existing);
2785        }
2786
2787        let draft = BugMonitorDraftRecord {
2788            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
2789            fingerprint,
2790            repo,
2791            status: if config.require_approval_for_new_issues {
2792                "approval_required".to_string()
2793            } else {
2794                "draft_ready".to_string()
2795            },
2796            created_at_ms: now_ms(),
2797            triage_run_id: None,
2798            issue_number: None,
2799            title: Some(title),
2800            detail,
2801            github_status: None,
2802            github_issue_url: None,
2803            github_comment_url: None,
2804            github_posted_at_ms: None,
2805            matched_issue_number: None,
2806            matched_issue_state: None,
2807            evidence_digest: None,
2808            last_post_error: None,
2809        };
2810        drafts.insert(draft.draft_id.clone(), draft.clone());
2811        drop(drafts);
2812        self.persist_bug_monitor_drafts().await?;
2813        Ok(draft)
2814    }
2815
2816    pub async fn update_bug_monitor_draft_status(
2817        &self,
2818        draft_id: &str,
2819        next_status: &str,
2820        reason: Option<&str>,
2821    ) -> anyhow::Result<BugMonitorDraftRecord> {
2822        let normalized_status = next_status.trim().to_ascii_lowercase();
2823        if normalized_status != "draft_ready" && normalized_status != "denied" {
2824            anyhow::bail!("unsupported Bug Monitor draft status");
2825        }
2826
2827        let mut drafts = self.bug_monitor_drafts.write().await;
2828        let Some(draft) = drafts.get_mut(draft_id) else {
2829            anyhow::bail!("Bug Monitor draft not found");
2830        };
2831        if !draft.status.eq_ignore_ascii_case("approval_required") {
2832            anyhow::bail!("Bug Monitor draft is not waiting for approval");
2833        }
2834        draft.status = normalized_status.clone();
2835        if let Some(reason) = reason
2836            .map(|value| value.trim())
2837            .filter(|value| !value.is_empty())
2838        {
2839            let next_detail = if let Some(detail) = draft.detail.as_ref() {
2840                format!("{detail}\n\noperator_note: {reason}")
2841            } else {
2842                format!("operator_note: {reason}")
2843            };
2844            draft.detail = Some(next_detail);
2845        }
2846        let updated = draft.clone();
2847        drop(drafts);
2848        self.persist_bug_monitor_drafts().await?;
2849
2850        let event_name = if normalized_status == "draft_ready" {
2851            "bug_monitor.draft.approved"
2852        } else {
2853            "bug_monitor.draft.denied"
2854        };
2855        self.event_bus.publish(EngineEvent::new(
2856            event_name,
2857            serde_json::json!({
2858                "draft_id": updated.draft_id,
2859                "repo": updated.repo,
2860                "status": updated.status,
2861                "reason": reason,
2862            }),
2863        ));
2864        Ok(updated)
2865    }
2866
2867    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
2868        let required_capabilities = vec![
2869            "github.list_issues".to_string(),
2870            "github.get_issue".to_string(),
2871            "github.create_issue".to_string(),
2872            "github.comment_on_issue".to_string(),
2873        ];
2874        let config = self.bug_monitor_config().await;
2875        let drafts = self.bug_monitor_drafts.read().await;
2876        let incidents = self.bug_monitor_incidents.read().await;
2877        let posts = self.bug_monitor_posts.read().await;
2878        let total_incidents = incidents.len();
2879        let pending_incidents = incidents
2880            .values()
2881            .filter(|row| {
2882                matches!(
2883                    row.status.as_str(),
2884                    "queued"
2885                        | "draft_created"
2886                        | "triage_queued"
2887                        | "analysis_queued"
2888                        | "triage_pending"
2889                        | "issue_draft_pending"
2890                )
2891            })
2892            .count();
2893        let pending_drafts = drafts
2894            .values()
2895            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
2896            .count();
2897        let pending_posts = posts
2898            .values()
2899            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
2900            .count();
2901        let last_activity_at_ms = drafts
2902            .values()
2903            .map(|row| row.created_at_ms)
2904            .chain(posts.values().map(|row| row.updated_at_ms))
2905            .max();
2906        drop(drafts);
2907        drop(incidents);
2908        drop(posts);
2909        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
2910        runtime.paused = config.paused;
2911        runtime.total_incidents = total_incidents;
2912        runtime.pending_incidents = pending_incidents;
2913        runtime.pending_posts = pending_posts;
2914
2915        let mut status = BugMonitorStatus {
2916            config: config.clone(),
2917            runtime,
2918            pending_drafts,
2919            pending_posts,
2920            last_activity_at_ms,
2921            ..BugMonitorStatus::default()
2922        };
2923        let repo_valid = config
2924            .repo
2925            .as_ref()
2926            .map(|repo| is_valid_owner_repo_slug(repo))
2927            .unwrap_or(false);
2928        let servers = self.mcp.list().await;
2929        let selected_server = config
2930            .mcp_server
2931            .as_ref()
2932            .and_then(|name| servers.get(name))
2933            .cloned();
2934        let provider_catalog = self.providers.list().await;
2935        let selected_model = config
2936            .model_policy
2937            .as_ref()
2938            .and_then(|policy| policy.get("default_model"))
2939            .and_then(parse_model_spec);
2940        let selected_model_ready = selected_model
2941            .as_ref()
2942            .map(|spec| provider_catalog_has_model(&provider_catalog, spec))
2943            .unwrap_or(false);
2944        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
2945            self.mcp.server_tools(server_name).await
2946        } else {
2947            Vec::new()
2948        };
2949        let discovered_tools = self
2950            .capability_resolver
2951            .discover_from_runtime(selected_server_tools, Vec::new())
2952            .await;
2953        status.discovered_mcp_tools = discovered_tools
2954            .iter()
2955            .map(|row| row.tool_name.clone())
2956            .collect();
2957        let discovered_providers = discovered_tools
2958            .iter()
2959            .map(|row| row.provider.to_ascii_lowercase())
2960            .collect::<std::collections::HashSet<_>>();
2961        let provider_preference = match config.provider_preference {
2962            BugMonitorProviderPreference::OfficialGithub => {
2963                vec![
2964                    "mcp".to_string(),
2965                    "composio".to_string(),
2966                    "arcade".to_string(),
2967                ]
2968            }
2969            BugMonitorProviderPreference::Composio => {
2970                vec![
2971                    "composio".to_string(),
2972                    "mcp".to_string(),
2973                    "arcade".to_string(),
2974                ]
2975            }
2976            BugMonitorProviderPreference::Arcade => {
2977                vec![
2978                    "arcade".to_string(),
2979                    "mcp".to_string(),
2980                    "composio".to_string(),
2981                ]
2982            }
2983            BugMonitorProviderPreference::Auto => {
2984                vec![
2985                    "mcp".to_string(),
2986                    "composio".to_string(),
2987                    "arcade".to_string(),
2988                ]
2989            }
2990        };
2991        let capability_resolution = self
2992            .capability_resolver
2993            .resolve(
2994                crate::capability_resolver::CapabilityResolveInput {
2995                    workflow_id: Some("bug_monitor".to_string()),
2996                    required_capabilities: required_capabilities.clone(),
2997                    optional_capabilities: Vec::new(),
2998                    provider_preference,
2999                    available_tools: discovered_tools,
3000                },
3001                Vec::new(),
3002            )
3003            .await
3004            .ok();
3005        let bindings_file = self.capability_resolver.list_bindings().await.ok();
3006        if let Some(bindings) = bindings_file.as_ref() {
3007            status.binding_source_version = bindings.builtin_version.clone();
3008            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
3009            status.selected_server_binding_candidates = bindings
3010                .bindings
3011                .iter()
3012                .filter(|binding| required_capabilities.contains(&binding.capability_id))
3013                .filter(|binding| {
3014                    discovered_providers.is_empty()
3015                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
3016                })
3017                .map(|binding| {
3018                    let binding_key = format!(
3019                        "{}::{}",
3020                        binding.capability_id,
3021                        binding.tool_name.to_ascii_lowercase()
3022                    );
3023                    let matched = capability_resolution
3024                        .as_ref()
3025                        .map(|resolution| {
3026                            resolution.resolved.iter().any(|row| {
3027                                row.capability_id == binding.capability_id
3028                                    && format!(
3029                                        "{}::{}",
3030                                        row.capability_id,
3031                                        row.tool_name.to_ascii_lowercase()
3032                                    ) == binding_key
3033                            })
3034                        })
3035                        .unwrap_or(false);
3036                    BugMonitorBindingCandidate {
3037                        capability_id: binding.capability_id.clone(),
3038                        binding_tool_name: binding.tool_name.clone(),
3039                        aliases: binding.tool_name_aliases.clone(),
3040                        matched,
3041                    }
3042                })
3043                .collect();
3044            status.selected_server_binding_candidates.sort_by(|a, b| {
3045                a.capability_id
3046                    .cmp(&b.capability_id)
3047                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
3048            });
3049        }
3050        let capability_ready = |capability_id: &str| -> bool {
3051            capability_resolution
3052                .as_ref()
3053                .map(|resolved| {
3054                    resolved
3055                        .resolved
3056                        .iter()
3057                        .any(|row| row.capability_id == capability_id)
3058                })
3059                .unwrap_or(false)
3060        };
3061        if let Some(resolution) = capability_resolution.as_ref() {
3062            status.missing_required_capabilities = resolution.missing_required.clone();
3063            status.resolved_capabilities = resolution
3064                .resolved
3065                .iter()
3066                .map(|row| BugMonitorCapabilityMatch {
3067                    capability_id: row.capability_id.clone(),
3068                    provider: row.provider.clone(),
3069                    tool_name: row.tool_name.clone(),
3070                    binding_index: row.binding_index,
3071                })
3072                .collect();
3073        } else {
3074            status.missing_required_capabilities = required_capabilities.clone();
3075        }
3076        status.required_capabilities = BugMonitorCapabilityReadiness {
3077            github_list_issues: capability_ready("github.list_issues"),
3078            github_get_issue: capability_ready("github.get_issue"),
3079            github_create_issue: capability_ready("github.create_issue"),
3080            github_comment_on_issue: capability_ready("github.comment_on_issue"),
3081        };
3082        status.selected_model = selected_model;
3083        status.readiness = BugMonitorReadiness {
3084            config_valid: repo_valid
3085                && selected_server.is_some()
3086                && status.required_capabilities.github_list_issues
3087                && status.required_capabilities.github_get_issue
3088                && status.required_capabilities.github_create_issue
3089                && status.required_capabilities.github_comment_on_issue
3090                && selected_model_ready,
3091            repo_valid,
3092            mcp_server_present: selected_server.is_some(),
3093            mcp_connected: selected_server
3094                .as_ref()
3095                .map(|row| row.connected)
3096                .unwrap_or(false),
3097            github_read_ready: status.required_capabilities.github_list_issues
3098                && status.required_capabilities.github_get_issue,
3099            github_write_ready: status.required_capabilities.github_create_issue
3100                && status.required_capabilities.github_comment_on_issue,
3101            selected_model_ready,
3102            ingest_ready: config.enabled && !config.paused && repo_valid,
3103            publish_ready: config.enabled
3104                && !config.paused
3105                && repo_valid
3106                && selected_server
3107                    .as_ref()
3108                    .map(|row| row.connected)
3109                    .unwrap_or(false)
3110                && status.required_capabilities.github_list_issues
3111                && status.required_capabilities.github_get_issue
3112                && status.required_capabilities.github_create_issue
3113                && status.required_capabilities.github_comment_on_issue
3114                && selected_model_ready,
3115            runtime_ready: config.enabled
3116                && !config.paused
3117                && repo_valid
3118                && selected_server
3119                    .as_ref()
3120                    .map(|row| row.connected)
3121                    .unwrap_or(false)
3122                && status.required_capabilities.github_list_issues
3123                && status.required_capabilities.github_get_issue
3124                && status.required_capabilities.github_create_issue
3125                && status.required_capabilities.github_comment_on_issue
3126                && selected_model_ready,
3127        };
3128        if config.enabled {
3129            if config.paused {
3130                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
3131            } else if !repo_valid {
3132                status.last_error = Some("Target repo is missing or invalid.".to_string());
3133            } else if selected_server.is_none() {
3134                status.last_error = Some("Selected MCP server is missing.".to_string());
3135            } else if !status.readiness.mcp_connected {
3136                status.last_error = Some("Selected MCP server is disconnected.".to_string());
3137            } else if !selected_model_ready {
3138                status.last_error = Some(
3139                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
3140                        .to_string(),
3141                );
3142            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
3143                let missing = if status.missing_required_capabilities.is_empty() {
3144                    "unknown".to_string()
3145                } else {
3146                    status.missing_required_capabilities.join(", ")
3147                };
3148                status.last_error = Some(format!(
3149                    "Selected MCP server is missing required GitHub capabilities: {missing}"
3150                ));
3151            }
3152        }
3153        status.runtime.monitoring_active = status.readiness.ingest_ready;
3154        status
3155    }
3156
3157    pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
3158        if !self.workflow_runs_path.exists() {
3159            return Ok(());
3160        }
3161        let raw = fs::read_to_string(&self.workflow_runs_path).await?;
3162        let parsed =
3163            serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
3164                .unwrap_or_default();
3165        *self.workflow_runs.write().await = parsed;
3166        Ok(())
3167    }
3168
3169    pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
3170        if let Some(parent) = self.workflow_runs_path.parent() {
3171            fs::create_dir_all(parent).await?;
3172        }
3173        let payload = {
3174            let guard = self.workflow_runs.read().await;
3175            serde_json::to_string_pretty(&*guard)?
3176        };
3177        fs::write(&self.workflow_runs_path, payload).await?;
3178        Ok(())
3179    }
3180
3181    pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3182        if !self.workflow_hook_overrides_path.exists() {
3183            return Ok(());
3184        }
3185        let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
3186        let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
3187            .unwrap_or_default();
3188        *self.workflow_hook_overrides.write().await = parsed;
3189        Ok(())
3190    }
3191
3192    pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
3193        if let Some(parent) = self.workflow_hook_overrides_path.parent() {
3194            fs::create_dir_all(parent).await?;
3195        }
3196        let payload = {
3197            let guard = self.workflow_hook_overrides.read().await;
3198            serde_json::to_string_pretty(&*guard)?
3199        };
3200        fs::write(&self.workflow_hook_overrides_path, payload).await?;
3201        Ok(())
3202    }
3203
3204    pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
3205        let mut sources = Vec::new();
3206        sources.push(WorkflowLoadSource {
3207            root: resolve_builtin_workflows_dir(),
3208            kind: WorkflowSourceKind::BuiltIn,
3209            pack_id: None,
3210        });
3211
3212        let workspace_root = self.workspace_index.snapshot().await.root;
3213        sources.push(WorkflowLoadSource {
3214            root: PathBuf::from(workspace_root).join(".tandem"),
3215            kind: WorkflowSourceKind::Workspace,
3216            pack_id: None,
3217        });
3218
3219        if let Ok(packs) = self.pack_manager.list().await {
3220            for pack in packs {
3221                sources.push(WorkflowLoadSource {
3222                    root: PathBuf::from(pack.install_path),
3223                    kind: WorkflowSourceKind::Pack,
3224                    pack_id: Some(pack.pack_id),
3225                });
3226            }
3227        }
3228
3229        let mut registry = load_workflow_registry(&sources)?;
3230        let overrides = self.workflow_hook_overrides.read().await.clone();
3231        for hook in &mut registry.hooks {
3232            if let Some(enabled) = overrides.get(&hook.binding_id) {
3233                hook.enabled = *enabled;
3234            }
3235        }
3236        for workflow in registry.workflows.values_mut() {
3237            workflow.hooks = registry
3238                .hooks
3239                .iter()
3240                .filter(|hook| hook.workflow_id == workflow.workflow_id)
3241                .cloned()
3242                .collect();
3243        }
3244        let messages = validate_workflow_registry(&registry);
3245        *self.workflows.write().await = registry;
3246        Ok(messages)
3247    }
3248
3249    pub async fn workflow_registry(&self) -> WorkflowRegistry {
3250        self.workflows.read().await.clone()
3251    }
3252
3253    pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
3254        let mut rows = self
3255            .workflows
3256            .read()
3257            .await
3258            .workflows
3259            .values()
3260            .cloned()
3261            .collect::<Vec<_>>();
3262        rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
3263        rows
3264    }
3265
3266    pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
3267        self.workflows
3268            .read()
3269            .await
3270            .workflows
3271            .get(workflow_id)
3272            .cloned()
3273    }
3274
3275    pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
3276        let mut rows = self
3277            .workflows
3278            .read()
3279            .await
3280            .hooks
3281            .iter()
3282            .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
3283            .cloned()
3284            .collect::<Vec<_>>();
3285        rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
3286        rows
3287    }
3288
3289    pub async fn set_workflow_hook_enabled(
3290        &self,
3291        binding_id: &str,
3292        enabled: bool,
3293    ) -> anyhow::Result<Option<WorkflowHookBinding>> {
3294        self.workflow_hook_overrides
3295            .write()
3296            .await
3297            .insert(binding_id.to_string(), enabled);
3298        self.persist_workflow_hook_overrides().await?;
3299        let _ = self.reload_workflows().await?;
3300        Ok(self
3301            .workflows
3302            .read()
3303            .await
3304            .hooks
3305            .iter()
3306            .find(|hook| hook.binding_id == binding_id)
3307            .cloned())
3308    }
3309
3310    pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
3311        self.workflow_runs
3312            .write()
3313            .await
3314            .insert(run.run_id.clone(), run);
3315        self.persist_workflow_runs().await
3316    }
3317
3318    pub async fn update_workflow_run(
3319        &self,
3320        run_id: &str,
3321        update: impl FnOnce(&mut WorkflowRunRecord),
3322    ) -> Option<WorkflowRunRecord> {
3323        let mut guard = self.workflow_runs.write().await;
3324        let row = guard.get_mut(run_id)?;
3325        update(row);
3326        row.updated_at_ms = now_ms();
3327        if matches!(
3328            row.status,
3329            WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
3330        ) {
3331            row.finished_at_ms.get_or_insert_with(now_ms);
3332        }
3333        let out = row.clone();
3334        drop(guard);
3335        let _ = self.persist_workflow_runs().await;
3336        Some(out)
3337    }
3338
3339    pub async fn list_workflow_runs(
3340        &self,
3341        workflow_id: Option<&str>,
3342        limit: usize,
3343    ) -> Vec<WorkflowRunRecord> {
3344        let mut rows = self
3345            .workflow_runs
3346            .read()
3347            .await
3348            .values()
3349            .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
3350            .cloned()
3351            .collect::<Vec<_>>();
3352        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3353        rows.truncate(limit.clamp(1, 500));
3354        rows
3355    }
3356
3357    pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
3358        self.workflow_runs.read().await.get(run_id).cloned()
3359    }
3360
3361    pub async fn put_automation_v2(
3362        &self,
3363        mut automation: AutomationV2Spec,
3364    ) -> anyhow::Result<AutomationV2Spec> {
3365        if automation.automation_id.trim().is_empty() {
3366            anyhow::bail!("automation_id is required");
3367        }
3368        for agent in &mut automation.agents {
3369            if agent.display_name.trim().is_empty() {
3370                agent.display_name = auto_generated_agent_name(&agent.agent_id);
3371            }
3372            agent.tool_policy.allowlist =
3373                normalize_allowed_tools(agent.tool_policy.allowlist.clone());
3374            agent.tool_policy.denylist =
3375                normalize_allowed_tools(agent.tool_policy.denylist.clone());
3376            agent.mcp_policy.allowed_servers =
3377                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
3378            agent.mcp_policy.allowed_tools = agent
3379                .mcp_policy
3380                .allowed_tools
3381                .take()
3382                .map(normalize_allowed_tools);
3383        }
3384        let now = now_ms();
3385        if automation.created_at_ms == 0 {
3386            automation.created_at_ms = now;
3387        }
3388        automation.updated_at_ms = now;
3389        if automation.next_fire_at_ms.is_none() {
3390            automation.next_fire_at_ms =
3391                automation_schedule_next_fire_at_ms(&automation.schedule, now);
3392        }
3393        self.automations_v2
3394            .write()
3395            .await
3396            .insert(automation.automation_id.clone(), automation.clone());
3397        self.persist_automations_v2().await?;
3398        self.verify_automation_v2_persisted(&automation.automation_id, true)
3399            .await?;
3400        Ok(automation)
3401    }
3402
3403    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
3404        self.automations_v2.read().await.get(automation_id).cloned()
3405    }
3406
3407    pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
3408        self.workflow_plans
3409            .write()
3410            .await
3411            .insert(plan.plan_id.clone(), plan);
3412    }
3413
3414    pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
3415        self.workflow_plans.read().await.get(plan_id).cloned()
3416    }
3417
3418    pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
3419        self.workflow_plan_drafts
3420            .write()
3421            .await
3422            .insert(draft.current_plan.plan_id.clone(), draft.clone());
3423        self.put_workflow_plan(draft.current_plan).await;
3424    }
3425
3426    pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
3427        self.workflow_plan_drafts.read().await.get(plan_id).cloned()
3428    }
3429
3430    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3431        let mut rows = self
3432            .automations_v2
3433            .read()
3434            .await
3435            .values()
3436            .cloned()
3437            .collect::<Vec<_>>();
3438        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3439        rows
3440    }
3441
3442    pub async fn delete_automation_v2(
3443        &self,
3444        automation_id: &str,
3445    ) -> anyhow::Result<Option<AutomationV2Spec>> {
3446        let removed = self.automations_v2.write().await.remove(automation_id);
3447        self.persist_automations_v2().await?;
3448        self.verify_automation_v2_persisted(automation_id, false)
3449            .await?;
3450        Ok(removed)
3451    }
3452
3453    pub async fn create_automation_v2_run(
3454        &self,
3455        automation: &AutomationV2Spec,
3456        trigger_type: &str,
3457    ) -> anyhow::Result<AutomationV2RunRecord> {
3458        let now = now_ms();
3459        let pending_nodes = automation
3460            .flow
3461            .nodes
3462            .iter()
3463            .map(|n| n.node_id.clone())
3464            .collect::<Vec<_>>();
3465        let run = AutomationV2RunRecord {
3466            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
3467            automation_id: automation.automation_id.clone(),
3468            trigger_type: trigger_type.to_string(),
3469            status: AutomationRunStatus::Queued,
3470            created_at_ms: now,
3471            updated_at_ms: now,
3472            started_at_ms: None,
3473            finished_at_ms: None,
3474            active_session_ids: Vec::new(),
3475            active_instance_ids: Vec::new(),
3476            checkpoint: AutomationRunCheckpoint {
3477                completed_nodes: Vec::new(),
3478                pending_nodes,
3479                node_outputs: std::collections::HashMap::new(),
3480                node_attempts: std::collections::HashMap::new(),
3481            },
3482            automation_snapshot: Some(automation.clone()),
3483            pause_reason: None,
3484            resume_reason: None,
3485            detail: None,
3486            prompt_tokens: 0,
3487            completion_tokens: 0,
3488            total_tokens: 0,
3489            estimated_cost_usd: 0.0,
3490        };
3491        self.automation_v2_runs
3492            .write()
3493            .await
3494            .insert(run.run_id.clone(), run.clone());
3495        self.persist_automation_v2_runs().await?;
3496        Ok(run)
3497    }
3498
3499    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
3500        self.automation_v2_runs.read().await.get(run_id).cloned()
3501    }
3502
3503    pub async fn list_automation_v2_runs(
3504        &self,
3505        automation_id: Option<&str>,
3506        limit: usize,
3507    ) -> Vec<AutomationV2RunRecord> {
3508        let mut rows = self
3509            .automation_v2_runs
3510            .read()
3511            .await
3512            .values()
3513            .filter(|row| {
3514                if let Some(id) = automation_id {
3515                    row.automation_id == id
3516                } else {
3517                    true
3518                }
3519            })
3520            .cloned()
3521            .collect::<Vec<_>>();
3522        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
3523        rows.truncate(limit.clamp(1, 500));
3524        rows
3525    }
3526
3527    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
3528        let mut guard = self.automation_v2_runs.write().await;
3529        let run_id = guard
3530            .values()
3531            .filter(|row| row.status == AutomationRunStatus::Queued)
3532            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
3533            .map(|row| row.run_id.clone())?;
3534        let now = now_ms();
3535        let run = guard.get_mut(&run_id)?;
3536        run.status = AutomationRunStatus::Running;
3537        run.updated_at_ms = now;
3538        run.started_at_ms.get_or_insert(now);
3539        let claimed = run.clone();
3540        drop(guard);
3541        let _ = self.persist_automation_v2_runs().await;
3542        Some(claimed)
3543    }
3544
3545    pub async fn update_automation_v2_run(
3546        &self,
3547        run_id: &str,
3548        update: impl FnOnce(&mut AutomationV2RunRecord),
3549    ) -> Option<AutomationV2RunRecord> {
3550        let mut guard = self.automation_v2_runs.write().await;
3551        let run = guard.get_mut(run_id)?;
3552        update(run);
3553        run.updated_at_ms = now_ms();
3554        if matches!(
3555            run.status,
3556            AutomationRunStatus::Completed
3557                | AutomationRunStatus::Failed
3558                | AutomationRunStatus::Cancelled
3559        ) {
3560            run.finished_at_ms.get_or_insert_with(now_ms);
3561        }
3562        let out = run.clone();
3563        drop(guard);
3564        let _ = self.persist_automation_v2_runs().await;
3565        Some(out)
3566    }
3567
3568    pub async fn add_automation_v2_session(
3569        &self,
3570        run_id: &str,
3571        session_id: &str,
3572    ) -> Option<AutomationV2RunRecord> {
3573        let updated = self
3574            .update_automation_v2_run(run_id, |row| {
3575                if !row.active_session_ids.iter().any(|id| id == session_id) {
3576                    row.active_session_ids.push(session_id.to_string());
3577                }
3578            })
3579            .await;
3580        self.automation_v2_session_runs
3581            .write()
3582            .await
3583            .insert(session_id.to_string(), run_id.to_string());
3584        updated
3585    }
3586
3587    pub async fn clear_automation_v2_session(
3588        &self,
3589        run_id: &str,
3590        session_id: &str,
3591    ) -> Option<AutomationV2RunRecord> {
3592        self.automation_v2_session_runs
3593            .write()
3594            .await
3595            .remove(session_id);
3596        self.update_automation_v2_run(run_id, |row| {
3597            row.active_session_ids.retain(|id| id != session_id);
3598        })
3599        .await
3600    }
3601
3602    pub async fn apply_provider_usage_to_runs(
3603        &self,
3604        session_id: &str,
3605        prompt_tokens: u64,
3606        completion_tokens: u64,
3607        total_tokens: u64,
3608    ) {
3609        if let Some(policy) = self.routine_session_policy(session_id).await {
3610            let rate = self.token_cost_per_1k_usd.max(0.0);
3611            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3612            let mut guard = self.routine_runs.write().await;
3613            if let Some(run) = guard.get_mut(&policy.run_id) {
3614                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3615                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3616                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3617                run.estimated_cost_usd += delta_cost;
3618                run.updated_at_ms = now_ms();
3619            }
3620            drop(guard);
3621            let _ = self.persist_routine_runs().await;
3622        }
3623
3624        let maybe_v2_run_id = self
3625            .automation_v2_session_runs
3626            .read()
3627            .await
3628            .get(session_id)
3629            .cloned();
3630        if let Some(run_id) = maybe_v2_run_id {
3631            let rate = self.token_cost_per_1k_usd.max(0.0);
3632            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
3633            let mut guard = self.automation_v2_runs.write().await;
3634            if let Some(run) = guard.get_mut(&run_id) {
3635                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
3636                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
3637                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
3638                run.estimated_cost_usd += delta_cost;
3639                run.updated_at_ms = now_ms();
3640            }
3641            drop(guard);
3642            let _ = self.persist_automation_v2_runs().await;
3643        }
3644    }
3645
3646    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
3647        let mut fired = Vec::new();
3648        let mut guard = self.automations_v2.write().await;
3649        for automation in guard.values_mut() {
3650            if automation.status != AutomationV2Status::Active {
3651                continue;
3652            }
3653            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
3654                automation.next_fire_at_ms =
3655                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3656                continue;
3657            };
3658            if now_ms < next_fire_at_ms {
3659                continue;
3660            }
3661            let run_count =
3662                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
3663            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
3664            automation.next_fire_at_ms = next;
3665            automation.last_fired_at_ms = Some(now_ms);
3666            for _ in 0..run_count {
3667                fired.push(automation.automation_id.clone());
3668            }
3669        }
3670        drop(guard);
3671        let _ = self.persist_automations_v2().await;
3672        fired
3673    }
3674}
3675
3676async fn build_channels_config(
3677    state: &AppState,
3678    channels: &ChannelsConfigFile,
3679) -> Option<ChannelsConfig> {
3680    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
3681        return None;
3682    }
3683    Some(ChannelsConfig {
3684        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
3685            bot_token: cfg.bot_token,
3686            allowed_users: cfg.allowed_users,
3687            mention_only: cfg.mention_only,
3688            style_profile: cfg.style_profile,
3689        }),
3690        discord: channels.discord.clone().map(|cfg| DiscordConfig {
3691            bot_token: cfg.bot_token,
3692            guild_id: cfg.guild_id,
3693            allowed_users: cfg.allowed_users,
3694            mention_only: cfg.mention_only,
3695        }),
3696        slack: channels.slack.clone().map(|cfg| SlackConfig {
3697            bot_token: cfg.bot_token,
3698            channel_id: cfg.channel_id,
3699            allowed_users: cfg.allowed_users,
3700            mention_only: cfg.mention_only,
3701        }),
3702        server_base_url: state.server_base_url(),
3703        api_token: state.api_token().await.unwrap_or_default(),
3704        tool_policy: channels.tool_policy.clone(),
3705    })
3706}
3707
3708fn normalize_web_ui_prefix(prefix: &str) -> String {
3709    let trimmed = prefix.trim();
3710    if trimmed.is_empty() || trimmed == "/" {
3711        return "/admin".to_string();
3712    }
3713    let with_leading = if trimmed.starts_with('/') {
3714        trimmed.to_string()
3715    } else {
3716        format!("/{trimmed}")
3717    };
3718    with_leading.trim_end_matches('/').to_string()
3719}
3720
3721fn default_web_ui_prefix() -> String {
3722    "/admin".to_string()
3723}
3724
3725fn default_allow_all() -> Vec<String> {
3726    vec!["*".to_string()]
3727}
3728
3729fn default_discord_mention_only() -> bool {
3730    true
3731}
3732
3733fn normalize_allowed_tools(raw: Vec<String>) -> Vec<String> {
3734    normalize_non_empty_list(raw)
3735}
3736
3737fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
3738    let mut out = Vec::new();
3739    let mut seen = std::collections::HashSet::new();
3740    for item in raw {
3741        let normalized = item.trim().to_string();
3742        if normalized.is_empty() {
3743            continue;
3744        }
3745        if seen.insert(normalized.clone()) {
3746            out.push(normalized);
3747        }
3748    }
3749    out
3750}
3751
3752fn resolve_run_stale_ms() -> u64 {
3753    std::env::var("TANDEM_RUN_STALE_MS")
3754        .ok()
3755        .and_then(|v| v.trim().parse::<u64>().ok())
3756        .unwrap_or(120_000)
3757        .clamp(30_000, 600_000)
3758}
3759
3760fn resolve_token_cost_per_1k_usd() -> f64 {
3761    std::env::var("TANDEM_TOKEN_COST_PER_1K_USD")
3762        .ok()
3763        .and_then(|v| v.trim().parse::<f64>().ok())
3764        .unwrap_or(0.0)
3765        .max(0.0)
3766}
3767
3768fn default_true() -> bool {
3769    true
3770}
3771
3772fn parse_bool_env(key: &str, default: bool) -> bool {
3773    std::env::var(key)
3774        .ok()
3775        .map(|raw| {
3776            matches!(
3777                raw.trim().to_ascii_lowercase().as_str(),
3778                "1" | "true" | "yes" | "on"
3779            )
3780        })
3781        .unwrap_or(default)
3782}
3783
3784fn resolve_bug_monitor_env_config() -> BugMonitorConfig {
3785    fn env_value(new_name: &str, legacy_name: &str) -> Option<String> {
3786        std::env::var(new_name)
3787            .ok()
3788            .or_else(|| std::env::var(legacy_name).ok())
3789            .map(|v| v.trim().to_string())
3790            .filter(|v| !v.is_empty())
3791    }
3792
3793    fn env_bool(new_name: &str, legacy_name: &str, default: bool) -> bool {
3794        env_value(new_name, legacy_name)
3795            .map(|value| parse_bool_like(&value, default))
3796            .unwrap_or(default)
3797    }
3798
3799    fn parse_bool_like(value: &str, default: bool) -> bool {
3800        match value.trim().to_ascii_lowercase().as_str() {
3801            "1" | "true" | "yes" | "on" => true,
3802            "0" | "false" | "no" | "off" => false,
3803            _ => default,
3804        }
3805    }
3806
3807    let provider_preference = match env_value(
3808        "TANDEM_BUG_MONITOR_PROVIDER_PREFERENCE",
3809        "TANDEM_FAILURE_REPORTER_PROVIDER_PREFERENCE",
3810    )
3811    .unwrap_or_default()
3812    .trim()
3813    .to_ascii_lowercase()
3814    .as_str()
3815    {
3816        "official_github" | "official-github" | "github" => {
3817            BugMonitorProviderPreference::OfficialGithub
3818        }
3819        "composio" => BugMonitorProviderPreference::Composio,
3820        "arcade" => BugMonitorProviderPreference::Arcade,
3821        _ => BugMonitorProviderPreference::Auto,
3822    };
3823    let provider_id = env_value(
3824        "TANDEM_BUG_MONITOR_PROVIDER_ID",
3825        "TANDEM_FAILURE_REPORTER_PROVIDER_ID",
3826    );
3827    let model_id = env_value(
3828        "TANDEM_BUG_MONITOR_MODEL_ID",
3829        "TANDEM_FAILURE_REPORTER_MODEL_ID",
3830    );
3831    let model_policy = match (provider_id, model_id) {
3832        (Some(provider_id), Some(model_id)) => Some(json!({
3833            "default_model": {
3834                "provider_id": provider_id,
3835                "model_id": model_id,
3836            }
3837        })),
3838        _ => None,
3839    };
3840    BugMonitorConfig {
3841        enabled: env_bool(
3842            "TANDEM_BUG_MONITOR_ENABLED",
3843            "TANDEM_FAILURE_REPORTER_ENABLED",
3844            false,
3845        ),
3846        paused: env_bool(
3847            "TANDEM_BUG_MONITOR_PAUSED",
3848            "TANDEM_FAILURE_REPORTER_PAUSED",
3849            false,
3850        ),
3851        workspace_root: env_value(
3852            "TANDEM_BUG_MONITOR_WORKSPACE_ROOT",
3853            "TANDEM_FAILURE_REPORTER_WORKSPACE_ROOT",
3854        ),
3855        repo: env_value("TANDEM_BUG_MONITOR_REPO", "TANDEM_FAILURE_REPORTER_REPO"),
3856        mcp_server: env_value(
3857            "TANDEM_BUG_MONITOR_MCP_SERVER",
3858            "TANDEM_FAILURE_REPORTER_MCP_SERVER",
3859        ),
3860        provider_preference,
3861        model_policy,
3862        auto_create_new_issues: env_bool(
3863            "TANDEM_BUG_MONITOR_AUTO_CREATE_NEW_ISSUES",
3864            "TANDEM_FAILURE_REPORTER_AUTO_CREATE_NEW_ISSUES",
3865            true,
3866        ),
3867        require_approval_for_new_issues: env_bool(
3868            "TANDEM_BUG_MONITOR_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
3869            "TANDEM_FAILURE_REPORTER_REQUIRE_APPROVAL_FOR_NEW_ISSUES",
3870            false,
3871        ),
3872        auto_comment_on_matched_open_issues: env_bool(
3873            "TANDEM_BUG_MONITOR_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
3874            "TANDEM_FAILURE_REPORTER_AUTO_COMMENT_ON_MATCHED_OPEN_ISSUES",
3875            true,
3876        ),
3877        label_mode: BugMonitorLabelMode::ReporterOnly,
3878        updated_at_ms: 0,
3879    }
3880}
3881
3882fn is_valid_owner_repo_slug(value: &str) -> bool {
3883    let trimmed = value.trim();
3884    if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
3885        return false;
3886    }
3887    let mut parts = trimmed.split('/');
3888    let Some(owner) = parts.next() else {
3889        return false;
3890    };
3891    let Some(repo) = parts.next() else {
3892        return false;
3893    };
3894    parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
3895}
3896
3897fn resolve_shared_resources_path() -> PathBuf {
3898    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3899        let trimmed = dir.trim();
3900        if !trimmed.is_empty() {
3901            return PathBuf::from(trimmed).join("shared_resources.json");
3902        }
3903    }
3904    default_state_dir().join("shared_resources.json")
3905}
3906
3907fn resolve_routines_path() -> PathBuf {
3908    if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
3909        let trimmed = dir.trim();
3910        if !trimmed.is_empty() {
3911            return PathBuf::from(trimmed).join("routines.json");
3912        }
3913    }
3914    default_state_dir().join("routines.json")
3915}
3916
3917fn resolve_routine_history_path() -> PathBuf {
3918    if let Ok(root) = std::env::var("TANDEM_STORAGE_DIR") {
3919        let trimmed = root.trim();
3920        if !trimmed.is_empty() {
3921            return PathBuf::from(trimmed).join("routine_history.json");
3922        }
3923    }
3924    default_state_dir().join("routine_history.json")
3925}
3926
3927fn resolve_routine_runs_path() -> PathBuf {
3928    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
3929        let trimmed = root.trim();
3930        if !trimmed.is_empty() {
3931            return PathBuf::from(trimmed).join("routine_runs.json");
3932        }
3933    }
3934    default_state_dir().join("routine_runs.json")
3935}
3936
3937fn resolve_automations_v2_path() -> PathBuf {
3938    resolve_canonical_data_file_path("automations_v2.json")
3939}
3940
3941fn legacy_automations_v2_path() -> Option<PathBuf> {
3942    resolve_legacy_root_file_path("automations_v2.json")
3943        .filter(|path| path != &resolve_automations_v2_path())
3944}
3945
3946fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
3947    let mut candidates = vec![active_path.clone()];
3948    if let Some(legacy_path) = legacy_automations_v2_path() {
3949        if !candidates.contains(&legacy_path) {
3950            candidates.push(legacy_path);
3951        }
3952    }
3953    let default_path = default_state_dir().join("automations_v2.json");
3954    if !candidates.contains(&default_path) {
3955        candidates.push(default_path);
3956    }
3957    candidates
3958}
3959
3960fn resolve_automation_v2_runs_path() -> PathBuf {
3961    resolve_canonical_data_file_path("automation_v2_runs.json")
3962}
3963
3964fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
3965    resolve_legacy_root_file_path("automation_v2_runs.json")
3966        .filter(|path| path != &resolve_automation_v2_runs_path())
3967}
3968
3969fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
3970    let mut candidates = vec![active_path.clone()];
3971    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
3972        if !candidates.contains(&legacy_path) {
3973            candidates.push(legacy_path);
3974        }
3975    }
3976    let default_path = default_state_dir().join("automation_v2_runs.json");
3977    if !candidates.contains(&default_path) {
3978        candidates.push(default_path);
3979    }
3980    candidates
3981}
3982
3983fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
3984    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
3985        .unwrap_or_default()
3986}
3987
3988fn parse_automation_v2_runs_file(
3989    raw: &str,
3990) -> std::collections::HashMap<String, AutomationV2RunRecord> {
3991    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
3992        .unwrap_or_default()
3993}
3994
3995fn resolve_canonical_data_file_path(file_name: &str) -> PathBuf {
3996    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
3997        let trimmed = root.trim();
3998        if !trimmed.is_empty() {
3999            let base = PathBuf::from(trimmed);
4000            return if path_is_data_dir(&base) {
4001                base.join(file_name)
4002            } else {
4003                base.join("data").join(file_name)
4004            };
4005        }
4006    }
4007    default_state_dir().join(file_name)
4008}
4009
4010fn resolve_legacy_root_file_path(file_name: &str) -> Option<PathBuf> {
4011    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4012        let trimmed = root.trim();
4013        if !trimmed.is_empty() {
4014            let base = PathBuf::from(trimmed);
4015            if !path_is_data_dir(&base) {
4016                return Some(base.join(file_name));
4017            }
4018        }
4019    }
4020    resolve_shared_paths()
4021        .ok()
4022        .map(|paths| paths.canonical_root.join(file_name))
4023}
4024
4025fn path_is_data_dir(path: &std::path::Path) -> bool {
4026    path.file_name()
4027        .and_then(|value| value.to_str())
4028        .map(|value| value.eq_ignore_ascii_case("data"))
4029        .unwrap_or(false)
4030}
4031
4032fn resolve_workflow_runs_path() -> PathBuf {
4033    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4034        let trimmed = root.trim();
4035        if !trimmed.is_empty() {
4036            return PathBuf::from(trimmed).join("workflow_runs.json");
4037        }
4038    }
4039    default_state_dir().join("workflow_runs.json")
4040}
4041
4042fn resolve_bug_monitor_config_path() -> PathBuf {
4043    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4044        let trimmed = root.trim();
4045        if !trimmed.is_empty() {
4046            return PathBuf::from(trimmed).join("bug_monitor_config.json");
4047        }
4048    }
4049    default_state_dir().join("bug_monitor_config.json")
4050}
4051
4052fn resolve_bug_monitor_drafts_path() -> PathBuf {
4053    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4054        let trimmed = root.trim();
4055        if !trimmed.is_empty() {
4056            return PathBuf::from(trimmed).join("bug_monitor_drafts.json");
4057        }
4058    }
4059    default_state_dir().join("bug_monitor_drafts.json")
4060}
4061
4062fn resolve_bug_monitor_incidents_path() -> PathBuf {
4063    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4064        let trimmed = root.trim();
4065        if !trimmed.is_empty() {
4066            return PathBuf::from(trimmed).join("bug_monitor_incidents.json");
4067        }
4068    }
4069    default_state_dir().join("bug_monitor_incidents.json")
4070}
4071
4072fn resolve_bug_monitor_posts_path() -> PathBuf {
4073    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4074        let trimmed = root.trim();
4075        if !trimmed.is_empty() {
4076            return PathBuf::from(trimmed).join("bug_monitor_posts.json");
4077        }
4078    }
4079    default_state_dir().join("bug_monitor_posts.json")
4080}
4081
4082fn legacy_failure_reporter_path(file_name: &str) -> PathBuf {
4083    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4084        let trimmed = root.trim();
4085        if !trimmed.is_empty() {
4086            return PathBuf::from(trimmed).join(file_name);
4087        }
4088    }
4089    default_state_dir().join(file_name)
4090}
4091
4092fn resolve_workflow_hook_overrides_path() -> PathBuf {
4093    if let Ok(root) = std::env::var("TANDEM_STATE_DIR") {
4094        let trimmed = root.trim();
4095        if !trimmed.is_empty() {
4096            return PathBuf::from(trimmed).join("workflow_hook_overrides.json");
4097        }
4098    }
4099    default_state_dir().join("workflow_hook_overrides.json")
4100}
4101
4102fn resolve_builtin_workflows_dir() -> PathBuf {
4103    if let Ok(root) = std::env::var("TANDEM_BUILTIN_WORKFLOW_DIR") {
4104        let trimmed = root.trim();
4105        if !trimmed.is_empty() {
4106            return PathBuf::from(trimmed);
4107        }
4108    }
4109    default_state_dir().join("builtin_workflows")
4110}
4111
4112fn resolve_agent_team_audit_path() -> PathBuf {
4113    if let Ok(base) = std::env::var("TANDEM_STATE_DIR") {
4114        let trimmed = base.trim();
4115        if !trimmed.is_empty() {
4116            return PathBuf::from(trimmed)
4117                .join("agent-team")
4118                .join("audit.log.jsonl");
4119        }
4120    }
4121    default_state_dir()
4122        .join("agent-team")
4123        .join("audit.log.jsonl")
4124}
4125
4126fn default_state_dir() -> PathBuf {
4127    if let Ok(paths) = resolve_shared_paths() {
4128        return paths.engine_state_dir;
4129    }
4130    if let Some(data_dir) = dirs::data_dir() {
4131        return data_dir.join("tandem").join("data");
4132    }
4133    dirs::home_dir()
4134        .map(|home| home.join(".tandem").join("data"))
4135        .unwrap_or_else(|| PathBuf::from(".tandem"))
4136}
4137
4138fn sibling_backup_path(path: &PathBuf) -> PathBuf {
4139    let base = path
4140        .file_name()
4141        .and_then(|name| name.to_str())
4142        .unwrap_or("state.json");
4143    let backup_name = format!("{base}.bak");
4144    path.with_file_name(backup_name)
4145}
4146
4147fn sibling_tmp_path(path: &PathBuf) -> PathBuf {
4148    let base = path
4149        .file_name()
4150        .and_then(|name| name.to_str())
4151        .unwrap_or("state.json");
4152    let tmp_name = format!("{base}.tmp");
4153    path.with_file_name(tmp_name)
4154}
4155
4156fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4157    match schedule {
4158        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4159        RoutineSchedule::Cron { .. } => None,
4160    }
4161}
4162
4163fn parse_timezone(timezone: &str) -> Option<Tz> {
4164    timezone.trim().parse::<Tz>().ok()
4165}
4166
4167fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
4168    let tz = parse_timezone(timezone)?;
4169    let schedule = Schedule::from_str(expression).ok()?;
4170    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
4171    let local_from = from_dt.with_timezone(&tz);
4172    let next = schedule.after(&local_from).next()?;
4173    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
4174}
4175
4176fn compute_next_schedule_fire_at_ms(
4177    schedule: &RoutineSchedule,
4178    timezone: &str,
4179    from_ms: u64,
4180) -> Option<u64> {
4181    let _ = parse_timezone(timezone)?;
4182    match schedule {
4183        RoutineSchedule::IntervalSeconds { seconds } => {
4184            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4185        }
4186        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4187    }
4188}
4189
4190fn compute_misfire_plan_for_schedule(
4191    now_ms: u64,
4192    next_fire_at_ms: u64,
4193    schedule: &RoutineSchedule,
4194    timezone: &str,
4195    policy: &RoutineMisfirePolicy,
4196) -> (u32, u64) {
4197    match schedule {
4198        RoutineSchedule::IntervalSeconds { .. } => {
4199            let Some(interval_ms) = routine_interval_ms(schedule) else {
4200                return (0, next_fire_at_ms);
4201            };
4202            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
4203        }
4204        RoutineSchedule::Cron { expression } => {
4205            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
4206                .unwrap_or_else(|| now_ms.saturating_add(60_000));
4207            match policy {
4208                RoutineMisfirePolicy::Skip => (0, aligned_next),
4209                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4210                RoutineMisfirePolicy::CatchUp { max_runs } => {
4211                    let mut count = 0u32;
4212                    let mut cursor = next_fire_at_ms;
4213                    while cursor <= now_ms && count < *max_runs {
4214                        count = count.saturating_add(1);
4215                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
4216                            break;
4217                        };
4218                        if next <= cursor {
4219                            break;
4220                        }
4221                        cursor = next;
4222                    }
4223                    (count, aligned_next)
4224                }
4225            }
4226        }
4227    }
4228}
4229
4230fn compute_misfire_plan(
4231    now_ms: u64,
4232    next_fire_at_ms: u64,
4233    interval_ms: u64,
4234    policy: &RoutineMisfirePolicy,
4235) -> (u32, u64) {
4236    if now_ms < next_fire_at_ms || interval_ms == 0 {
4237        return (0, next_fire_at_ms);
4238    }
4239    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4240    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4241    match policy {
4242        RoutineMisfirePolicy::Skip => (0, aligned_next),
4243        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4244        RoutineMisfirePolicy::CatchUp { max_runs } => {
4245            let count = missed.min(u64::from(*max_runs)) as u32;
4246            (count, aligned_next)
4247        }
4248    }
4249}
4250
4251fn auto_generated_agent_name(agent_id: &str) -> String {
4252    let names = [
4253        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4254    ];
4255    let digest = Sha256::digest(agent_id.as_bytes());
4256    let idx = usize::from(digest[0]) % names.len();
4257    format!("{}-{:02x}", names[idx], digest[1])
4258}
4259
4260fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4261    match schedule.schedule_type {
4262        AutomationV2ScheduleType::Manual => None,
4263        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4264            seconds: schedule.interval_seconds.unwrap_or(60),
4265        }),
4266        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4267            expression: schedule.cron_expression.clone().unwrap_or_default(),
4268        }),
4269    }
4270}
4271
4272fn automation_schedule_next_fire_at_ms(
4273    schedule: &AutomationV2Schedule,
4274    from_ms: u64,
4275) -> Option<u64> {
4276    let routine_schedule = schedule_from_automation_v2(schedule)?;
4277    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4278}
4279
4280fn automation_schedule_due_count(
4281    schedule: &AutomationV2Schedule,
4282    now_ms: u64,
4283    next_fire_at_ms: u64,
4284) -> u32 {
4285    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
4286        return 0;
4287    };
4288    let (count, _) = compute_misfire_plan_for_schedule(
4289        now_ms,
4290        next_fire_at_ms,
4291        &routine_schedule,
4292        &schedule.timezone,
4293        &schedule.misfire_policy,
4294    );
4295    count.max(1)
4296}
4297
4298#[derive(Debug, Clone, PartialEq, Eq)]
4299pub enum RoutineExecutionDecision {
4300    Allowed,
4301    RequiresApproval { reason: String },
4302    Blocked { reason: String },
4303}
4304
4305pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4306    let entrypoint = routine.entrypoint.to_ascii_lowercase();
4307    if entrypoint.starts_with("connector.")
4308        || entrypoint.starts_with("integration.")
4309        || entrypoint.contains("external")
4310    {
4311        return true;
4312    }
4313    routine
4314        .args
4315        .get("uses_external_integrations")
4316        .and_then(|v| v.as_bool())
4317        .unwrap_or(false)
4318        || routine
4319            .args
4320            .get("connector_id")
4321            .and_then(|v| v.as_str())
4322            .is_some()
4323}
4324
4325pub fn evaluate_routine_execution_policy(
4326    routine: &RoutineSpec,
4327    trigger_type: &str,
4328) -> RoutineExecutionDecision {
4329    if !routine_uses_external_integrations(routine) {
4330        return RoutineExecutionDecision::Allowed;
4331    }
4332    if !routine.external_integrations_allowed {
4333        return RoutineExecutionDecision::Blocked {
4334            reason: "external integrations are disabled by policy".to_string(),
4335        };
4336    }
4337    if routine.requires_approval {
4338        return RoutineExecutionDecision::RequiresApproval {
4339            reason: format!(
4340                "manual approval required before external side effects ({})",
4341                trigger_type
4342            ),
4343        };
4344    }
4345    RoutineExecutionDecision::Allowed
4346}
4347
4348fn is_valid_resource_key(key: &str) -> bool {
4349    let trimmed = key.trim();
4350    if trimmed.is_empty() {
4351        return false;
4352    }
4353    if trimmed == "swarm.active_tasks" {
4354        return true;
4355    }
4356    let allowed_prefix = ["run/", "mission/", "project/", "team/"];
4357    if !allowed_prefix
4358        .iter()
4359        .any(|prefix| trimmed.starts_with(prefix))
4360    {
4361        return false;
4362    }
4363    !trimmed.contains("//")
4364}
4365
4366impl Deref for AppState {
4367    type Target = RuntimeState;
4368
4369    fn deref(&self) -> &Self::Target {
4370        self.runtime
4371            .get()
4372            .expect("runtime accessed before startup completion")
4373    }
4374}
4375
4376#[derive(Clone)]
4377struct ServerPromptContextHook {
4378    state: AppState,
4379}
4380
4381impl ServerPromptContextHook {
4382    fn new(state: AppState) -> Self {
4383        Self { state }
4384    }
4385
4386    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
4387        let paths = resolve_shared_paths().ok()?;
4388        MemoryDatabase::new(&paths.memory_db_path).await.ok()
4389    }
4390
4391    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
4392        let paths = resolve_shared_paths().ok()?;
4393        tandem_memory::MemoryManager::new(&paths.memory_db_path)
4394            .await
4395            .ok()
4396    }
4397
4398    fn hash_query(input: &str) -> String {
4399        let mut hasher = Sha256::new();
4400        hasher.update(input.as_bytes());
4401        format!("{:x}", hasher.finalize())
4402    }
4403
4404    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
4405        let mut out = vec!["<memory_context>".to_string()];
4406        let mut used = 0usize;
4407        for hit in hits {
4408            let text = hit
4409                .record
4410                .content
4411                .split_whitespace()
4412                .take(60)
4413                .collect::<Vec<_>>()
4414                .join(" ");
4415            let line = format!(
4416                "- [{:.3}] {} (source={}, run={})",
4417                hit.score, text, hit.record.source_type, hit.record.run_id
4418            );
4419            used = used.saturating_add(line.len());
4420            if used > 2200 {
4421                break;
4422            }
4423            out.push(line);
4424        }
4425        out.push("</memory_context>".to_string());
4426        out.join("\n")
4427    }
4428
4429    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
4430        chunk
4431            .metadata
4432            .as_ref()
4433            .and_then(|meta| meta.get("source_url"))
4434            .and_then(Value::as_str)
4435            .map(str::trim)
4436            .filter(|v| !v.is_empty())
4437            .map(ToString::to_string)
4438    }
4439
4440    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
4441        if let Some(path) = chunk
4442            .metadata
4443            .as_ref()
4444            .and_then(|meta| meta.get("relative_path"))
4445            .and_then(Value::as_str)
4446            .map(str::trim)
4447            .filter(|v| !v.is_empty())
4448        {
4449            return path.to_string();
4450        }
4451        chunk
4452            .source
4453            .strip_prefix("guide_docs:")
4454            .unwrap_or(chunk.source.as_str())
4455            .to_string()
4456    }
4457
4458    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
4459        let mut out = vec!["<docs_context>".to_string()];
4460        let mut used = 0usize;
4461        for hit in hits {
4462            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
4463            let path = Self::extract_docs_relative_path(&hit.chunk);
4464            let text = hit
4465                .chunk
4466                .content
4467                .split_whitespace()
4468                .take(70)
4469                .collect::<Vec<_>>()
4470                .join(" ");
4471            let line = format!(
4472                "- [{:.3}] {} (doc_path={}, source_url={})",
4473                hit.similarity, text, path, url
4474            );
4475            used = used.saturating_add(line.len());
4476            if used > 2800 {
4477                break;
4478            }
4479            out.push(line);
4480        }
4481        out.push("</docs_context>".to_string());
4482        out.join("\n")
4483    }
4484
4485    async fn search_embedded_docs(
4486        &self,
4487        query: &str,
4488        limit: usize,
4489    ) -> Vec<tandem_memory::types::MemorySearchResult> {
4490        let Some(manager) = self.open_memory_manager().await else {
4491            return Vec::new();
4492        };
4493        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
4494        manager
4495            .search(
4496                query,
4497                Some(MemoryTier::Global),
4498                None,
4499                None,
4500                Some(search_limit),
4501            )
4502            .await
4503            .unwrap_or_default()
4504            .into_iter()
4505            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
4506            .take(limit)
4507            .collect()
4508    }
4509
4510    fn should_skip_memory_injection(query: &str) -> bool {
4511        let trimmed = query.trim();
4512        if trimmed.is_empty() {
4513            return true;
4514        }
4515        let lower = trimmed.to_ascii_lowercase();
4516        let social = [
4517            "hi",
4518            "hello",
4519            "hey",
4520            "thanks",
4521            "thank you",
4522            "ok",
4523            "okay",
4524            "cool",
4525            "nice",
4526            "yo",
4527            "good morning",
4528            "good afternoon",
4529            "good evening",
4530        ];
4531        lower.len() <= 32 && social.contains(&lower.as_str())
4532    }
4533
4534    fn personality_preset_text(preset: &str) -> &'static str {
4535        match preset {
4536            "concise" => {
4537                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
4538            }
4539            "friendly" => {
4540                "Default style: friendly and supportive while staying technically rigorous and concrete."
4541            }
4542            "mentor" => {
4543                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
4544            }
4545            "critical" => {
4546                "Default style: critical and risk-first. Surface failure modes and assumptions early."
4547            }
4548            _ => {
4549                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
4550            }
4551        }
4552    }
4553
4554    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
4555        let allow_agent_override = agent_name
4556            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
4557            .unwrap_or(false);
4558        let legacy_bot_name = config
4559            .get("bot_name")
4560            .and_then(Value::as_str)
4561            .map(str::trim)
4562            .filter(|v| !v.is_empty());
4563        let bot_name = config
4564            .get("identity")
4565            .and_then(|identity| identity.get("bot"))
4566            .and_then(|bot| bot.get("canonical_name"))
4567            .and_then(Value::as_str)
4568            .map(str::trim)
4569            .filter(|v| !v.is_empty())
4570            .or(legacy_bot_name)
4571            .unwrap_or("Tandem");
4572
4573        let default_profile = config
4574            .get("identity")
4575            .and_then(|identity| identity.get("personality"))
4576            .and_then(|personality| personality.get("default"));
4577        let default_preset = default_profile
4578            .and_then(|profile| profile.get("preset"))
4579            .and_then(Value::as_str)
4580            .map(str::trim)
4581            .filter(|v| !v.is_empty())
4582            .unwrap_or("balanced");
4583        let default_custom = default_profile
4584            .and_then(|profile| profile.get("custom_instructions"))
4585            .and_then(Value::as_str)
4586            .map(str::trim)
4587            .filter(|v| !v.is_empty())
4588            .map(ToString::to_string);
4589        let legacy_persona = config
4590            .get("persona")
4591            .and_then(Value::as_str)
4592            .map(str::trim)
4593            .filter(|v| !v.is_empty())
4594            .map(ToString::to_string);
4595
4596        let per_agent_profile = if allow_agent_override {
4597            agent_name.and_then(|name| {
4598                config
4599                    .get("identity")
4600                    .and_then(|identity| identity.get("personality"))
4601                    .and_then(|personality| personality.get("per_agent"))
4602                    .and_then(|per_agent| per_agent.get(name))
4603            })
4604        } else {
4605            None
4606        };
4607        let preset = per_agent_profile
4608            .and_then(|profile| profile.get("preset"))
4609            .and_then(Value::as_str)
4610            .map(str::trim)
4611            .filter(|v| !v.is_empty())
4612            .unwrap_or(default_preset);
4613        let custom = per_agent_profile
4614            .and_then(|profile| profile.get("custom_instructions"))
4615            .and_then(Value::as_str)
4616            .map(str::trim)
4617            .filter(|v| !v.is_empty())
4618            .map(ToString::to_string)
4619            .or(default_custom)
4620            .or(legacy_persona);
4621
4622        let mut lines = vec![
4623            format!("You are {bot_name}, an AI assistant."),
4624            Self::personality_preset_text(preset).to_string(),
4625        ];
4626        if let Some(custom) = custom {
4627            lines.push(format!("Additional personality instructions: {custom}"));
4628        }
4629        Some(lines.join("\n"))
4630    }
4631}
4632
4633impl PromptContextHook for ServerPromptContextHook {
4634    fn augment_provider_messages(
4635        &self,
4636        ctx: PromptContextHookContext,
4637        mut messages: Vec<ChatMessage>,
4638    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
4639        let this = self.clone();
4640        Box::pin(async move {
4641            // Startup can invoke prompt plumbing before RuntimeState is installed.
4642            // Never panic from context hooks; fail-open and continue without augmentation.
4643            if !this.state.is_ready() {
4644                return Ok(messages);
4645            }
4646            let run = this.state.run_registry.get(&ctx.session_id).await;
4647            let Some(run) = run else {
4648                return Ok(messages);
4649            };
4650            let config = this.state.config.get_effective_value().await;
4651            if let Some(identity_block) =
4652                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
4653            {
4654                messages.push(ChatMessage {
4655                    role: "system".to_string(),
4656                    content: identity_block,
4657                    attachments: Vec::new(),
4658                });
4659            }
4660            let run_id = run.run_id;
4661            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
4662            let query = messages
4663                .iter()
4664                .rev()
4665                .find(|m| m.role == "user")
4666                .map(|m| m.content.clone())
4667                .unwrap_or_default();
4668            if query.trim().is_empty() {
4669                return Ok(messages);
4670            }
4671            if Self::should_skip_memory_injection(&query) {
4672                return Ok(messages);
4673            }
4674
4675            let docs_hits = this.search_embedded_docs(&query, 6).await;
4676            if !docs_hits.is_empty() {
4677                let docs_block = Self::build_docs_memory_block(&docs_hits);
4678                messages.push(ChatMessage {
4679                    role: "system".to_string(),
4680                    content: docs_block.clone(),
4681                    attachments: Vec::new(),
4682                });
4683                this.state.event_bus.publish(EngineEvent::new(
4684                    "memory.docs.context.injected",
4685                    json!({
4686                        "runID": run_id,
4687                        "sessionID": ctx.session_id,
4688                        "messageID": ctx.message_id,
4689                        "iteration": ctx.iteration,
4690                        "count": docs_hits.len(),
4691                        "tokenSizeApprox": docs_block.split_whitespace().count(),
4692                        "sourcePrefix": "guide_docs:"
4693                    }),
4694                ));
4695                return Ok(messages);
4696            }
4697
4698            let Some(db) = this.open_memory_db().await else {
4699                return Ok(messages);
4700            };
4701            let started = now_ms();
4702            let hits = db
4703                .search_global_memory(&user_id, &query, 8, None, None, None)
4704                .await
4705                .unwrap_or_default();
4706            let latency_ms = now_ms().saturating_sub(started);
4707            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
4708            this.state.event_bus.publish(EngineEvent::new(
4709                "memory.search.performed",
4710                json!({
4711                    "runID": run_id,
4712                    "sessionID": ctx.session_id,
4713                    "messageID": ctx.message_id,
4714                    "providerID": ctx.provider_id,
4715                    "modelID": ctx.model_id,
4716                    "iteration": ctx.iteration,
4717                    "queryHash": Self::hash_query(&query),
4718                    "resultCount": hits.len(),
4719                    "scoreMin": scores.iter().copied().reduce(f64::min),
4720                    "scoreMax": scores.iter().copied().reduce(f64::max),
4721                    "scores": scores,
4722                    "latencyMs": latency_ms,
4723                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
4724                }),
4725            ));
4726
4727            if hits.is_empty() {
4728                return Ok(messages);
4729            }
4730
4731            let memory_block = Self::build_memory_block(&hits);
4732            messages.push(ChatMessage {
4733                role: "system".to_string(),
4734                content: memory_block.clone(),
4735                attachments: Vec::new(),
4736            });
4737            this.state.event_bus.publish(EngineEvent::new(
4738                "memory.context.injected",
4739                json!({
4740                    "runID": run_id,
4741                    "sessionID": ctx.session_id,
4742                    "messageID": ctx.message_id,
4743                    "iteration": ctx.iteration,
4744                    "count": hits.len(),
4745                    "tokenSizeApprox": memory_block.split_whitespace().count(),
4746                }),
4747            ));
4748            Ok(messages)
4749        })
4750    }
4751}
4752
4753fn extract_event_session_id(properties: &Value) -> Option<String> {
4754    properties
4755        .get("sessionID")
4756        .or_else(|| properties.get("sessionId"))
4757        .or_else(|| properties.get("id"))
4758        .or_else(|| {
4759            properties
4760                .get("part")
4761                .and_then(|part| part.get("sessionID"))
4762        })
4763        .or_else(|| {
4764            properties
4765                .get("part")
4766                .and_then(|part| part.get("sessionId"))
4767        })
4768        .and_then(|v| v.as_str())
4769        .map(|s| s.to_string())
4770}
4771
4772fn extract_event_run_id(properties: &Value) -> Option<String> {
4773    properties
4774        .get("runID")
4775        .or_else(|| properties.get("run_id"))
4776        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
4777        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
4778        .and_then(|v| v.as_str())
4779        .map(|s| s.to_string())
4780}
4781
4782fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
4783    let part = properties.get("part")?;
4784    let part_type = part
4785        .get("type")
4786        .and_then(|v| v.as_str())
4787        .unwrap_or_default()
4788        .to_ascii_lowercase();
4789    if part_type != "tool" && part_type != "tool-invocation" && part_type != "tool-result" {
4790        return None;
4791    }
4792    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
4793    let message_id = part
4794        .get("messageID")
4795        .or_else(|| part.get("message_id"))
4796        .and_then(|v| v.as_str())?
4797        .to_string();
4798    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
4799    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
4800        if let Some(preview) = properties
4801            .get("toolCallDelta")
4802            .and_then(|delta| delta.get("parsedArgsPreview"))
4803            .cloned()
4804        {
4805            let preview_nonempty = !preview.is_null()
4806                && !preview.as_object().is_some_and(|value| value.is_empty())
4807                && !preview
4808                    .as_str()
4809                    .map(|value| value.trim().is_empty())
4810                    .unwrap_or(false);
4811            if preview_nonempty {
4812                args = preview;
4813            }
4814        }
4815    }
4816    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
4817    {
4818        tracing::info!(
4819            message_id = %message_id,
4820            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
4821            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
4822            has_result = part.get("result").is_some(),
4823            has_error = part.get("error").is_some(),
4824            "persistable write tool part still has empty args"
4825        );
4826    }
4827    let result = part.get("result").cloned().filter(|value| !value.is_null());
4828    let error = part
4829        .get("error")
4830        .and_then(|v| v.as_str())
4831        .map(|value| value.to_string());
4832    Some((
4833        message_id,
4834        MessagePart::ToolInvocation {
4835            tool,
4836            args,
4837            result,
4838            error,
4839        },
4840    ))
4841}
4842
4843fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
4844    let session_id = extract_event_session_id(&event.properties)?;
4845    let run_id = extract_event_run_id(&event.properties);
4846    let key = format!("run/{session_id}/status");
4847
4848    let mut base = serde_json::Map::new();
4849    base.insert("sessionID".to_string(), Value::String(session_id));
4850    if let Some(run_id) = run_id {
4851        base.insert("runID".to_string(), Value::String(run_id));
4852    }
4853
4854    match event.event_type.as_str() {
4855        "session.run.started" => {
4856            base.insert("state".to_string(), Value::String("running".to_string()));
4857            base.insert("phase".to_string(), Value::String("run".to_string()));
4858            base.insert(
4859                "eventType".to_string(),
4860                Value::String("session.run.started".to_string()),
4861            );
4862            Some(StatusIndexUpdate {
4863                key,
4864                value: Value::Object(base),
4865            })
4866        }
4867        "session.run.finished" => {
4868            base.insert("state".to_string(), Value::String("finished".to_string()));
4869            base.insert("phase".to_string(), Value::String("run".to_string()));
4870            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
4871                base.insert("result".to_string(), Value::String(status.to_string()));
4872            }
4873            base.insert(
4874                "eventType".to_string(),
4875                Value::String("session.run.finished".to_string()),
4876            );
4877            Some(StatusIndexUpdate {
4878                key,
4879                value: Value::Object(base),
4880            })
4881        }
4882        "message.part.updated" => {
4883            let part_type = event
4884                .properties
4885                .get("part")
4886                .and_then(|v| v.get("type"))
4887                .and_then(|v| v.as_str())?;
4888            let part_state = event
4889                .properties
4890                .get("part")
4891                .and_then(|v| v.get("state"))
4892                .and_then(|v| v.as_str())
4893                .unwrap_or("");
4894            let (phase, tool_active) = match (part_type, part_state) {
4895                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
4896                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
4897                _ => return None,
4898            };
4899            base.insert("state".to_string(), Value::String("running".to_string()));
4900            base.insert("phase".to_string(), Value::String(phase.to_string()));
4901            base.insert("toolActive".to_string(), Value::Bool(tool_active));
4902            if let Some(tool_name) = event
4903                .properties
4904                .get("part")
4905                .and_then(|v| v.get("tool"))
4906                .and_then(|v| v.as_str())
4907            {
4908                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
4909            }
4910            base.insert(
4911                "eventType".to_string(),
4912                Value::String("message.part.updated".to_string()),
4913            );
4914            Some(StatusIndexUpdate {
4915                key,
4916                value: Value::Object(base),
4917            })
4918        }
4919        _ => None,
4920    }
4921}
4922
4923pub async fn run_session_part_persister(state: AppState) {
4924    if !state.wait_until_ready_or_failed(120, 250).await {
4925        tracing::warn!("session part persister: skipped because runtime did not become ready");
4926        return;
4927    }
4928    let Some(mut rx) = state.event_bus.take_session_part_receiver() else {
4929        tracing::warn!("session part persister: skipped because receiver was already taken");
4930        return;
4931    };
4932    while let Some(event) = rx.recv().await {
4933        if event.event_type != "message.part.updated" {
4934            continue;
4935        }
4936        // Streaming tool-call previews are useful for the live UI, but persistence
4937        // should store the finalized invocation/result events to avoid duplicating
4938        // one tool part per streamed args delta.
4939        if event.properties.get("toolCallDelta").is_some() {
4940            continue;
4941        }
4942        let Some(session_id) = extract_event_session_id(&event.properties) else {
4943            continue;
4944        };
4945        let Some((message_id, part)) = extract_persistable_tool_part(&event.properties) else {
4946            continue;
4947        };
4948        if let Err(error) = state
4949            .storage
4950            .append_message_part(&session_id, &message_id, part)
4951            .await
4952        {
4953            tracing::warn!(
4954                "session part persister failed for session={} message={}: {error:#}",
4955                session_id,
4956                message_id
4957            );
4958        }
4959    }
4960}
4961
4962pub async fn run_status_indexer(state: AppState) {
4963    if !state.wait_until_ready_or_failed(120, 250).await {
4964        tracing::warn!("status indexer: skipped because runtime did not become ready");
4965        return;
4966    }
4967    let mut rx = state.event_bus.subscribe();
4968    loop {
4969        match rx.recv().await {
4970            Ok(event) => {
4971                if let Some(update) = derive_status_index_update(&event) {
4972                    if let Err(error) = state
4973                        .put_shared_resource(
4974                            update.key,
4975                            update.value,
4976                            None,
4977                            "system.status_indexer".to_string(),
4978                            None,
4979                        )
4980                        .await
4981                    {
4982                        tracing::warn!("status indexer failed to persist update: {error:?}");
4983                    }
4984                }
4985            }
4986            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
4987            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
4988        }
4989    }
4990}
4991
4992pub async fn run_agent_team_supervisor(state: AppState) {
4993    if !state.wait_until_ready_or_failed(120, 250).await {
4994        tracing::warn!("agent team supervisor: skipped because runtime did not become ready");
4995        return;
4996    }
4997    let mut rx = state.event_bus.subscribe();
4998    loop {
4999        match rx.recv().await {
5000            Ok(event) => {
5001                state.agent_teams.handle_engine_event(&state, &event).await;
5002            }
5003            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5004            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5005        }
5006    }
5007}
5008
5009pub async fn run_bug_monitor(state: AppState) {
5010    if !state.wait_until_ready_or_failed(120, 250).await {
5011        tracing::warn!("bug monitor: skipped because runtime did not become ready");
5012        return;
5013    }
5014    state
5015        .update_bug_monitor_runtime_status(|runtime| {
5016            runtime.monitoring_active = false;
5017            runtime.last_runtime_error = None;
5018        })
5019        .await;
5020    let mut rx = state.event_bus.subscribe();
5021    loop {
5022        match rx.recv().await {
5023            Ok(event) => {
5024                if !is_bug_monitor_candidate_event(&event) {
5025                    continue;
5026                }
5027                let status = state.bug_monitor_status().await;
5028                if !status.config.enabled || status.config.paused || !status.readiness.repo_valid {
5029                    state
5030                        .update_bug_monitor_runtime_status(|runtime| {
5031                            runtime.monitoring_active = status.config.enabled
5032                                && !status.config.paused
5033                                && status.readiness.repo_valid;
5034                            runtime.paused = status.config.paused;
5035                            runtime.last_runtime_error = status.last_error.clone();
5036                        })
5037                        .await;
5038                    continue;
5039                }
5040                match process_bug_monitor_event(&state, &event, &status.config).await {
5041                    Ok(incident) => {
5042                        state
5043                            .update_bug_monitor_runtime_status(|runtime| {
5044                                runtime.monitoring_active = true;
5045                                runtime.paused = status.config.paused;
5046                                runtime.last_processed_at_ms = Some(now_ms());
5047                                runtime.last_incident_event_type =
5048                                    Some(incident.event_type.clone());
5049                                runtime.last_runtime_error = None;
5050                            })
5051                            .await;
5052                    }
5053                    Err(error) => {
5054                        let detail = truncate_text(&error.to_string(), 500);
5055                        state
5056                            .update_bug_monitor_runtime_status(|runtime| {
5057                                runtime.monitoring_active = true;
5058                                runtime.paused = status.config.paused;
5059                                runtime.last_processed_at_ms = Some(now_ms());
5060                                runtime.last_incident_event_type = Some(event.event_type.clone());
5061                                runtime.last_runtime_error = Some(detail.clone());
5062                            })
5063                            .await;
5064                        state.event_bus.publish(EngineEvent::new(
5065                            "bug_monitor.error",
5066                            serde_json::json!({
5067                                "eventType": event.event_type,
5068                                "detail": detail,
5069                            }),
5070                        ));
5071                    }
5072                }
5073            }
5074            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5075            Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => {
5076                state
5077                    .update_bug_monitor_runtime_status(|runtime| {
5078                        runtime.last_runtime_error =
5079                            Some(format!("Bug monitor lagged and dropped {count} events."));
5080                    })
5081                    .await;
5082            }
5083        }
5084    }
5085}
5086
5087pub async fn run_usage_aggregator(state: AppState) {
5088    if !state.wait_until_ready_or_failed(120, 250).await {
5089        tracing::warn!("usage aggregator: skipped because runtime did not become ready");
5090        return;
5091    }
5092    let mut rx = state.event_bus.subscribe();
5093    loop {
5094        match rx.recv().await {
5095            Ok(event) => {
5096                if event.event_type != "provider.usage" {
5097                    continue;
5098                }
5099                let session_id = event
5100                    .properties
5101                    .get("sessionID")
5102                    .and_then(|v| v.as_str())
5103                    .unwrap_or("");
5104                if session_id.is_empty() {
5105                    continue;
5106                }
5107                let prompt_tokens = event
5108                    .properties
5109                    .get("promptTokens")
5110                    .and_then(|v| v.as_u64())
5111                    .unwrap_or(0);
5112                let completion_tokens = event
5113                    .properties
5114                    .get("completionTokens")
5115                    .and_then(|v| v.as_u64())
5116                    .unwrap_or(0);
5117                let total_tokens = event
5118                    .properties
5119                    .get("totalTokens")
5120                    .and_then(|v| v.as_u64())
5121                    .unwrap_or(prompt_tokens.saturating_add(completion_tokens));
5122                state
5123                    .apply_provider_usage_to_runs(
5124                        session_id,
5125                        prompt_tokens,
5126                        completion_tokens,
5127                        total_tokens,
5128                    )
5129                    .await;
5130            }
5131            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
5132            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
5133        }
5134    }
5135}
5136
5137fn is_bug_monitor_candidate_event(event: &EngineEvent) -> bool {
5138    if event.event_type.starts_with("bug_monitor.") {
5139        return false;
5140    }
5141    matches!(
5142        event.event_type.as_str(),
5143        "context.task.failed" | "workflow.run.failed" | "routine.run.failed" | "session.error"
5144    )
5145}
5146
5147async fn process_bug_monitor_event(
5148    state: &AppState,
5149    event: &EngineEvent,
5150    config: &BugMonitorConfig,
5151) -> anyhow::Result<BugMonitorIncidentRecord> {
5152    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
5153    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
5154        state,
5155        submission.repo.as_deref().unwrap_or_default(),
5156        submission.fingerprint.as_deref().unwrap_or_default(),
5157        submission.title.as_deref(),
5158        submission.detail.as_deref(),
5159        &submission.excerpt,
5160        3,
5161    )
5162    .await;
5163    let fingerprint = submission
5164        .fingerprint
5165        .clone()
5166        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
5167    let default_workspace_root = state.workspace_index.snapshot().await.root;
5168    let workspace_root = config
5169        .workspace_root
5170        .clone()
5171        .unwrap_or(default_workspace_root);
5172    let now = now_ms();
5173
5174    let existing = state
5175        .bug_monitor_incidents
5176        .read()
5177        .await
5178        .values()
5179        .find(|row| row.fingerprint == fingerprint)
5180        .cloned();
5181
5182    let mut incident = if let Some(mut row) = existing {
5183        row.occurrence_count = row.occurrence_count.saturating_add(1);
5184        row.updated_at_ms = now;
5185        row.last_seen_at_ms = Some(now);
5186        if row.excerpt.is_empty() {
5187            row.excerpt = submission.excerpt.clone();
5188        }
5189        row
5190    } else {
5191        BugMonitorIncidentRecord {
5192            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
5193            fingerprint: fingerprint.clone(),
5194            event_type: event.event_type.clone(),
5195            status: "queued".to_string(),
5196            repo: submission.repo.clone().unwrap_or_default(),
5197            workspace_root,
5198            title: submission
5199                .title
5200                .clone()
5201                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
5202            detail: submission.detail.clone(),
5203            excerpt: submission.excerpt.clone(),
5204            source: submission.source.clone(),
5205            run_id: submission.run_id.clone(),
5206            session_id: submission.session_id.clone(),
5207            correlation_id: submission.correlation_id.clone(),
5208            component: submission.component.clone(),
5209            level: submission.level.clone(),
5210            occurrence_count: 1,
5211            created_at_ms: now,
5212            updated_at_ms: now,
5213            last_seen_at_ms: Some(now),
5214            draft_id: None,
5215            triage_run_id: None,
5216            last_error: None,
5217            duplicate_summary: None,
5218            duplicate_matches: None,
5219            event_payload: Some(event.properties.clone()),
5220        }
5221    };
5222    state.put_bug_monitor_incident(incident.clone()).await?;
5223
5224    if !duplicate_matches.is_empty() {
5225        incident.status = "duplicate_suppressed".to_string();
5226        let duplicate_summary =
5227            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
5228        incident.duplicate_summary = Some(duplicate_summary.clone());
5229        incident.duplicate_matches = Some(duplicate_matches.clone());
5230        incident.updated_at_ms = now_ms();
5231        state.put_bug_monitor_incident(incident.clone()).await?;
5232        state.event_bus.publish(EngineEvent::new(
5233            "bug_monitor.incident.duplicate_suppressed",
5234            serde_json::json!({
5235                "incident_id": incident.incident_id,
5236                "fingerprint": incident.fingerprint,
5237                "eventType": incident.event_type,
5238                "status": incident.status,
5239                "duplicate_summary": duplicate_summary,
5240                "duplicate_matches": duplicate_matches,
5241            }),
5242        ));
5243        return Ok(incident);
5244    }
5245
5246    let draft = match state.submit_bug_monitor_draft(submission).await {
5247        Ok(draft) => draft,
5248        Err(error) => {
5249            incident.status = "draft_failed".to_string();
5250            incident.last_error = Some(truncate_text(&error.to_string(), 500));
5251            incident.updated_at_ms = now_ms();
5252            state.put_bug_monitor_incident(incident.clone()).await?;
5253            state.event_bus.publish(EngineEvent::new(
5254                "bug_monitor.incident.detected",
5255                serde_json::json!({
5256                    "incident_id": incident.incident_id,
5257                    "fingerprint": incident.fingerprint,
5258                    "eventType": incident.event_type,
5259                    "draft_id": incident.draft_id,
5260                    "triage_run_id": incident.triage_run_id,
5261                    "status": incident.status,
5262                    "detail": incident.last_error,
5263                }),
5264            ));
5265            return Ok(incident);
5266        }
5267    };
5268    incident.draft_id = Some(draft.draft_id.clone());
5269    incident.status = "draft_created".to_string();
5270    state.put_bug_monitor_incident(incident.clone()).await?;
5271
5272    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
5273        state.clone(),
5274        &draft.draft_id,
5275        true,
5276    )
5277    .await
5278    {
5279        Ok((updated_draft, _run_id, _deduped)) => {
5280            incident.triage_run_id = updated_draft.triage_run_id.clone();
5281            if incident.triage_run_id.is_some() {
5282                incident.status = "triage_queued".to_string();
5283            }
5284            incident.last_error = None;
5285        }
5286        Err(error) => {
5287            incident.status = "draft_created".to_string();
5288            incident.last_error = Some(truncate_text(&error.to_string(), 500));
5289        }
5290    }
5291
5292    if let Some(draft_id) = incident.draft_id.clone() {
5293        let latest_draft = state
5294            .get_bug_monitor_draft(&draft_id)
5295            .await
5296            .unwrap_or(draft.clone());
5297        match crate::bug_monitor_github::publish_draft(
5298            state,
5299            &draft_id,
5300            Some(&incident.incident_id),
5301            crate::bug_monitor_github::PublishMode::Auto,
5302        )
5303        .await
5304        {
5305            Ok(outcome) => {
5306                incident.status = outcome.action;
5307                incident.last_error = None;
5308            }
5309            Err(error) => {
5310                let detail = truncate_text(&error.to_string(), 500);
5311                incident.last_error = Some(detail.clone());
5312                let mut failed_draft = latest_draft;
5313                failed_draft.status = "github_post_failed".to_string();
5314                failed_draft.github_status = Some("github_post_failed".to_string());
5315                failed_draft.last_post_error = Some(detail.clone());
5316                let evidence_digest = failed_draft.evidence_digest.clone();
5317                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
5318                let _ = crate::bug_monitor_github::record_post_failure(
5319                    state,
5320                    &failed_draft,
5321                    Some(&incident.incident_id),
5322                    "auto_post",
5323                    evidence_digest.as_deref(),
5324                    &detail,
5325                )
5326                .await;
5327            }
5328        }
5329    }
5330
5331    incident.updated_at_ms = now_ms();
5332    state.put_bug_monitor_incident(incident.clone()).await?;
5333    state.event_bus.publish(EngineEvent::new(
5334        "bug_monitor.incident.detected",
5335        serde_json::json!({
5336            "incident_id": incident.incident_id,
5337            "fingerprint": incident.fingerprint,
5338            "eventType": incident.event_type,
5339            "draft_id": incident.draft_id,
5340            "triage_run_id": incident.triage_run_id,
5341            "status": incident.status,
5342        }),
5343    ));
5344    Ok(incident)
5345}
5346
5347async fn build_bug_monitor_submission_from_event(
5348    state: &AppState,
5349    config: &BugMonitorConfig,
5350    event: &EngineEvent,
5351) -> anyhow::Result<BugMonitorSubmission> {
5352    let repo = config
5353        .repo
5354        .clone()
5355        .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
5356    let default_workspace_root = state.workspace_index.snapshot().await.root;
5357    let workspace_root = config
5358        .workspace_root
5359        .clone()
5360        .unwrap_or(default_workspace_root);
5361    let reason = first_string(
5362        &event.properties,
5363        &["reason", "error", "detail", "message", "summary"],
5364    );
5365    let run_id = first_string(&event.properties, &["runID", "run_id"]);
5366    let session_id = first_string(&event.properties, &["sessionID", "session_id"]);
5367    let correlation_id = first_string(
5368        &event.properties,
5369        &["correlationID", "correlation_id", "commandID", "command_id"],
5370    );
5371    let component = first_string(
5372        &event.properties,
5373        &[
5374            "component",
5375            "routineID",
5376            "routine_id",
5377            "workflowID",
5378            "workflow_id",
5379            "task",
5380            "title",
5381        ],
5382    );
5383    let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
5384    if excerpt.is_empty() {
5385        if let Some(reason) = reason.as_ref() {
5386            excerpt.push(reason.clone());
5387        }
5388    }
5389    let serialized = serde_json::to_string(&event.properties).unwrap_or_default();
5390    let fingerprint = sha256_hex(&[
5391        repo.as_str(),
5392        workspace_root.as_str(),
5393        event.event_type.as_str(),
5394        reason.as_deref().unwrap_or(""),
5395        run_id.as_deref().unwrap_or(""),
5396        session_id.as_deref().unwrap_or(""),
5397        correlation_id.as_deref().unwrap_or(""),
5398        component.as_deref().unwrap_or(""),
5399        serialized.as_str(),
5400    ]);
5401    let title = if let Some(component) = component.as_ref() {
5402        format!("{} failure in {}", event.event_type, component)
5403    } else {
5404        format!("{} detected", event.event_type)
5405    };
5406    let mut detail_lines = vec![
5407        format!("event_type: {}", event.event_type),
5408        format!("workspace_root: {}", workspace_root),
5409    ];
5410    if let Some(reason) = reason.as_ref() {
5411        detail_lines.push(format!("reason: {reason}"));
5412    }
5413    if let Some(run_id) = run_id.as_ref() {
5414        detail_lines.push(format!("run_id: {run_id}"));
5415    }
5416    if let Some(session_id) = session_id.as_ref() {
5417        detail_lines.push(format!("session_id: {session_id}"));
5418    }
5419    if let Some(correlation_id) = correlation_id.as_ref() {
5420        detail_lines.push(format!("correlation_id: {correlation_id}"));
5421    }
5422    if let Some(component) = component.as_ref() {
5423        detail_lines.push(format!("component: {component}"));
5424    }
5425    if !serialized.trim().is_empty() {
5426        detail_lines.push(String::new());
5427        detail_lines.push("payload:".to_string());
5428        detail_lines.push(truncate_text(&serialized, 2_000));
5429    }
5430
5431    Ok(BugMonitorSubmission {
5432        repo: Some(repo),
5433        title: Some(title),
5434        detail: Some(detail_lines.join("\n")),
5435        source: Some("tandem_events".to_string()),
5436        run_id,
5437        session_id,
5438        correlation_id,
5439        file_name: None,
5440        process: Some("tandem-engine".to_string()),
5441        component,
5442        event: Some(event.event_type.clone()),
5443        level: Some("error".to_string()),
5444        excerpt,
5445        fingerprint: Some(fingerprint),
5446    })
5447}
5448
5449async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
5450    let mut excerpt = Vec::new();
5451    if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
5452        excerpt.push(reason);
5453    }
5454    if let Some(title) = first_string(properties, &["title", "task"]) {
5455        if !excerpt.iter().any(|row| row == &title) {
5456            excerpt.push(title);
5457        }
5458    }
5459    let logs = state.logs.read().await;
5460    for entry in logs.iter().rev().take(3) {
5461        if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
5462            excerpt.push(truncate_text(message, 240));
5463        }
5464    }
5465    excerpt.truncate(8);
5466    excerpt
5467}
5468
5469fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
5470    for key in keys {
5471        if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
5472            let trimmed = value.trim();
5473            if !trimmed.is_empty() {
5474                return Some(trimmed.to_string());
5475            }
5476        }
5477    }
5478    None
5479}
5480
5481fn sha256_hex(parts: &[&str]) -> String {
5482    let mut hasher = Sha256::new();
5483    for part in parts {
5484        hasher.update(part.as_bytes());
5485        hasher.update([0u8]);
5486    }
5487    format!("{:x}", hasher.finalize())
5488}
5489
5490pub async fn run_routine_scheduler(state: AppState) {
5491    loop {
5492        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5493        let now = now_ms();
5494        let plans = state.evaluate_routine_misfires(now).await;
5495        for plan in plans {
5496            let Some(routine) = state.get_routine(&plan.routine_id).await else {
5497                continue;
5498            };
5499            match evaluate_routine_execution_policy(&routine, "scheduled") {
5500                RoutineExecutionDecision::Allowed => {
5501                    let _ = state.mark_routine_fired(&plan.routine_id, now).await;
5502                    let run = state
5503                        .create_routine_run(
5504                            &routine,
5505                            "scheduled",
5506                            plan.run_count,
5507                            RoutineRunStatus::Queued,
5508                            None,
5509                        )
5510                        .await;
5511                    state
5512                        .append_routine_history(RoutineHistoryEvent {
5513                            routine_id: plan.routine_id.clone(),
5514                            trigger_type: "scheduled".to_string(),
5515                            run_count: plan.run_count,
5516                            fired_at_ms: now,
5517                            status: "queued".to_string(),
5518                            detail: None,
5519                        })
5520                        .await;
5521                    state.event_bus.publish(EngineEvent::new(
5522                        "routine.fired",
5523                        serde_json::json!({
5524                            "routineID": plan.routine_id,
5525                            "runID": run.run_id,
5526                            "runCount": plan.run_count,
5527                            "scheduledAtMs": plan.scheduled_at_ms,
5528                            "nextFireAtMs": plan.next_fire_at_ms,
5529                        }),
5530                    ));
5531                    state.event_bus.publish(EngineEvent::new(
5532                        "routine.run.created",
5533                        serde_json::json!({
5534                            "run": run,
5535                        }),
5536                    ));
5537                }
5538                RoutineExecutionDecision::RequiresApproval { reason } => {
5539                    let run = state
5540                        .create_routine_run(
5541                            &routine,
5542                            "scheduled",
5543                            plan.run_count,
5544                            RoutineRunStatus::PendingApproval,
5545                            Some(reason.clone()),
5546                        )
5547                        .await;
5548                    state
5549                        .append_routine_history(RoutineHistoryEvent {
5550                            routine_id: plan.routine_id.clone(),
5551                            trigger_type: "scheduled".to_string(),
5552                            run_count: plan.run_count,
5553                            fired_at_ms: now,
5554                            status: "pending_approval".to_string(),
5555                            detail: Some(reason.clone()),
5556                        })
5557                        .await;
5558                    state.event_bus.publish(EngineEvent::new(
5559                        "routine.approval_required",
5560                        serde_json::json!({
5561                            "routineID": plan.routine_id,
5562                            "runID": run.run_id,
5563                            "runCount": plan.run_count,
5564                            "triggerType": "scheduled",
5565                            "reason": reason,
5566                        }),
5567                    ));
5568                    state.event_bus.publish(EngineEvent::new(
5569                        "routine.run.created",
5570                        serde_json::json!({
5571                            "run": run,
5572                        }),
5573                    ));
5574                }
5575                RoutineExecutionDecision::Blocked { reason } => {
5576                    let run = state
5577                        .create_routine_run(
5578                            &routine,
5579                            "scheduled",
5580                            plan.run_count,
5581                            RoutineRunStatus::BlockedPolicy,
5582                            Some(reason.clone()),
5583                        )
5584                        .await;
5585                    state
5586                        .append_routine_history(RoutineHistoryEvent {
5587                            routine_id: plan.routine_id.clone(),
5588                            trigger_type: "scheduled".to_string(),
5589                            run_count: plan.run_count,
5590                            fired_at_ms: now,
5591                            status: "blocked_policy".to_string(),
5592                            detail: Some(reason.clone()),
5593                        })
5594                        .await;
5595                    state.event_bus.publish(EngineEvent::new(
5596                        "routine.blocked",
5597                        serde_json::json!({
5598                            "routineID": plan.routine_id,
5599                            "runID": run.run_id,
5600                            "runCount": plan.run_count,
5601                            "triggerType": "scheduled",
5602                            "reason": reason,
5603                        }),
5604                    ));
5605                    state.event_bus.publish(EngineEvent::new(
5606                        "routine.run.created",
5607                        serde_json::json!({
5608                            "run": run,
5609                        }),
5610                    ));
5611                }
5612            }
5613        }
5614    }
5615}
5616
5617pub async fn run_routine_executor(state: AppState) {
5618    loop {
5619        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5620        let Some(run) = state.claim_next_queued_routine_run().await else {
5621            continue;
5622        };
5623
5624        state.event_bus.publish(EngineEvent::new(
5625            "routine.run.started",
5626            serde_json::json!({
5627                "runID": run.run_id,
5628                "routineID": run.routine_id,
5629                "triggerType": run.trigger_type,
5630                "startedAtMs": now_ms(),
5631            }),
5632        ));
5633
5634        let workspace_root = state.workspace_index.snapshot().await.root;
5635        let mut session = Session::new(
5636            Some(format!("Routine {}", run.routine_id)),
5637            Some(workspace_root.clone()),
5638        );
5639        let session_id = session.id.clone();
5640        session.workspace_root = Some(workspace_root);
5641
5642        if let Err(error) = state.storage.save_session(session).await {
5643            let detail = format!("failed to create routine session: {error}");
5644            let _ = state
5645                .update_routine_run_status(
5646                    &run.run_id,
5647                    RoutineRunStatus::Failed,
5648                    Some(detail.clone()),
5649                )
5650                .await;
5651            state.event_bus.publish(EngineEvent::new(
5652                "routine.run.failed",
5653                serde_json::json!({
5654                    "runID": run.run_id,
5655                    "routineID": run.routine_id,
5656                    "reason": detail,
5657                }),
5658            ));
5659            continue;
5660        }
5661
5662        state
5663            .set_routine_session_policy(
5664                session_id.clone(),
5665                run.run_id.clone(),
5666                run.routine_id.clone(),
5667                run.allowed_tools.clone(),
5668            )
5669            .await;
5670        state
5671            .add_active_session_id(&run.run_id, session_id.clone())
5672            .await;
5673        state
5674            .engine_loop
5675            .set_session_allowed_tools(&session_id, run.allowed_tools.clone())
5676            .await;
5677        state
5678            .engine_loop
5679            .set_session_auto_approve_permissions(&session_id, true)
5680            .await;
5681
5682        let (selected_model, model_source) = resolve_routine_model_spec_for_run(&state, &run).await;
5683        if let Some(spec) = selected_model.as_ref() {
5684            state.event_bus.publish(EngineEvent::new(
5685                "routine.run.model_selected",
5686                serde_json::json!({
5687                    "runID": run.run_id,
5688                    "routineID": run.routine_id,
5689                    "providerID": spec.provider_id,
5690                    "modelID": spec.model_id,
5691                    "source": model_source,
5692                }),
5693            ));
5694        }
5695
5696        let request = SendMessageRequest {
5697            parts: vec![MessagePartInput::Text {
5698                text: build_routine_prompt(&state, &run).await,
5699            }],
5700            model: selected_model,
5701            agent: None,
5702            tool_mode: None,
5703            tool_allowlist: None,
5704            context_mode: None,
5705            write_required: None,
5706        };
5707
5708        let run_result = state
5709            .engine_loop
5710            .run_prompt_async_with_context(
5711                session_id.clone(),
5712                request,
5713                Some(format!("routine:{}", run.run_id)),
5714            )
5715            .await;
5716
5717        state.clear_routine_session_policy(&session_id).await;
5718        state
5719            .clear_active_session_id(&run.run_id, &session_id)
5720            .await;
5721        state
5722            .engine_loop
5723            .clear_session_allowed_tools(&session_id)
5724            .await;
5725        state
5726            .engine_loop
5727            .clear_session_auto_approve_permissions(&session_id)
5728            .await;
5729
5730        match run_result {
5731            Ok(()) => {
5732                append_configured_output_artifacts(&state, &run).await;
5733                let _ = state
5734                    .update_routine_run_status(
5735                        &run.run_id,
5736                        RoutineRunStatus::Completed,
5737                        Some("routine run completed".to_string()),
5738                    )
5739                    .await;
5740                state.event_bus.publish(EngineEvent::new(
5741                    "routine.run.completed",
5742                    serde_json::json!({
5743                        "runID": run.run_id,
5744                        "routineID": run.routine_id,
5745                        "sessionID": session_id,
5746                        "finishedAtMs": now_ms(),
5747                    }),
5748                ));
5749            }
5750            Err(error) => {
5751                if let Some(latest) = state.get_routine_run(&run.run_id).await {
5752                    if latest.status == RoutineRunStatus::Paused {
5753                        state.event_bus.publish(EngineEvent::new(
5754                            "routine.run.paused",
5755                            serde_json::json!({
5756                                "runID": run.run_id,
5757                                "routineID": run.routine_id,
5758                                "sessionID": session_id,
5759                                "finishedAtMs": now_ms(),
5760                            }),
5761                        ));
5762                        continue;
5763                    }
5764                }
5765                let detail = truncate_text(&error.to_string(), 500);
5766                let _ = state
5767                    .update_routine_run_status(
5768                        &run.run_id,
5769                        RoutineRunStatus::Failed,
5770                        Some(detail.clone()),
5771                    )
5772                    .await;
5773                state.event_bus.publish(EngineEvent::new(
5774                    "routine.run.failed",
5775                    serde_json::json!({
5776                        "runID": run.run_id,
5777                        "routineID": run.routine_id,
5778                        "sessionID": session_id,
5779                        "reason": detail,
5780                        "finishedAtMs": now_ms(),
5781                    }),
5782                ));
5783            }
5784        }
5785    }
5786}
5787
5788pub async fn run_automation_v2_scheduler(state: AppState) {
5789    loop {
5790        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
5791        let startup = state.startup_snapshot().await;
5792        if !matches!(startup.status, StartupStatus::Ready) {
5793            continue;
5794        }
5795        let now = now_ms();
5796        let due = state.evaluate_automation_v2_misfires(now).await;
5797        for automation_id in due {
5798            let Some(automation) = state.get_automation_v2(&automation_id).await else {
5799                continue;
5800            };
5801            if let Ok(run) = state
5802                .create_automation_v2_run(&automation, "scheduled")
5803                .await
5804            {
5805                state.event_bus.publish(EngineEvent::new(
5806                    "automation.v2.run.created",
5807                    serde_json::json!({
5808                        "automationID": automation_id,
5809                        "run": run,
5810                        "triggerType": "scheduled",
5811                    }),
5812                ));
5813            }
5814        }
5815    }
5816}
5817
5818fn build_automation_v2_upstream_inputs(
5819    run: &AutomationV2RunRecord,
5820    node: &AutomationFlowNode,
5821) -> anyhow::Result<Vec<Value>> {
5822    let mut inputs = Vec::new();
5823    for input_ref in &node.input_refs {
5824        let Some(output) = run.checkpoint.node_outputs.get(&input_ref.from_step_id) else {
5825            anyhow::bail!(
5826                "missing upstream output for `{}` referenced by node `{}`",
5827                input_ref.from_step_id,
5828                node.node_id
5829            );
5830        };
5831        inputs.push(json!({
5832            "alias": input_ref.alias,
5833            "from_step_id": input_ref.from_step_id,
5834            "output": output,
5835        }));
5836    }
5837    Ok(inputs)
5838}
5839
5840fn render_automation_v2_prompt(
5841    automation: &AutomationV2Spec,
5842    run_id: &str,
5843    node: &AutomationFlowNode,
5844    agent: &AutomationAgentProfile,
5845    upstream_inputs: &[Value],
5846) -> String {
5847    let contract_kind = node
5848        .output_contract
5849        .as_ref()
5850        .map(|contract| contract.kind.as_str())
5851        .unwrap_or("structured_json");
5852    let mut prompt = format!(
5853        "Automation ID: {}\nRun ID: {}\nNode ID: {}\nAgent: {}\nObjective: {}\nOutput contract kind: {}",
5854        automation.automation_id, run_id, node.node_id, agent.display_name, node.objective, contract_kind
5855    );
5856    if !upstream_inputs.is_empty() {
5857        prompt.push_str("\n\nUpstream Inputs:");
5858        for input in upstream_inputs {
5859            let alias = input
5860                .get("alias")
5861                .and_then(Value::as_str)
5862                .unwrap_or("input");
5863            let from_step_id = input
5864                .get("from_step_id")
5865                .and_then(Value::as_str)
5866                .unwrap_or("unknown");
5867            let output = input.get("output").cloned().unwrap_or(Value::Null);
5868            let rendered =
5869                serde_json::to_string_pretty(&output).unwrap_or_else(|_| output.to_string());
5870            prompt.push_str(&format!(
5871                "\n- {}\n  from_step_id: {}\n  output:\n{}",
5872                alias,
5873                from_step_id,
5874                rendered
5875                    .lines()
5876                    .map(|line| format!("    {}", line))
5877                    .collect::<Vec<_>>()
5878                    .join("\n")
5879            ));
5880        }
5881    }
5882    if node.node_id == "notify_user" || node.objective.to_ascii_lowercase().contains("email") {
5883        prompt.push_str(
5884            "\n\nDelivery rules:\n- Prefer inline email body delivery by default.\n- Only include an email attachment when upstream inputs contain a concrete attachment artifact with a non-empty s3key or upload result.\n- Never send an attachment parameter with an empty or null s3key.\n- If no attachment artifact exists, omit the attachment parameter entirely.",
5885        );
5886    }
5887    prompt.push_str(
5888        "\n\nReturn a concise completion. If you produce structured content, keep it valid JSON inside the response body.",
5889    );
5890    prompt
5891}
5892
5893fn extract_session_text_output(session: &Session) -> String {
5894    session
5895        .messages
5896        .iter()
5897        .rev()
5898        .find(|message| matches!(message.role, MessageRole::Assistant))
5899        .map(|message| {
5900            message
5901                .parts
5902                .iter()
5903                .filter_map(|part| match part {
5904                    MessagePart::Text { text } | MessagePart::Reasoning { text } => {
5905                        Some(text.as_str())
5906                    }
5907                    MessagePart::ToolInvocation { .. } => None,
5908                })
5909                .collect::<Vec<_>>()
5910                .join("\n")
5911        })
5912        .unwrap_or_default()
5913}
5914
5915fn wrap_automation_node_output(
5916    node: &AutomationFlowNode,
5917    session_id: &str,
5918    session_text: &str,
5919) -> Value {
5920    let contract_kind = node
5921        .output_contract
5922        .as_ref()
5923        .map(|contract| contract.kind.clone())
5924        .unwrap_or_else(|| "structured_json".to_string());
5925    let summary = if session_text.trim().is_empty() {
5926        format!("Node `{}` completed successfully.", node.node_id)
5927    } else {
5928        truncate_text(session_text.trim(), 240)
5929    };
5930    let content = match contract_kind.as_str() {
5931        "report_markdown" | "text_summary" => {
5932            json!({ "text": session_text.trim(), "session_id": session_id })
5933        }
5934        "urls" => json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id }),
5935        "citations" => {
5936            json!({ "items": [], "raw_text": session_text.trim(), "session_id": session_id })
5937        }
5938        _ => json!({ "text": session_text.trim(), "session_id": session_id }),
5939    };
5940    json!(AutomationNodeOutput {
5941        contract_kind,
5942        summary,
5943        content,
5944        created_at_ms: now_ms(),
5945        node_id: node.node_id.clone(),
5946    })
5947}
5948
5949fn automation_node_max_attempts(node: &AutomationFlowNode) -> u32 {
5950    node.retry_policy
5951        .as_ref()
5952        .and_then(|value| value.get("max_attempts"))
5953        .and_then(Value::as_u64)
5954        .map(|value| value.clamp(1, 10) as u32)
5955        .unwrap_or(3)
5956}
5957
5958async fn resolve_automation_v2_workspace_root(
5959    state: &AppState,
5960    automation: &AutomationV2Spec,
5961) -> String {
5962    if let Some(workspace_root) = automation
5963        .workspace_root
5964        .as_deref()
5965        .map(str::trim)
5966        .filter(|value| !value.is_empty())
5967        .map(str::to_string)
5968    {
5969        return workspace_root;
5970    }
5971    if let Some(workspace_root) = automation
5972        .metadata
5973        .as_ref()
5974        .and_then(|row| row.get("workspace_root"))
5975        .and_then(Value::as_str)
5976        .map(str::trim)
5977        .filter(|value| !value.is_empty())
5978        .map(str::to_string)
5979    {
5980        return workspace_root;
5981    }
5982    state.workspace_index.snapshot().await.root
5983}
5984
5985async fn execute_automation_v2_node(
5986    state: &AppState,
5987    run_id: &str,
5988    automation: &AutomationV2Spec,
5989    node: &AutomationFlowNode,
5990    agent: &AutomationAgentProfile,
5991) -> anyhow::Result<Value> {
5992    let run = state
5993        .get_automation_v2_run(run_id)
5994        .await
5995        .ok_or_else(|| anyhow::anyhow!("automation run `{}` not found", run_id))?;
5996    let upstream_inputs = build_automation_v2_upstream_inputs(&run, node)?;
5997    let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
5998    let workspace_path = PathBuf::from(&workspace_root);
5999    if !workspace_path.exists() {
6000        anyhow::bail!(
6001            "workspace_root `{}` for automation `{}` does not exist",
6002            workspace_root,
6003            automation.automation_id
6004        );
6005    }
6006    if !workspace_path.is_dir() {
6007        anyhow::bail!(
6008            "workspace_root `{}` for automation `{}` is not a directory",
6009            workspace_root,
6010            automation.automation_id
6011        );
6012    }
6013    let mut session = Session::new(
6014        Some(format!(
6015            "Automation {} / {}",
6016            automation.automation_id, node.node_id
6017        )),
6018        Some(workspace_root.clone()),
6019    );
6020    let session_id = session.id.clone();
6021    session.workspace_root = Some(workspace_root);
6022    state.storage.save_session(session).await?;
6023
6024    state.add_automation_v2_session(run_id, &session_id).await;
6025
6026    let mut allowlist = agent.tool_policy.allowlist.clone();
6027    if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
6028        allowlist.extend(mcp_tools.clone());
6029    }
6030    state
6031        .engine_loop
6032        .set_session_allowed_tools(&session_id, normalize_allowed_tools(allowlist))
6033        .await;
6034    state
6035        .engine_loop
6036        .set_session_auto_approve_permissions(&session_id, true)
6037        .await;
6038
6039    let model = agent
6040        .model_policy
6041        .as_ref()
6042        .and_then(|policy| policy.get("default_model"))
6043        .and_then(parse_model_spec);
6044    let prompt = render_automation_v2_prompt(automation, run_id, node, agent, &upstream_inputs);
6045    let req = SendMessageRequest {
6046        parts: vec![MessagePartInput::Text { text: prompt }],
6047        model,
6048        agent: None,
6049        tool_mode: None,
6050        tool_allowlist: None,
6051        context_mode: None,
6052        write_required: None,
6053    };
6054    let result = state
6055        .engine_loop
6056        .run_prompt_async_with_context(
6057            session_id.clone(),
6058            req,
6059            Some(format!("automation-v2:{run_id}")),
6060        )
6061        .await;
6062
6063    state
6064        .engine_loop
6065        .clear_session_allowed_tools(&session_id)
6066        .await;
6067    state
6068        .engine_loop
6069        .clear_session_auto_approve_permissions(&session_id)
6070        .await;
6071    state.clear_automation_v2_session(run_id, &session_id).await;
6072
6073    result?;
6074    let session = state
6075        .storage
6076        .get_session(&session_id)
6077        .await
6078        .ok_or_else(|| anyhow::anyhow!("automation session `{}` missing after run", session_id))?;
6079    let session_text = extract_session_text_output(&session);
6080    Ok(wrap_automation_node_output(
6081        node,
6082        &session_id,
6083        &session_text,
6084    ))
6085}
6086
6087pub async fn run_automation_v2_executor(state: AppState) {
6088    loop {
6089        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
6090        let Some(run) = state.claim_next_queued_automation_v2_run().await else {
6091            continue;
6092        };
6093        let Some(automation) = state.get_automation_v2(&run.automation_id).await else {
6094            let _ = state
6095                .update_automation_v2_run(&run.run_id, |row| {
6096                    row.status = AutomationRunStatus::Failed;
6097                    row.detail = Some("automation not found".to_string());
6098                })
6099                .await;
6100            continue;
6101        };
6102        let max_parallel = automation
6103            .execution
6104            .max_parallel_agents
6105            .unwrap_or(1)
6106            .clamp(1, 16) as usize;
6107
6108        loop {
6109            let Some(latest) = state.get_automation_v2_run(&run.run_id).await else {
6110                break;
6111            };
6112            if matches!(
6113                latest.status,
6114                AutomationRunStatus::Paused
6115                    | AutomationRunStatus::Pausing
6116                    | AutomationRunStatus::Cancelled
6117                    | AutomationRunStatus::Failed
6118                    | AutomationRunStatus::Completed
6119            ) {
6120                break;
6121            }
6122            if latest.checkpoint.pending_nodes.is_empty() {
6123                let _ = state
6124                    .update_automation_v2_run(&run.run_id, |row| {
6125                        row.status = AutomationRunStatus::Completed;
6126                        row.detail = Some("automation run completed".to_string());
6127                    })
6128                    .await;
6129                break;
6130            }
6131
6132            let completed = latest
6133                .checkpoint
6134                .completed_nodes
6135                .iter()
6136                .cloned()
6137                .collect::<std::collections::HashSet<_>>();
6138            let pending = latest.checkpoint.pending_nodes.clone();
6139            let runnable = pending
6140                .iter()
6141                .filter_map(|node_id| {
6142                    let node = automation
6143                        .flow
6144                        .nodes
6145                        .iter()
6146                        .find(|n| n.node_id == *node_id)?;
6147                    if node.depends_on.iter().all(|dep| completed.contains(dep)) {
6148                        Some(node.clone())
6149                    } else {
6150                        None
6151                    }
6152                })
6153                .take(max_parallel)
6154                .collect::<Vec<_>>();
6155
6156            if runnable.is_empty() {
6157                let _ = state
6158                    .update_automation_v2_run(&run.run_id, |row| {
6159                        row.status = AutomationRunStatus::Failed;
6160                        row.detail = Some("flow deadlock: no runnable nodes".to_string());
6161                    })
6162                    .await;
6163                break;
6164            }
6165
6166            let runnable_node_ids = runnable
6167                .iter()
6168                .map(|node| node.node_id.clone())
6169                .collect::<Vec<_>>();
6170            let _ = state
6171                .update_automation_v2_run(&run.run_id, |row| {
6172                    for node_id in &runnable_node_ids {
6173                        let attempts = row
6174                            .checkpoint
6175                            .node_attempts
6176                            .entry(node_id.clone())
6177                            .or_insert(0);
6178                        *attempts += 1;
6179                    }
6180                })
6181                .await;
6182
6183            let tasks = runnable
6184                .iter()
6185                .map(|node| {
6186                    let Some(agent) = automation
6187                        .agents
6188                        .iter()
6189                        .find(|a| a.agent_id == node.agent_id)
6190                        .cloned()
6191                    else {
6192                        return futures::future::ready((
6193                            node.node_id.clone(),
6194                            Err(anyhow::anyhow!("agent not found")),
6195                        ))
6196                        .boxed();
6197                    };
6198                    let state = state.clone();
6199                    let run_id = run.run_id.clone();
6200                    let automation = automation.clone();
6201                    let node = node.clone();
6202                    async move {
6203                        let result =
6204                            execute_automation_v2_node(&state, &run_id, &automation, &node, &agent)
6205                                .await;
6206                        (node.node_id, result)
6207                    }
6208                    .boxed()
6209                })
6210                .collect::<Vec<_>>();
6211            let outcomes = join_all(tasks).await;
6212
6213            let mut terminal_failure = None::<String>;
6214            let latest_attempts = state
6215                .get_automation_v2_run(&run.run_id)
6216                .await
6217                .map(|row| row.checkpoint.node_attempts)
6218                .unwrap_or_default();
6219            for (node_id, result) in outcomes {
6220                match result {
6221                    Ok(output) => {
6222                        let _ = state
6223                            .update_automation_v2_run(&run.run_id, |row| {
6224                                row.checkpoint.pending_nodes.retain(|id| id != &node_id);
6225                                if !row
6226                                    .checkpoint
6227                                    .completed_nodes
6228                                    .iter()
6229                                    .any(|id| id == &node_id)
6230                                {
6231                                    row.checkpoint.completed_nodes.push(node_id.clone());
6232                                }
6233                                row.checkpoint.node_outputs.insert(node_id.clone(), output);
6234                            })
6235                            .await;
6236                    }
6237                    Err(error) => {
6238                        let is_paused = state
6239                            .get_automation_v2_run(&run.run_id)
6240                            .await
6241                            .map(|row| row.status == AutomationRunStatus::Paused)
6242                            .unwrap_or(false);
6243                        if is_paused {
6244                            break;
6245                        }
6246                        let detail = truncate_text(&error.to_string(), 500);
6247                        let attempts = latest_attempts.get(&node_id).copied().unwrap_or(1);
6248                        let max_attempts = automation
6249                            .flow
6250                            .nodes
6251                            .iter()
6252                            .find(|row| row.node_id == node_id)
6253                            .map(automation_node_max_attempts)
6254                            .unwrap_or(1);
6255                        if attempts >= max_attempts {
6256                            terminal_failure = Some(format!(
6257                                "node `{}` failed after {}/{} attempts: {}",
6258                                node_id, attempts, max_attempts, detail
6259                            ));
6260                            break;
6261                        }
6262                        let _ = state
6263                            .update_automation_v2_run(&run.run_id, |row| {
6264                                row.detail = Some(format!(
6265                                    "retrying node `{}` after attempt {}/{} failed: {}",
6266                                    node_id, attempts, max_attempts, detail
6267                                ));
6268                            })
6269                            .await;
6270                    }
6271                }
6272            }
6273            if let Some(detail) = terminal_failure {
6274                let _ = state
6275                    .update_automation_v2_run(&run.run_id, |row| {
6276                        row.status = AutomationRunStatus::Failed;
6277                        row.detail = Some(detail);
6278                    })
6279                    .await;
6280                break;
6281            }
6282        }
6283    }
6284}
6285
6286async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
6287    let normalized_entrypoint = run.entrypoint.trim();
6288    let known_tool = state
6289        .tools
6290        .list()
6291        .await
6292        .into_iter()
6293        .any(|schema| schema.name == normalized_entrypoint);
6294    if known_tool {
6295        let args = if run.args.is_object() {
6296            run.args.clone()
6297        } else {
6298            serde_json::json!({})
6299        };
6300        return format!("/tool {} {}", normalized_entrypoint, args);
6301    }
6302
6303    if let Some(objective) = routine_objective_from_args(run) {
6304        return build_routine_mission_prompt(run, &objective);
6305    }
6306
6307    format!(
6308        "Execute routine '{}' using entrypoint '{}' with args: {}",
6309        run.routine_id, run.entrypoint, run.args
6310    )
6311}
6312
6313fn routine_objective_from_args(run: &RoutineRunRecord) -> Option<String> {
6314    run.args
6315        .get("prompt")
6316        .and_then(|v| v.as_str())
6317        .map(str::trim)
6318        .filter(|v| !v.is_empty())
6319        .map(ToString::to_string)
6320}
6321
6322fn routine_mode_from_args(args: &Value) -> &str {
6323    args.get("mode")
6324        .and_then(|v| v.as_str())
6325        .map(str::trim)
6326        .filter(|v| !v.is_empty())
6327        .unwrap_or("standalone")
6328}
6329
6330fn routine_success_criteria_from_args(args: &Value) -> Vec<String> {
6331    args.get("success_criteria")
6332        .and_then(|v| v.as_array())
6333        .map(|rows| {
6334            rows.iter()
6335                .filter_map(|row| row.as_str())
6336                .map(str::trim)
6337                .filter(|row| !row.is_empty())
6338                .map(ToString::to_string)
6339                .collect::<Vec<_>>()
6340        })
6341        .unwrap_or_default()
6342}
6343
6344fn build_routine_mission_prompt(run: &RoutineRunRecord, objective: &str) -> String {
6345    let mode = routine_mode_from_args(&run.args);
6346    let success_criteria = routine_success_criteria_from_args(&run.args);
6347    let orchestrator_only_tool_calls = run
6348        .args
6349        .get("orchestrator_only_tool_calls")
6350        .and_then(|v| v.as_bool())
6351        .unwrap_or(false);
6352
6353    let mut lines = vec![
6354        format!("Automation ID: {}", run.routine_id),
6355        format!("Run ID: {}", run.run_id),
6356        format!("Mode: {}", mode),
6357        format!("Mission Objective: {}", objective),
6358    ];
6359
6360    if !success_criteria.is_empty() {
6361        lines.push("Success Criteria:".to_string());
6362        for criterion in success_criteria {
6363            lines.push(format!("- {}", criterion));
6364        }
6365    }
6366
6367    if run.allowed_tools.is_empty() {
6368        lines.push("Allowed Tools: all available by current policy".to_string());
6369    } else {
6370        lines.push(format!("Allowed Tools: {}", run.allowed_tools.join(", ")));
6371    }
6372
6373    if run.output_targets.is_empty() {
6374        lines.push("Output Targets: none configured".to_string());
6375    } else {
6376        lines.push("Output Targets:".to_string());
6377        for target in &run.output_targets {
6378            lines.push(format!("- {}", target));
6379        }
6380    }
6381
6382    if mode.eq_ignore_ascii_case("orchestrated") {
6383        lines.push("Execution Pattern: Plan -> Do -> Verify -> Notify".to_string());
6384        lines
6385            .push("Role Contract: Orchestrator owns final decisions and final output.".to_string());
6386        if orchestrator_only_tool_calls {
6387            lines.push(
6388                "Tool Policy: only the orchestrator may execute tools; helper roles propose actions/results."
6389                    .to_string(),
6390            );
6391        }
6392    } else {
6393        lines.push("Execution Pattern: Standalone mission run".to_string());
6394    }
6395
6396    lines.push(
6397        "Deliverable: produce a concise final report that states what was done, what was verified, and final artifact locations."
6398            .to_string(),
6399    );
6400
6401    lines.join("\n")
6402}
6403
6404fn truncate_text(input: &str, max_len: usize) -> String {
6405    if input.len() <= max_len {
6406        return input.to_string();
6407    }
6408    let mut out = input[..max_len].to_string();
6409    out.push_str("...<truncated>");
6410    out
6411}
6412
6413async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
6414    if run.output_targets.is_empty() {
6415        return;
6416    }
6417    for target in &run.output_targets {
6418        let artifact = RoutineRunArtifact {
6419            artifact_id: format!("artifact-{}", uuid::Uuid::new_v4()),
6420            uri: target.clone(),
6421            kind: "output_target".to_string(),
6422            label: Some("configured output target".to_string()),
6423            created_at_ms: now_ms(),
6424            metadata: Some(serde_json::json!({
6425                "source": "routine.output_targets",
6426                "runID": run.run_id,
6427                "routineID": run.routine_id,
6428            })),
6429        };
6430        let _ = state
6431            .append_routine_run_artifact(&run.run_id, artifact.clone())
6432            .await;
6433        state.event_bus.publish(EngineEvent::new(
6434            "routine.run.artifact_added",
6435            serde_json::json!({
6436                "runID": run.run_id,
6437                "routineID": run.routine_id,
6438                "artifact": artifact,
6439            }),
6440        ));
6441    }
6442}
6443
6444fn parse_model_spec(value: &Value) -> Option<ModelSpec> {
6445    let obj = value.as_object()?;
6446    let provider_id = obj.get("provider_id")?.as_str()?.trim();
6447    let model_id = obj.get("model_id")?.as_str()?.trim();
6448    if provider_id.is_empty() || model_id.is_empty() {
6449        return None;
6450    }
6451    Some(ModelSpec {
6452        provider_id: provider_id.to_string(),
6453        model_id: model_id.to_string(),
6454    })
6455}
6456
6457fn model_spec_for_role_from_args(args: &Value, role: &str) -> Option<ModelSpec> {
6458    args.get("model_policy")
6459        .and_then(|v| v.get("role_models"))
6460        .and_then(|v| v.get(role))
6461        .and_then(parse_model_spec)
6462}
6463
6464fn default_model_spec_from_args(args: &Value) -> Option<ModelSpec> {
6465    args.get("model_policy")
6466        .and_then(|v| v.get("default_model"))
6467        .and_then(parse_model_spec)
6468}
6469
6470fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6471    let provider_id = config
6472        .get("default_provider")
6473        .and_then(|v| v.as_str())
6474        .map(str::trim)
6475        .filter(|v| !v.is_empty())?;
6476    let model_id = config
6477        .get("providers")
6478        .and_then(|v| v.get(provider_id))
6479        .and_then(|v| v.get("default_model"))
6480        .and_then(|v| v.as_str())
6481        .map(str::trim)
6482        .filter(|v| !v.is_empty())?;
6483    Some(ModelSpec {
6484        provider_id: provider_id.to_string(),
6485        model_id: model_id.to_string(),
6486    })
6487}
6488
6489fn provider_catalog_has_model(providers: &[tandem_types::ProviderInfo], spec: &ModelSpec) -> bool {
6490    providers.iter().any(|provider| {
6491        provider.id == spec.provider_id
6492            && provider
6493                .models
6494                .iter()
6495                .any(|model| model.id == spec.model_id)
6496    })
6497}
6498
6499async fn resolve_routine_model_spec_for_run(
6500    state: &AppState,
6501    run: &RoutineRunRecord,
6502) -> (Option<ModelSpec>, String) {
6503    let providers = state.providers.list().await;
6504    let mode = routine_mode_from_args(&run.args);
6505    let mut requested: Vec<(ModelSpec, &str)> = Vec::new();
6506
6507    if mode.eq_ignore_ascii_case("orchestrated") {
6508        if let Some(orchestrator) = model_spec_for_role_from_args(&run.args, "orchestrator") {
6509            requested.push((orchestrator, "args.model_policy.role_models.orchestrator"));
6510        }
6511    }
6512    if let Some(default_model) = default_model_spec_from_args(&run.args) {
6513        requested.push((default_model, "args.model_policy.default_model"));
6514    }
6515    let effective_config = state.config.get_effective_value().await;
6516    if let Some(config_default) = default_model_spec_from_effective_config(&effective_config) {
6517        requested.push((config_default, "config.default_provider"));
6518    }
6519
6520    for (candidate, source) in requested {
6521        if provider_catalog_has_model(&providers, &candidate) {
6522            return (Some(candidate), source.to_string());
6523        }
6524    }
6525
6526    let fallback = providers
6527        .into_iter()
6528        .find(|provider| !provider.models.is_empty())
6529        .and_then(|provider| {
6530            let model = provider.models.first()?;
6531            Some(ModelSpec {
6532                provider_id: provider.id,
6533                model_id: model.id.clone(),
6534            })
6535        });
6536
6537    (fallback, "provider_catalog_fallback".to_string())
6538}
6539
6540#[cfg(test)]
6541mod tests {
6542    use super::*;
6543
6544    fn test_state_with_path(path: PathBuf) -> AppState {
6545        let mut state = AppState::new_starting("test-attempt".to_string(), true);
6546        state.shared_resources_path = path;
6547        state.routines_path = tmp_routines_file("shared-state");
6548        state.routine_history_path = tmp_routines_file("routine-history");
6549        state.routine_runs_path = tmp_routines_file("routine-runs");
6550        state
6551    }
6552
6553    fn tmp_resource_file(name: &str) -> PathBuf {
6554        std::env::temp_dir().join(format!(
6555            "tandem-server-{name}-{}.json",
6556            uuid::Uuid::new_v4()
6557        ))
6558    }
6559
6560    fn tmp_routines_file(name: &str) -> PathBuf {
6561        std::env::temp_dir().join(format!(
6562            "tandem-server-routines-{name}-{}.json",
6563            uuid::Uuid::new_v4()
6564        ))
6565    }
6566
6567    #[test]
6568    fn default_model_spec_from_effective_config_reads_default_route() {
6569        let cfg = serde_json::json!({
6570            "default_provider": "openrouter",
6571            "providers": {
6572                "openrouter": {
6573                    "default_model": "google/gemini-3-flash-preview"
6574                }
6575            }
6576        });
6577        let spec = default_model_spec_from_effective_config(&cfg).expect("default model spec");
6578        assert_eq!(spec.provider_id, "openrouter");
6579        assert_eq!(spec.model_id, "google/gemini-3-flash-preview");
6580    }
6581
6582    #[test]
6583    fn default_model_spec_from_effective_config_returns_none_when_incomplete() {
6584        let missing_provider = serde_json::json!({
6585            "providers": {
6586                "openrouter": {
6587                    "default_model": "google/gemini-3-flash-preview"
6588                }
6589            }
6590        });
6591        assert!(default_model_spec_from_effective_config(&missing_provider).is_none());
6592
6593        let missing_model = serde_json::json!({
6594            "default_provider": "openrouter",
6595            "providers": {
6596                "openrouter": {}
6597            }
6598        });
6599        assert!(default_model_spec_from_effective_config(&missing_model).is_none());
6600    }
6601
6602    #[tokio::test]
6603    async fn shared_resource_put_increments_revision() {
6604        let path = tmp_resource_file("shared-resource-put");
6605        let state = test_state_with_path(path.clone());
6606
6607        let first = state
6608            .put_shared_resource(
6609                "project/demo/board".to_string(),
6610                serde_json::json!({"status":"todo"}),
6611                None,
6612                "agent-1".to_string(),
6613                None,
6614            )
6615            .await
6616            .expect("first put");
6617        assert_eq!(first.rev, 1);
6618
6619        let second = state
6620            .put_shared_resource(
6621                "project/demo/board".to_string(),
6622                serde_json::json!({"status":"doing"}),
6623                Some(1),
6624                "agent-2".to_string(),
6625                Some(60_000),
6626            )
6627            .await
6628            .expect("second put");
6629        assert_eq!(second.rev, 2);
6630        assert_eq!(second.updated_by, "agent-2");
6631        assert_eq!(second.ttl_ms, Some(60_000));
6632
6633        let raw = tokio::fs::read_to_string(path.clone())
6634            .await
6635            .expect("persisted");
6636        assert!(raw.contains("\"rev\": 2"));
6637        let _ = tokio::fs::remove_file(path).await;
6638    }
6639
6640    #[tokio::test]
6641    async fn shared_resource_put_detects_revision_conflict() {
6642        let path = tmp_resource_file("shared-resource-conflict");
6643        let state = test_state_with_path(path.clone());
6644
6645        let _ = state
6646            .put_shared_resource(
6647                "mission/demo/card-1".to_string(),
6648                serde_json::json!({"title":"Card 1"}),
6649                None,
6650                "agent-1".to_string(),
6651                None,
6652            )
6653            .await
6654            .expect("seed put");
6655
6656        let conflict = state
6657            .put_shared_resource(
6658                "mission/demo/card-1".to_string(),
6659                serde_json::json!({"title":"Card 1 edited"}),
6660                Some(99),
6661                "agent-2".to_string(),
6662                None,
6663            )
6664            .await
6665            .expect_err("expected conflict");
6666
6667        match conflict {
6668            ResourceStoreError::RevisionConflict(conflict) => {
6669                assert_eq!(conflict.expected_rev, Some(99));
6670                assert_eq!(conflict.current_rev, Some(1));
6671            }
6672            other => panic!("unexpected error: {other:?}"),
6673        }
6674
6675        let _ = tokio::fs::remove_file(path).await;
6676    }
6677
6678    #[tokio::test]
6679    async fn shared_resource_rejects_invalid_namespace_key() {
6680        let path = tmp_resource_file("shared-resource-invalid-key");
6681        let state = test_state_with_path(path.clone());
6682
6683        let error = state
6684            .put_shared_resource(
6685                "global/demo/key".to_string(),
6686                serde_json::json!({"x":1}),
6687                None,
6688                "agent-1".to_string(),
6689                None,
6690            )
6691            .await
6692            .expect_err("invalid key should fail");
6693
6694        match error {
6695            ResourceStoreError::InvalidKey { key } => assert_eq!(key, "global/demo/key"),
6696            other => panic!("unexpected error: {other:?}"),
6697        }
6698
6699        assert!(!path.exists());
6700    }
6701
6702    #[test]
6703    fn derive_status_index_update_for_run_started() {
6704        let event = EngineEvent::new(
6705            "session.run.started",
6706            serde_json::json!({
6707                "sessionID": "s-1",
6708                "runID": "r-1"
6709            }),
6710        );
6711        let update = derive_status_index_update(&event).expect("update");
6712        assert_eq!(update.key, "run/s-1/status");
6713        assert_eq!(
6714            update.value.get("state").and_then(|v| v.as_str()),
6715            Some("running")
6716        );
6717        assert_eq!(
6718            update.value.get("phase").and_then(|v| v.as_str()),
6719            Some("run")
6720        );
6721    }
6722
6723    #[test]
6724    fn derive_status_index_update_for_tool_invocation() {
6725        let event = EngineEvent::new(
6726            "message.part.updated",
6727            serde_json::json!({
6728                "sessionID": "s-2",
6729                "runID": "r-2",
6730                "part": { "type": "tool-invocation", "tool": "todo_write" }
6731            }),
6732        );
6733        let update = derive_status_index_update(&event).expect("update");
6734        assert_eq!(update.key, "run/s-2/status");
6735        assert_eq!(
6736            update.value.get("phase").and_then(|v| v.as_str()),
6737            Some("tool")
6738        );
6739        assert_eq!(
6740            update.value.get("toolActive").and_then(|v| v.as_bool()),
6741            Some(true)
6742        );
6743        assert_eq!(
6744            update.value.get("tool").and_then(|v| v.as_str()),
6745            Some("todo_write")
6746        );
6747    }
6748
6749    #[test]
6750    fn misfire_skip_drops_runs_and_advances_next_fire() {
6751        let (count, next_fire) =
6752            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::Skip);
6753        assert_eq!(count, 0);
6754        assert_eq!(next_fire, 11_000);
6755    }
6756
6757    #[test]
6758    fn misfire_run_once_emits_single_trigger() {
6759        let (count, next_fire) =
6760            compute_misfire_plan(10_500, 5_000, 1_000, &RoutineMisfirePolicy::RunOnce);
6761        assert_eq!(count, 1);
6762        assert_eq!(next_fire, 11_000);
6763    }
6764
6765    #[test]
6766    fn misfire_catch_up_caps_trigger_count() {
6767        let (count, next_fire) = compute_misfire_plan(
6768            25_000,
6769            5_000,
6770            1_000,
6771            &RoutineMisfirePolicy::CatchUp { max_runs: 3 },
6772        );
6773        assert_eq!(count, 3);
6774        assert_eq!(next_fire, 26_000);
6775    }
6776
6777    #[tokio::test]
6778    async fn routine_put_persists_and_loads() {
6779        let routines_path = tmp_routines_file("persist-load");
6780        let mut state = AppState::new_starting("routines-put".to_string(), true);
6781        state.routines_path = routines_path.clone();
6782
6783        let routine = RoutineSpec {
6784            routine_id: "routine-1".to_string(),
6785            name: "Digest".to_string(),
6786            status: RoutineStatus::Active,
6787            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
6788            timezone: "UTC".to_string(),
6789            misfire_policy: RoutineMisfirePolicy::RunOnce,
6790            entrypoint: "mission.default".to_string(),
6791            args: serde_json::json!({"topic":"status"}),
6792            allowed_tools: vec![],
6793            output_targets: vec![],
6794            creator_type: "user".to_string(),
6795            creator_id: "user-1".to_string(),
6796            requires_approval: true,
6797            external_integrations_allowed: false,
6798            next_fire_at_ms: Some(5_000),
6799            last_fired_at_ms: None,
6800        };
6801
6802        state.put_routine(routine).await.expect("store routine");
6803
6804        let mut reloaded = AppState::new_starting("routines-reload".to_string(), true);
6805        reloaded.routines_path = routines_path.clone();
6806        reloaded.load_routines().await.expect("load routines");
6807        let list = reloaded.list_routines().await;
6808        assert_eq!(list.len(), 1);
6809        assert_eq!(list[0].routine_id, "routine-1");
6810
6811        let _ = tokio::fs::remove_file(routines_path).await;
6812    }
6813
6814    #[tokio::test]
6815    async fn persist_routines_does_not_clobber_existing_store_with_empty_state() {
6816        let routines_path = tmp_routines_file("persist-guard");
6817        let mut writer = AppState::new_starting("routines-writer".to_string(), true);
6818        writer.routines_path = routines_path.clone();
6819        writer
6820            .put_routine(RoutineSpec {
6821                routine_id: "automation-guarded".to_string(),
6822                name: "Guarded Automation".to_string(),
6823                status: RoutineStatus::Active,
6824                schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
6825                timezone: "UTC".to_string(),
6826                misfire_policy: RoutineMisfirePolicy::RunOnce,
6827                entrypoint: "mission.default".to_string(),
6828                args: serde_json::json!({
6829                    "prompt": "Keep this saved across restart"
6830                }),
6831                allowed_tools: vec!["read".to_string()],
6832                output_targets: vec![],
6833                creator_type: "user".to_string(),
6834                creator_id: "user-1".to_string(),
6835                requires_approval: false,
6836                external_integrations_allowed: false,
6837                next_fire_at_ms: Some(5_000),
6838                last_fired_at_ms: None,
6839            })
6840            .await
6841            .expect("persist baseline routine");
6842
6843        let mut empty_state = AppState::new_starting("routines-empty".to_string(), true);
6844        empty_state.routines_path = routines_path.clone();
6845        let persist = empty_state.persist_routines().await;
6846        assert!(
6847            persist.is_err(),
6848            "empty state should not overwrite existing routines store"
6849        );
6850
6851        let raw = tokio::fs::read_to_string(&routines_path)
6852            .await
6853            .expect("read guarded routines file");
6854        let parsed: std::collections::HashMap<String, RoutineSpec> =
6855            serde_json::from_str(&raw).expect("parse guarded routines file");
6856        assert!(parsed.contains_key("automation-guarded"));
6857
6858        let _ = tokio::fs::remove_file(routines_path.clone()).await;
6859        let _ = tokio::fs::remove_file(sibling_backup_path(&routines_path)).await;
6860    }
6861
6862    #[tokio::test]
6863    async fn load_routines_recovers_from_backup_when_primary_corrupt() {
6864        let routines_path = tmp_routines_file("backup-recovery");
6865        let backup_path = sibling_backup_path(&routines_path);
6866        let mut state = AppState::new_starting("routines-backup-recovery".to_string(), true);
6867        state.routines_path = routines_path.clone();
6868
6869        let primary = "{ not valid json";
6870        tokio::fs::write(&routines_path, primary)
6871            .await
6872            .expect("write corrupt primary");
6873        let backup = serde_json::json!({
6874            "routine-1": {
6875                "routine_id": "routine-1",
6876                "name": "Recovered",
6877                "status": "active",
6878                "schedule": { "interval_seconds": { "seconds": 60 } },
6879                "timezone": "UTC",
6880                "misfire_policy": { "type": "run_once" },
6881                "entrypoint": "mission.default",
6882                "args": {},
6883                "allowed_tools": [],
6884                "output_targets": [],
6885                "creator_type": "user",
6886                "creator_id": "u-1",
6887                "requires_approval": true,
6888                "external_integrations_allowed": false,
6889                "next_fire_at_ms": null,
6890                "last_fired_at_ms": null
6891            }
6892        });
6893        tokio::fs::write(&backup_path, serde_json::to_string_pretty(&backup).unwrap())
6894            .await
6895            .expect("write backup");
6896
6897        state.load_routines().await.expect("load from backup");
6898        let list = state.list_routines().await;
6899        assert_eq!(list.len(), 1);
6900        assert_eq!(list[0].routine_id, "routine-1");
6901
6902        let _ = tokio::fs::remove_file(routines_path).await;
6903        let _ = tokio::fs::remove_file(backup_path).await;
6904    }
6905
6906    #[tokio::test]
6907    async fn evaluate_routine_misfires_respects_skip_run_once_and_catch_up() {
6908        let routines_path = tmp_routines_file("misfire-eval");
6909        let mut state = AppState::new_starting("routines-eval".to_string(), true);
6910        state.routines_path = routines_path.clone();
6911
6912        let base = |id: &str, policy: RoutineMisfirePolicy| RoutineSpec {
6913            routine_id: id.to_string(),
6914            name: id.to_string(),
6915            status: RoutineStatus::Active,
6916            schedule: RoutineSchedule::IntervalSeconds { seconds: 1 },
6917            timezone: "UTC".to_string(),
6918            misfire_policy: policy,
6919            entrypoint: "mission.default".to_string(),
6920            args: serde_json::json!({}),
6921            allowed_tools: vec![],
6922            output_targets: vec![],
6923            creator_type: "user".to_string(),
6924            creator_id: "u-1".to_string(),
6925            requires_approval: false,
6926            external_integrations_allowed: false,
6927            next_fire_at_ms: Some(5_000),
6928            last_fired_at_ms: None,
6929        };
6930
6931        state
6932            .put_routine(base("routine-skip", RoutineMisfirePolicy::Skip))
6933            .await
6934            .expect("put skip");
6935        state
6936            .put_routine(base("routine-once", RoutineMisfirePolicy::RunOnce))
6937            .await
6938            .expect("put once");
6939        state
6940            .put_routine(base(
6941                "routine-catch",
6942                RoutineMisfirePolicy::CatchUp { max_runs: 3 },
6943            ))
6944            .await
6945            .expect("put catch");
6946
6947        let plans = state.evaluate_routine_misfires(10_500).await;
6948        let plan_skip = plans.iter().find(|p| p.routine_id == "routine-skip");
6949        let plan_once = plans.iter().find(|p| p.routine_id == "routine-once");
6950        let plan_catch = plans.iter().find(|p| p.routine_id == "routine-catch");
6951
6952        assert!(plan_skip.is_none());
6953        assert_eq!(plan_once.map(|p| p.run_count), Some(1));
6954        assert_eq!(plan_catch.map(|p| p.run_count), Some(3));
6955
6956        let stored = state.list_routines().await;
6957        let skip_next = stored
6958            .iter()
6959            .find(|r| r.routine_id == "routine-skip")
6960            .and_then(|r| r.next_fire_at_ms)
6961            .expect("skip next");
6962        assert!(skip_next > 10_500);
6963
6964        let _ = tokio::fs::remove_file(routines_path).await;
6965    }
6966
6967    #[test]
6968    fn routine_policy_blocks_external_side_effects_by_default() {
6969        let routine = RoutineSpec {
6970            routine_id: "routine-policy-1".to_string(),
6971            name: "Connector routine".to_string(),
6972            status: RoutineStatus::Active,
6973            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
6974            timezone: "UTC".to_string(),
6975            misfire_policy: RoutineMisfirePolicy::RunOnce,
6976            entrypoint: "connector.email.reply".to_string(),
6977            args: serde_json::json!({}),
6978            allowed_tools: vec![],
6979            output_targets: vec![],
6980            creator_type: "user".to_string(),
6981            creator_id: "u-1".to_string(),
6982            requires_approval: true,
6983            external_integrations_allowed: false,
6984            next_fire_at_ms: None,
6985            last_fired_at_ms: None,
6986        };
6987
6988        let decision = evaluate_routine_execution_policy(&routine, "manual");
6989        assert!(matches!(decision, RoutineExecutionDecision::Blocked { .. }));
6990    }
6991
6992    #[test]
6993    fn routine_policy_requires_approval_for_external_side_effects_when_enabled() {
6994        let routine = RoutineSpec {
6995            routine_id: "routine-policy-2".to_string(),
6996            name: "Connector routine".to_string(),
6997            status: RoutineStatus::Active,
6998            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
6999            timezone: "UTC".to_string(),
7000            misfire_policy: RoutineMisfirePolicy::RunOnce,
7001            entrypoint: "connector.email.reply".to_string(),
7002            args: serde_json::json!({}),
7003            allowed_tools: vec![],
7004            output_targets: vec![],
7005            creator_type: "user".to_string(),
7006            creator_id: "u-1".to_string(),
7007            requires_approval: true,
7008            external_integrations_allowed: true,
7009            next_fire_at_ms: None,
7010            last_fired_at_ms: None,
7011        };
7012
7013        let decision = evaluate_routine_execution_policy(&routine, "manual");
7014        assert!(matches!(
7015            decision,
7016            RoutineExecutionDecision::RequiresApproval { .. }
7017        ));
7018    }
7019
7020    #[test]
7021    fn routine_policy_allows_non_external_entrypoints() {
7022        let routine = RoutineSpec {
7023            routine_id: "routine-policy-3".to_string(),
7024            name: "Internal mission routine".to_string(),
7025            status: RoutineStatus::Active,
7026            schedule: RoutineSchedule::IntervalSeconds { seconds: 60 },
7027            timezone: "UTC".to_string(),
7028            misfire_policy: RoutineMisfirePolicy::RunOnce,
7029            entrypoint: "mission.default".to_string(),
7030            args: serde_json::json!({}),
7031            allowed_tools: vec![],
7032            output_targets: vec![],
7033            creator_type: "user".to_string(),
7034            creator_id: "u-1".to_string(),
7035            requires_approval: true,
7036            external_integrations_allowed: false,
7037            next_fire_at_ms: None,
7038            last_fired_at_ms: None,
7039        };
7040
7041        let decision = evaluate_routine_execution_policy(&routine, "manual");
7042        assert_eq!(decision, RoutineExecutionDecision::Allowed);
7043    }
7044
7045    #[tokio::test]
7046    async fn claim_next_queued_routine_run_marks_oldest_running() {
7047        let mut state = AppState::new_starting("routine-claim".to_string(), true);
7048        state.routine_runs_path = tmp_routines_file("routine-claim-runs");
7049
7050        let mk = |run_id: &str, created_at_ms: u64| RoutineRunRecord {
7051            run_id: run_id.to_string(),
7052            routine_id: "routine-claim".to_string(),
7053            trigger_type: "manual".to_string(),
7054            run_count: 1,
7055            status: RoutineRunStatus::Queued,
7056            created_at_ms,
7057            updated_at_ms: created_at_ms,
7058            fired_at_ms: Some(created_at_ms),
7059            started_at_ms: None,
7060            finished_at_ms: None,
7061            requires_approval: false,
7062            approval_reason: None,
7063            denial_reason: None,
7064            paused_reason: None,
7065            detail: None,
7066            entrypoint: "mission.default".to_string(),
7067            args: serde_json::json!({}),
7068            allowed_tools: vec![],
7069            output_targets: vec![],
7070            artifacts: vec![],
7071            active_session_ids: vec![],
7072            latest_session_id: None,
7073            prompt_tokens: 0,
7074            completion_tokens: 0,
7075            total_tokens: 0,
7076            estimated_cost_usd: 0.0,
7077        };
7078
7079        {
7080            let mut guard = state.routine_runs.write().await;
7081            guard.insert("run-late".to_string(), mk("run-late", 2_000));
7082            guard.insert("run-early".to_string(), mk("run-early", 1_000));
7083        }
7084        state.persist_routine_runs().await.expect("persist");
7085
7086        let claimed = state
7087            .claim_next_queued_routine_run()
7088            .await
7089            .expect("claimed run");
7090        assert_eq!(claimed.run_id, "run-early");
7091        assert_eq!(claimed.status, RoutineRunStatus::Running);
7092        assert!(claimed.started_at_ms.is_some());
7093    }
7094
7095    #[tokio::test]
7096    async fn routine_session_policy_roundtrip_normalizes_tools() {
7097        let state = AppState::new_starting("routine-policy-hook".to_string(), true);
7098        state
7099            .set_routine_session_policy(
7100                "session-routine-1".to_string(),
7101                "run-1".to_string(),
7102                "routine-1".to_string(),
7103                vec![
7104                    "read".to_string(),
7105                    " mcp.arcade.search ".to_string(),
7106                    "read".to_string(),
7107                    "".to_string(),
7108                ],
7109            )
7110            .await;
7111
7112        let policy = state
7113            .routine_session_policy("session-routine-1")
7114            .await
7115            .expect("policy");
7116        assert_eq!(
7117            policy.allowed_tools,
7118            vec!["read".to_string(), "mcp.arcade.search".to_string()]
7119        );
7120    }
7121
7122    #[tokio::test]
7123    async fn routine_run_preserves_latest_session_id_after_session_clears() {
7124        let state = AppState::new_starting("routine-latest-session".to_string(), true);
7125        let routine = RoutineSpec {
7126            routine_id: "routine-session-link".to_string(),
7127            name: "Routine Session Link".to_string(),
7128            status: RoutineStatus::Active,
7129            schedule: RoutineSchedule::IntervalSeconds { seconds: 300 },
7130            timezone: "UTC".to_string(),
7131            misfire_policy: RoutineMisfirePolicy::Skip,
7132            entrypoint: "mission.default".to_string(),
7133            args: serde_json::json!({}),
7134            allowed_tools: vec![],
7135            output_targets: vec![],
7136            creator_type: "user".to_string(),
7137            creator_id: "test".to_string(),
7138            requires_approval: false,
7139            external_integrations_allowed: false,
7140            next_fire_at_ms: None,
7141            last_fired_at_ms: None,
7142        };
7143
7144        let run = state
7145            .create_routine_run(&routine, "manual", 1, RoutineRunStatus::Queued, None)
7146            .await;
7147        state
7148            .add_active_session_id(&run.run_id, "session-123".to_string())
7149            .await
7150            .expect("active session added");
7151        state
7152            .clear_active_session_id(&run.run_id, "session-123")
7153            .await
7154            .expect("active session cleared");
7155
7156        let updated = state
7157            .get_routine_run(&run.run_id)
7158            .await
7159            .expect("run exists");
7160        assert!(updated.active_session_ids.is_empty());
7161        assert_eq!(updated.latest_session_id.as_deref(), Some("session-123"));
7162    }
7163
7164    #[test]
7165    fn routine_mission_prompt_includes_orchestrated_contract() {
7166        let run = RoutineRunRecord {
7167            run_id: "run-orchestrated-1".to_string(),
7168            routine_id: "automation-orchestrated".to_string(),
7169            trigger_type: "manual".to_string(),
7170            run_count: 1,
7171            status: RoutineRunStatus::Queued,
7172            created_at_ms: 1_000,
7173            updated_at_ms: 1_000,
7174            fired_at_ms: Some(1_000),
7175            started_at_ms: None,
7176            finished_at_ms: None,
7177            requires_approval: true,
7178            approval_reason: None,
7179            denial_reason: None,
7180            paused_reason: None,
7181            detail: None,
7182            entrypoint: "mission.default".to_string(),
7183            args: serde_json::json!({
7184                "prompt": "Coordinate a multi-step release readiness check.",
7185                "mode": "orchestrated",
7186                "success_criteria": ["All blockers listed", "Output artifact written"],
7187                "orchestrator_only_tool_calls": true
7188            }),
7189            allowed_tools: vec!["read".to_string(), "webfetch".to_string()],
7190            output_targets: vec!["file://reports/release-readiness.md".to_string()],
7191            artifacts: vec![],
7192            active_session_ids: vec![],
7193            latest_session_id: None,
7194            prompt_tokens: 0,
7195            completion_tokens: 0,
7196            total_tokens: 0,
7197            estimated_cost_usd: 0.0,
7198        };
7199
7200        let objective = routine_objective_from_args(&run).expect("objective");
7201        let prompt = build_routine_mission_prompt(&run, &objective);
7202
7203        assert!(prompt.contains("Mode: orchestrated"));
7204        assert!(prompt.contains("Plan -> Do -> Verify -> Notify"));
7205        assert!(prompt.contains("only the orchestrator may execute tools"));
7206        assert!(prompt.contains("Allowed Tools: read, webfetch"));
7207        assert!(prompt.contains("file://reports/release-readiness.md"));
7208    }
7209
7210    #[test]
7211    fn routine_mission_prompt_includes_standalone_defaults() {
7212        let run = RoutineRunRecord {
7213            run_id: "run-standalone-1".to_string(),
7214            routine_id: "automation-standalone".to_string(),
7215            trigger_type: "manual".to_string(),
7216            run_count: 1,
7217            status: RoutineRunStatus::Queued,
7218            created_at_ms: 2_000,
7219            updated_at_ms: 2_000,
7220            fired_at_ms: Some(2_000),
7221            started_at_ms: None,
7222            finished_at_ms: None,
7223            requires_approval: false,
7224            approval_reason: None,
7225            denial_reason: None,
7226            paused_reason: None,
7227            detail: None,
7228            entrypoint: "mission.default".to_string(),
7229            args: serde_json::json!({
7230                "prompt": "Summarize top engineering updates.",
7231                "success_criteria": ["Three bullet summary"]
7232            }),
7233            allowed_tools: vec![],
7234            output_targets: vec![],
7235            artifacts: vec![],
7236            active_session_ids: vec![],
7237            latest_session_id: None,
7238            prompt_tokens: 0,
7239            completion_tokens: 0,
7240            total_tokens: 0,
7241            estimated_cost_usd: 0.0,
7242        };
7243
7244        let objective = routine_objective_from_args(&run).expect("objective");
7245        let prompt = build_routine_mission_prompt(&run, &objective);
7246
7247        assert!(prompt.contains("Mode: standalone"));
7248        assert!(prompt.contains("Execution Pattern: Standalone mission run"));
7249        assert!(prompt.contains("Allowed Tools: all available by current policy"));
7250        assert!(prompt.contains("Output Targets: none configured"));
7251    }
7252
7253    #[test]
7254    fn shared_resource_key_validator_accepts_swarm_active_tasks() {
7255        assert!(is_valid_resource_key("swarm.active_tasks"));
7256        assert!(is_valid_resource_key("project/demo"));
7257        assert!(!is_valid_resource_key("swarm//active_tasks"));
7258        assert!(!is_valid_resource_key("misc/demo"));
7259    }
7260}