Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
22use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
23use tandem_memory::db::MemoryDatabase;
24use tandem_providers::ChatMessage;
25use tandem_workflows::{
26    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
27    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
28    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
29};
30
31use crate::agent_teams::AgentTeamRuntime;
32use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
33use crate::automation_v2::types::*;
34use crate::bug_monitor::types::*;
35use crate::capability_resolver::CapabilityResolver;
36use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
37use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
38use crate::pack_manager::PackManager;
39use crate::preset_registry::PresetRegistry;
40use crate::routines::{errors::RoutineStoreError, types::*};
41use crate::runtime::{
42    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
43};
44use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
45use crate::util::{host::detect_host_runtime_context, time::now_ms};
46use crate::{
47    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
48    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
49    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
50    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
51    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
52    OptimizationPromotionDecisionKind,
53};
54
55#[derive(Clone)]
56pub struct AppState {
57    pub runtime: Arc<OnceLock<RuntimeState>>,
58    pub startup: Arc<RwLock<StartupState>>,
59    pub in_process_mode: Arc<AtomicBool>,
60    pub api_token: Arc<RwLock<Option<String>>>,
61    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
62    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
63    pub run_registry: RunRegistry,
64    pub run_stale_ms: u64,
65    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
66    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
67    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
68    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
69    pub shared_resources_path: PathBuf,
70    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
71    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
72    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
73    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
74    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
75    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
76    pub automation_scheduler_stopping: Arc<AtomicBool>,
77    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
78    pub workflow_plan_drafts:
79        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
80    pub optimization_campaigns:
81        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
82    pub optimization_experiments:
83        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
84    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
85    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
86    pub bug_monitor_incidents:
87        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
88    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
89    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
90    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
91    pub workflows: Arc<RwLock<WorkflowRegistry>>,
92    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
93    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
94    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
95    pub routine_session_policies:
96        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
97    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
98    pub token_cost_per_1k_usd: f64,
99    pub routines_path: PathBuf,
100    pub routine_history_path: PathBuf,
101    pub routine_runs_path: PathBuf,
102    pub automations_v2_path: PathBuf,
103    pub automation_v2_runs_path: PathBuf,
104    pub optimization_campaigns_path: PathBuf,
105    pub optimization_experiments_path: PathBuf,
106    pub bug_monitor_config_path: PathBuf,
107    pub bug_monitor_drafts_path: PathBuf,
108    pub bug_monitor_incidents_path: PathBuf,
109    pub bug_monitor_posts_path: PathBuf,
110    pub external_actions_path: PathBuf,
111    pub workflow_runs_path: PathBuf,
112    pub workflow_hook_overrides_path: PathBuf,
113    pub agent_teams: AgentTeamRuntime,
114    pub web_ui_enabled: Arc<AtomicBool>,
115    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
116    pub server_base_url: Arc<std::sync::RwLock<String>>,
117    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
118    pub host_runtime_context: HostRuntimeContext,
119    pub pack_manager: Arc<PackManager>,
120    pub capability_resolver: Arc<CapabilityResolver>,
121    pub preset_registry: Arc<PresetRegistry>,
122}
123#[derive(Debug, Clone, Serialize, Deserialize, Default)]
124pub struct ChannelStatus {
125    pub enabled: bool,
126    pub connected: bool,
127    pub last_error: Option<String>,
128    pub active_sessions: u64,
129    pub meta: Value,
130}
131#[derive(Debug, Clone, Serialize, Deserialize, Default)]
132struct EffectiveAppConfig {
133    #[serde(default)]
134    pub channels: ChannelsConfigFile,
135    #[serde(default)]
136    pub web_ui: WebUiConfig,
137    #[serde(default)]
138    pub browser: tandem_core::BrowserConfig,
139    #[serde(default)]
140    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
141}
142
143#[derive(Default)]
144pub struct ChannelRuntime {
145    pub listeners: Option<tokio::task::JoinSet<()>>,
146    pub statuses: std::collections::HashMap<String, ChannelStatus>,
147}
148
149#[derive(Debug, Clone)]
150pub struct StatusIndexUpdate {
151    pub key: String,
152    pub value: Value,
153}
154
155impl AppState {
156    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
157        Self {
158            runtime: Arc::new(OnceLock::new()),
159            startup: Arc::new(RwLock::new(StartupState {
160                status: StartupStatus::Starting,
161                phase: "boot".to_string(),
162                started_at_ms: now_ms(),
163                attempt_id,
164                last_error: None,
165            })),
166            in_process_mode: Arc::new(AtomicBool::new(in_process)),
167            api_token: Arc::new(RwLock::new(None)),
168            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
169            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
170            run_registry: RunRegistry::new(),
171            run_stale_ms: config::env::resolve_run_stale_ms(),
172            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
173            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
174            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
175            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
176            shared_resources_path: config::paths::resolve_shared_resources_path(),
177            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
178            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
179            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
180            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
181            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
182            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
183                config::env::resolve_scheduler_max_concurrent_runs(),
184            ))),
185            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
186            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
187            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
188            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
189            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
190            bug_monitor_config: Arc::new(
191                RwLock::new(config::env::resolve_bug_monitor_env_config()),
192            ),
193            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
194            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
195            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
196            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
197            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
198            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
199            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
200            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
201            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
202            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
203            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
204            routines_path: config::paths::resolve_routines_path(),
205            routine_history_path: config::paths::resolve_routine_history_path(),
206            routine_runs_path: config::paths::resolve_routine_runs_path(),
207            automations_v2_path: config::paths::resolve_automations_v2_path(),
208            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
209            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
210            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
211            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
212            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
213            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
214            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
215            external_actions_path: config::paths::resolve_external_actions_path(),
216            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
217            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
218            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
219            web_ui_enabled: Arc::new(AtomicBool::new(false)),
220            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
221            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
222            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
223            host_runtime_context: detect_host_runtime_context(),
224            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
225            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
226            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
227            preset_registry: Arc::new(PresetRegistry::new(
228                PackManager::default_root(),
229                resolve_shared_paths()
230                    .map(|paths| paths.canonical_root)
231                    .unwrap_or_else(|_| {
232                        dirs::home_dir()
233                            .unwrap_or_else(|| PathBuf::from("."))
234                            .join(".tandem")
235                    }),
236            )),
237        }
238    }
239
240    pub fn is_ready(&self) -> bool {
241        self.runtime.get().is_some()
242    }
243
244    pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
245        for _ in 0..attempts {
246            if self.is_ready() {
247                return true;
248            }
249            let startup = self.startup_snapshot().await;
250            if matches!(startup.status, StartupStatus::Failed) {
251                return false;
252            }
253            tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
254        }
255        self.is_ready()
256    }
257
258    pub fn mode_label(&self) -> &'static str {
259        if self.in_process_mode.load(Ordering::Relaxed) {
260            "in-process"
261        } else {
262            "sidecar"
263        }
264    }
265
266    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
267        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
268        if let Ok(mut guard) = self.web_ui_prefix.write() {
269            *guard = config::webui::normalize_web_ui_prefix(&prefix);
270        }
271    }
272
273    pub fn web_ui_enabled(&self) -> bool {
274        self.web_ui_enabled.load(Ordering::Relaxed)
275    }
276
277    pub fn web_ui_prefix(&self) -> String {
278        self.web_ui_prefix
279            .read()
280            .map(|v| v.clone())
281            .unwrap_or_else(|_| "/admin".to_string())
282    }
283
284    pub fn set_server_base_url(&self, base_url: String) {
285        if let Ok(mut guard) = self.server_base_url.write() {
286            *guard = base_url;
287        }
288    }
289
290    pub fn server_base_url(&self) -> String {
291        self.server_base_url
292            .read()
293            .map(|v| v.clone())
294            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
295    }
296
297    pub async fn api_token(&self) -> Option<String> {
298        self.api_token.read().await.clone()
299    }
300
301    pub async fn set_api_token(&self, token: Option<String>) {
302        *self.api_token.write().await = token;
303    }
304
305    pub async fn startup_snapshot(&self) -> StartupSnapshot {
306        let state = self.startup.read().await.clone();
307        StartupSnapshot {
308            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
309            status: state.status,
310            phase: state.phase,
311            started_at_ms: state.started_at_ms,
312            attempt_id: state.attempt_id,
313            last_error: state.last_error,
314        }
315    }
316
317    pub fn host_runtime_context(&self) -> HostRuntimeContext {
318        self.runtime
319            .get()
320            .map(|runtime| runtime.host_runtime_context.clone())
321            .unwrap_or_else(|| self.host_runtime_context.clone())
322    }
323
324    pub async fn set_phase(&self, phase: impl Into<String>) {
325        let mut startup = self.startup.write().await;
326        startup.phase = phase.into();
327    }
328
/// Completes startup: stores the engine runtime, registers server-side tools and engine
/// hooks, loads stored state, and switches the startup status to `Ready`.
///
/// # Errors
/// Fails if a runtime was already stored, if browser tool registration fails (only with
/// the `browser` feature), or if `load_routines` / `load_automations_v2` fail. Every other
/// load is best-effort and its error is discarded.
///
/// NOTE(review): the runtime is stored before the loads run. If a fatal load fails,
/// `is_ready()` is already `true` while the startup status is not `Ready`; presumably the
/// caller follows up with `mark_failed` — confirm.
pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
    // `OnceLock::set` rejects a second initialization.
    self.runtime
        .set(runtime)
        .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
    #[cfg(feature = "browser")]
    self.register_browser_tools().await?;
    // NOTE(review): `tools`, `engine_loop`, `workspace_index`, etc. are not fields of
    // `AppState`; presumably reached through a `Deref` impl to `RuntimeState` elsewhere in
    // this file — confirm. Each tool/hook below receives its own clone of this state.
    self.tools
        .register_tool(
            "pack_builder".to_string(),
            Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
        )
        .await;
    self.tools
        .register_tool(
            "mcp_list".to_string(),
            Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
        )
        .await;
    self.engine_loop
        .set_spawn_agent_hook(std::sync::Arc::new(
            crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
        ))
        .await;
    self.engine_loop
        .set_tool_policy_hook(std::sync::Arc::new(
            crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
        ))
        .await;
    self.engine_loop
        .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
            self.clone(),
        )))
        .await;
    // Load stored state. Only routines and automations are fatal (`?`); the rest are
    // best-effort and their errors are ignored.
    let _ = self.load_shared_resources().await;
    self.load_routines().await?;
    let _ = self.load_routine_history().await;
    let _ = self.load_routine_runs().await;
    self.load_automations_v2().await?;
    let _ = self.load_automation_v2_runs().await;
    let _ = self.load_optimization_campaigns().await;
    let _ = self.load_optimization_experiments().await;
    let _ = self.load_bug_monitor_config().await;
    let _ = self.load_bug_monitor_drafts().await;
    let _ = self.load_bug_monitor_incidents().await;
    let _ = self.load_bug_monitor_posts().await;
    let _ = self.load_external_actions().await;
    let _ = self.load_workflow_runs().await;
    let _ = self.load_workflow_hook_overrides().await;
    let _ = self.reload_workflows().await;
    // Agent-team state is loaded for the current workspace root (best-effort).
    let workspace_root = self.workspace_index.snapshot().await.root;
    let _ = self
        .agent_teams
        .ensure_loaded_for_workspace(&workspace_root)
        .await;
    // Only now is startup reported as finished; any earlier error is cleared.
    let mut startup = self.startup.write().await;
    startup.status = StartupStatus::Ready;
    startup.phase = "ready".to_string();
    startup.last_error = None;
    Ok(())
}
389
390    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
391        let mut startup = self.startup.write().await;
392        startup.status = StartupStatus::Failed;
393        startup.phase = phase.into();
394        startup.last_error = Some(error.into());
395    }
396
397    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
398        let runtime = self.channels_runtime.lock().await;
399        runtime.statuses.clone()
400    }
401
402    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
403        let effective = self.config.get_effective_value().await;
404        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
405        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
406
407        let mut runtime = self.channels_runtime.lock().await;
408        if let Some(listeners) = runtime.listeners.as_mut() {
409            listeners.abort_all();
410        }
411        runtime.listeners = None;
412        runtime.statuses.clear();
413
414        let mut status_map = std::collections::HashMap::new();
415        status_map.insert(
416            "telegram".to_string(),
417            ChannelStatus {
418                enabled: parsed.channels.telegram.is_some(),
419                connected: false,
420                last_error: None,
421                active_sessions: 0,
422                meta: serde_json::json!({}),
423            },
424        );
425        status_map.insert(
426            "discord".to_string(),
427            ChannelStatus {
428                enabled: parsed.channels.discord.is_some(),
429                connected: false,
430                last_error: None,
431                active_sessions: 0,
432                meta: serde_json::json!({}),
433            },
434        );
435        status_map.insert(
436            "slack".to_string(),
437            ChannelStatus {
438                enabled: parsed.channels.slack.is_some(),
439                connected: false,
440                last_error: None,
441                active_sessions: 0,
442                meta: serde_json::json!({}),
443            },
444        );
445
446        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
447            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
448            runtime.listeners = Some(listeners);
449            for status in status_map.values_mut() {
450                if status.enabled {
451                    status.connected = true;
452                }
453            }
454        }
455
456        runtime.statuses = status_map.clone();
457        drop(runtime);
458
459        self.event_bus.publish(EngineEvent::new(
460            "channel.status.changed",
461            serde_json::json!({ "channels": status_map }),
462        ));
463        Ok(())
464    }
465
466    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
467        if !self.shared_resources_path.exists() {
468            return Ok(());
469        }
470        let raw = fs::read_to_string(&self.shared_resources_path).await?;
471        let parsed =
472            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
473                .unwrap_or_default();
474        let mut guard = self.shared_resources.write().await;
475        *guard = parsed;
476        Ok(())
477    }
478
479    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
480        if let Some(parent) = self.shared_resources_path.parent() {
481            fs::create_dir_all(parent).await?;
482        }
483        let payload = {
484            let guard = self.shared_resources.read().await;
485            serde_json::to_string_pretty(&*guard)?
486        };
487        fs::write(&self.shared_resources_path, payload).await?;
488        Ok(())
489    }
490
491    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
492        self.shared_resources.read().await.get(key).cloned()
493    }
494
495    pub async fn list_shared_resources(
496        &self,
497        prefix: Option<&str>,
498        limit: usize,
499    ) -> Vec<SharedResourceRecord> {
500        let limit = limit.clamp(1, 500);
501        let mut rows = self
502            .shared_resources
503            .read()
504            .await
505            .values()
506            .filter(|record| {
507                if let Some(prefix) = prefix {
508                    record.key.starts_with(prefix)
509                } else {
510                    true
511                }
512            })
513            .cloned()
514            .collect::<Vec<_>>();
515        rows.sort_by(|a, b| a.key.cmp(&b.key));
516        rows.truncate(limit);
517        rows
518    }
519
520    pub async fn put_shared_resource(
521        &self,
522        key: String,
523        value: Value,
524        if_match_rev: Option<u64>,
525        updated_by: String,
526        ttl_ms: Option<u64>,
527    ) -> Result<SharedResourceRecord, ResourceStoreError> {
528        if !is_valid_resource_key(&key) {
529            return Err(ResourceStoreError::InvalidKey { key });
530        }
531
532        let now = now_ms();
533        let mut guard = self.shared_resources.write().await;
534        let existing = guard.get(&key).cloned();
535
536        if let Some(expected) = if_match_rev {
537            let current = existing.as_ref().map(|row| row.rev);
538            if current != Some(expected) {
539                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
540                    key,
541                    expected_rev: Some(expected),
542                    current_rev: current,
543                }));
544            }
545        }
546
547        let next_rev = existing
548            .as_ref()
549            .map(|row| row.rev.saturating_add(1))
550            .unwrap_or(1);
551
552        let record = SharedResourceRecord {
553            key: key.clone(),
554            value,
555            rev: next_rev,
556            updated_at_ms: now,
557            updated_by,
558            ttl_ms,
559        };
560
561        let previous = guard.insert(key.clone(), record.clone());
562        drop(guard);
563
564        if let Err(error) = self.persist_shared_resources().await {
565            let mut rollback = self.shared_resources.write().await;
566            if let Some(previous) = previous {
567                rollback.insert(key, previous);
568            } else {
569                rollback.remove(&key);
570            }
571            return Err(ResourceStoreError::PersistFailed {
572                message: error.to_string(),
573            });
574        }
575
576        Ok(record)
577    }
578
579    pub async fn delete_shared_resource(
580        &self,
581        key: &str,
582        if_match_rev: Option<u64>,
583    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
584        if !is_valid_resource_key(key) {
585            return Err(ResourceStoreError::InvalidKey {
586                key: key.to_string(),
587            });
588        }
589
590        let mut guard = self.shared_resources.write().await;
591        let current = guard.get(key).cloned();
592        if let Some(expected) = if_match_rev {
593            let current_rev = current.as_ref().map(|row| row.rev);
594            if current_rev != Some(expected) {
595                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
596                    key: key.to_string(),
597                    expected_rev: Some(expected),
598                    current_rev,
599                }));
600            }
601        }
602
603        let removed = guard.remove(key);
604        drop(guard);
605
606        if let Err(error) = self.persist_shared_resources().await {
607            if let Some(record) = removed.clone() {
608                self.shared_resources
609                    .write()
610                    .await
611                    .insert(record.key.clone(), record);
612            }
613            return Err(ResourceStoreError::PersistFailed {
614                message: error.to_string(),
615            });
616        }
617
618        Ok(removed)
619    }
620
621    pub async fn load_routines(&self) -> anyhow::Result<()> {
622        if !self.routines_path.exists() {
623            return Ok(());
624        }
625        let raw = fs::read_to_string(&self.routines_path).await?;
626        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
627            Ok(parsed) => {
628                let mut guard = self.routines.write().await;
629                *guard = parsed;
630                Ok(())
631            }
632            Err(primary_err) => {
633                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
634                if backup_path.exists() {
635                    let backup_raw = fs::read_to_string(&backup_path).await?;
636                    if let Ok(parsed_backup) = serde_json::from_str::<
637                        std::collections::HashMap<String, RoutineSpec>,
638                    >(&backup_raw)
639                    {
640                        let mut guard = self.routines.write().await;
641                        *guard = parsed_backup;
642                        return Ok(());
643                    }
644                }
645                Err(anyhow::anyhow!(
646                    "failed to parse routines store {}: {primary_err}",
647                    self.routines_path.display()
648                ))
649            }
650        }
651    }
652
653    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
654        if !self.routine_history_path.exists() {
655            return Ok(());
656        }
657        let raw = fs::read_to_string(&self.routine_history_path).await?;
658        let parsed = serde_json::from_str::<
659            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
660        >(&raw)
661        .unwrap_or_default();
662        let mut guard = self.routine_history.write().await;
663        *guard = parsed;
664        Ok(())
665    }
666
667    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
668        if !self.routine_runs_path.exists() {
669            return Ok(());
670        }
671        let raw = fs::read_to_string(&self.routine_runs_path).await?;
672        let parsed =
673            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
674                .unwrap_or_default();
675        let mut guard = self.routine_runs.write().await;
676        *guard = parsed;
677        Ok(())
678    }
679
680    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
681        if let Some(parent) = self.routines_path.parent() {
682            fs::create_dir_all(parent).await?;
683        }
684        let (payload, is_empty) = {
685            let guard = self.routines.read().await;
686            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
687        };
688        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
689            let existing_raw = fs::read_to_string(&self.routines_path)
690                .await
691                .unwrap_or_default();
692            let existing_has_rows = serde_json::from_str::<
693                std::collections::HashMap<String, RoutineSpec>,
694            >(&existing_raw)
695            .map(|rows| !rows.is_empty())
696            .unwrap_or(true);
697            if existing_has_rows {
698                return Err(anyhow::anyhow!(
699                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
700                    self.routines_path.display()
701                ));
702            }
703        }
704        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
705        if self.routines_path.exists() {
706            let _ = fs::copy(&self.routines_path, &backup_path).await;
707        }
708        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
709        fs::write(&tmp_path, payload).await?;
710        fs::rename(&tmp_path, &self.routines_path).await?;
711        Ok(())
712    }
713
714    pub async fn persist_routines(&self) -> anyhow::Result<()> {
715        self.persist_routines_inner(false).await
716    }
717
718    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
719        if let Some(parent) = self.routine_history_path.parent() {
720            fs::create_dir_all(parent).await?;
721        }
722        let payload = {
723            let guard = self.routine_history.read().await;
724            serde_json::to_string_pretty(&*guard)?
725        };
726        fs::write(&self.routine_history_path, payload).await?;
727        Ok(())
728    }
729
730    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
731        if let Some(parent) = self.routine_runs_path.parent() {
732            fs::create_dir_all(parent).await?;
733        }
734        let payload = {
735            let guard = self.routine_runs.read().await;
736            serde_json::to_string_pretty(&*guard)?
737        };
738        fs::write(&self.routine_runs_path, payload).await?;
739        Ok(())
740    }
741
742    pub async fn put_routine(
743        &self,
744        mut routine: RoutineSpec,
745    ) -> Result<RoutineSpec, RoutineStoreError> {
746        if routine.routine_id.trim().is_empty() {
747            return Err(RoutineStoreError::InvalidRoutineId {
748                routine_id: routine.routine_id,
749            });
750        }
751
752        routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
753        routine.output_targets = normalize_non_empty_list(routine.output_targets);
754
755        let now = now_ms();
756        let next_schedule_fire =
757            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
758                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
759                    detail: "invalid schedule or timezone".to_string(),
760                })?;
761        match routine.schedule {
762            RoutineSchedule::IntervalSeconds { seconds } => {
763                if seconds == 0 {
764                    return Err(RoutineStoreError::InvalidSchedule {
765                        detail: "interval_seconds must be > 0".to_string(),
766                    });
767                }
768                let _ = seconds;
769            }
770            RoutineSchedule::Cron { .. } => {}
771        }
772        if routine.next_fire_at_ms.is_none() {
773            routine.next_fire_at_ms = Some(next_schedule_fire);
774        }
775
776        let mut guard = self.routines.write().await;
777        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
778        drop(guard);
779
780        if let Err(error) = self.persist_routines().await {
781            let mut rollback = self.routines.write().await;
782            if let Some(previous) = previous {
783                rollback.insert(previous.routine_id.clone(), previous);
784            } else {
785                rollback.remove(&routine.routine_id);
786            }
787            return Err(RoutineStoreError::PersistFailed {
788                message: error.to_string(),
789            });
790        }
791
792        Ok(routine)
793    }
794
795    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
796        let mut rows = self
797            .routines
798            .read()
799            .await
800            .values()
801            .cloned()
802            .collect::<Vec<_>>();
803        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
804        rows
805    }
806
807    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
808        self.routines.read().await.get(routine_id).cloned()
809    }
810
811    pub async fn delete_routine(
812        &self,
813        routine_id: &str,
814    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
815        let mut guard = self.routines.write().await;
816        let removed = guard.remove(routine_id);
817        drop(guard);
818
819        let allow_empty_overwrite = self.routines.read().await.is_empty();
820        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
821            if let Some(removed) = removed.clone() {
822                self.routines
823                    .write()
824                    .await
825                    .insert(removed.routine_id.clone(), removed);
826            }
827            return Err(RoutineStoreError::PersistFailed {
828                message: error.to_string(),
829            });
830        }
831        Ok(removed)
832    }
833
834    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
835        let mut plans = Vec::new();
836        let mut guard = self.routines.write().await;
837        for routine in guard.values_mut() {
838            if routine.status != RoutineStatus::Active {
839                continue;
840            }
841            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
842                continue;
843            };
844            if now_ms < next_fire_at_ms {
845                continue;
846            }
847            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
848                now_ms,
849                next_fire_at_ms,
850                &routine.schedule,
851                &routine.timezone,
852                &routine.misfire_policy,
853            );
854            routine.next_fire_at_ms = Some(next_fire_at_ms);
855            if run_count == 0 {
856                continue;
857            }
858            plans.push(RoutineTriggerPlan {
859                routine_id: routine.routine_id.clone(),
860                run_count,
861                scheduled_at_ms: now_ms,
862                next_fire_at_ms,
863            });
864        }
865        drop(guard);
866        let _ = self.persist_routines().await;
867        plans
868    }
869
870    pub async fn mark_routine_fired(
871        &self,
872        routine_id: &str,
873        fired_at_ms: u64,
874    ) -> Option<RoutineSpec> {
875        let mut guard = self.routines.write().await;
876        let routine = guard.get_mut(routine_id)?;
877        routine.last_fired_at_ms = Some(fired_at_ms);
878        let updated = routine.clone();
879        drop(guard);
880        let _ = self.persist_routines().await;
881        Some(updated)
882    }
883
884    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
885        let mut history = self.routine_history.write().await;
886        history
887            .entry(event.routine_id.clone())
888            .or_default()
889            .push(event);
890        drop(history);
891        let _ = self.persist_routine_history().await;
892    }
893
894    pub async fn list_routine_history(
895        &self,
896        routine_id: &str,
897        limit: usize,
898    ) -> Vec<RoutineHistoryEvent> {
899        let limit = limit.clamp(1, 500);
900        let mut rows = self
901            .routine_history
902            .read()
903            .await
904            .get(routine_id)
905            .cloned()
906            .unwrap_or_default();
907        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
908        rows.truncate(limit);
909        rows
910    }
911
912    pub async fn create_routine_run(
913        &self,
914        routine: &RoutineSpec,
915        trigger_type: &str,
916        run_count: u32,
917        status: RoutineRunStatus,
918        detail: Option<String>,
919    ) -> RoutineRunRecord {
920        let now = now_ms();
921        let record = RoutineRunRecord {
922            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
923            routine_id: routine.routine_id.clone(),
924            trigger_type: trigger_type.to_string(),
925            run_count,
926            status,
927            created_at_ms: now,
928            updated_at_ms: now,
929            fired_at_ms: Some(now),
930            started_at_ms: None,
931            finished_at_ms: None,
932            requires_approval: routine.requires_approval,
933            approval_reason: None,
934            denial_reason: None,
935            paused_reason: None,
936            detail,
937            entrypoint: routine.entrypoint.clone(),
938            args: routine.args.clone(),
939            allowed_tools: routine.allowed_tools.clone(),
940            output_targets: routine.output_targets.clone(),
941            artifacts: Vec::new(),
942            active_session_ids: Vec::new(),
943            latest_session_id: None,
944            prompt_tokens: 0,
945            completion_tokens: 0,
946            total_tokens: 0,
947            estimated_cost_usd: 0.0,
948        };
949        self.routine_runs
950            .write()
951            .await
952            .insert(record.run_id.clone(), record.clone());
953        let _ = self.persist_routine_runs().await;
954        record
955    }
956
957    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
958        self.routine_runs.read().await.get(run_id).cloned()
959    }
960
961    pub async fn list_routine_runs(
962        &self,
963        routine_id: Option<&str>,
964        limit: usize,
965    ) -> Vec<RoutineRunRecord> {
966        let mut rows = self
967            .routine_runs
968            .read()
969            .await
970            .values()
971            .filter(|row| {
972                if let Some(id) = routine_id {
973                    row.routine_id == id
974                } else {
975                    true
976                }
977            })
978            .cloned()
979            .collect::<Vec<_>>();
980        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
981        rows.truncate(limit.clamp(1, 500));
982        rows
983    }
984
985    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
986        let mut guard = self.routine_runs.write().await;
987        let next_run_id = guard
988            .values()
989            .filter(|row| row.status == RoutineRunStatus::Queued)
990            .min_by(|a, b| {
991                a.created_at_ms
992                    .cmp(&b.created_at_ms)
993                    .then_with(|| a.run_id.cmp(&b.run_id))
994            })
995            .map(|row| row.run_id.clone())?;
996        let now = now_ms();
997        let row = guard.get_mut(&next_run_id)?;
998        row.status = RoutineRunStatus::Running;
999        row.updated_at_ms = now;
1000        row.started_at_ms = Some(now);
1001        let claimed = row.clone();
1002        drop(guard);
1003        let _ = self.persist_routine_runs().await;
1004        Some(claimed)
1005    }
1006
1007    pub async fn set_routine_session_policy(
1008        &self,
1009        session_id: String,
1010        run_id: String,
1011        routine_id: String,
1012        allowed_tools: Vec<String>,
1013    ) {
1014        let policy = RoutineSessionPolicy {
1015            session_id: session_id.clone(),
1016            run_id,
1017            routine_id,
1018            allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1019        };
1020        self.routine_session_policies
1021            .write()
1022            .await
1023            .insert(session_id, policy);
1024    }
1025
1026    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1027        self.routine_session_policies
1028            .read()
1029            .await
1030            .get(session_id)
1031            .cloned()
1032    }
1033
1034    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1035        self.routine_session_policies
1036            .write()
1037            .await
1038            .remove(session_id);
1039    }
1040
1041    pub async fn update_routine_run_status(
1042        &self,
1043        run_id: &str,
1044        status: RoutineRunStatus,
1045        reason: Option<String>,
1046    ) -> Option<RoutineRunRecord> {
1047        let mut guard = self.routine_runs.write().await;
1048        let row = guard.get_mut(run_id)?;
1049        row.status = status.clone();
1050        row.updated_at_ms = now_ms();
1051        match status {
1052            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1053            RoutineRunStatus::Running => {
1054                row.started_at_ms.get_or_insert_with(now_ms);
1055                if let Some(detail) = reason {
1056                    row.detail = Some(detail);
1057                }
1058            }
1059            RoutineRunStatus::Denied => row.denial_reason = reason,
1060            RoutineRunStatus::Paused => row.paused_reason = reason,
1061            RoutineRunStatus::Completed
1062            | RoutineRunStatus::Failed
1063            | RoutineRunStatus::Cancelled => {
1064                row.finished_at_ms = Some(now_ms());
1065                if let Some(detail) = reason {
1066                    row.detail = Some(detail);
1067                }
1068            }
1069            _ => {
1070                if let Some(detail) = reason {
1071                    row.detail = Some(detail);
1072                }
1073            }
1074        }
1075        let updated = row.clone();
1076        drop(guard);
1077        let _ = self.persist_routine_runs().await;
1078        Some(updated)
1079    }
1080
1081    pub async fn append_routine_run_artifact(
1082        &self,
1083        run_id: &str,
1084        artifact: RoutineRunArtifact,
1085    ) -> Option<RoutineRunRecord> {
1086        let mut guard = self.routine_runs.write().await;
1087        let row = guard.get_mut(run_id)?;
1088        row.updated_at_ms = now_ms();
1089        row.artifacts.push(artifact);
1090        let updated = row.clone();
1091        drop(guard);
1092        let _ = self.persist_routine_runs().await;
1093        Some(updated)
1094    }
1095
1096    pub async fn add_active_session_id(
1097        &self,
1098        run_id: &str,
1099        session_id: String,
1100    ) -> Option<RoutineRunRecord> {
1101        let mut guard = self.routine_runs.write().await;
1102        let row = guard.get_mut(run_id)?;
1103        if !row.active_session_ids.iter().any(|id| id == &session_id) {
1104            row.active_session_ids.push(session_id);
1105        }
1106        row.latest_session_id = row.active_session_ids.last().cloned();
1107        row.updated_at_ms = now_ms();
1108        let updated = row.clone();
1109        drop(guard);
1110        let _ = self.persist_routine_runs().await;
1111        Some(updated)
1112    }
1113
1114    pub async fn clear_active_session_id(
1115        &self,
1116        run_id: &str,
1117        session_id: &str,
1118    ) -> Option<RoutineRunRecord> {
1119        let mut guard = self.routine_runs.write().await;
1120        let row = guard.get_mut(run_id)?;
1121        row.active_session_ids.retain(|id| id != session_id);
1122        row.updated_at_ms = now_ms();
1123        let updated = row.clone();
1124        drop(guard);
1125        let _ = self.persist_routine_runs().await;
1126        Some(updated)
1127    }
1128
    /// Loads automation v2 definitions into memory, falling back to alternate files when
    /// the canonical store is empty.
    ///
    /// Canonical store (`automations_v2_path`):
    /// - Read first. A file that is blank or exactly `{}` (after trimming) counts as empty.
    /// - If it yields at least one definition, it is the sole source. Alternate candidate
    ///   paths are then opened only to report their row counts in the log.
    ///
    /// Fallback, when the canonical store is missing or empty:
    /// - Every alternate path from `candidate_automations_v2_paths` is parsed and merged.
    /// - An incoming definition replaces an already-merged one unless the merged copy has
    ///   a strictly greater `updated_at_ms`.
    ///
    /// After merging, every definition is passed through
    /// `migrate_bundled_studio_research_split_automation`, and the result replaces the
    /// in-memory map. Then:
    /// - If data came from an alternate path, or a migration reported a change, the
    ///   canonical file is rewritten (best-effort; errors are ignored).
    /// - Otherwise, if the canonical file was the source, the stale legacy file cleanup
    ///   runs (also best-effort).
    ///
    /// # Errors
    /// Propagates I/O errors from reading the canonical file or any existing alternate
    /// file. How `parse_automation_v2_file` treats malformed JSON is not visible here.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        let mut migrated = false;
        // (path, parsed row count) pairs, collected only for the diagnostic log below.
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical store is missing or empty: merge definitions from alternate paths.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    // Keep the existing copy only if it is strictly newer; ties go to the
                    // definition being merged in.
                    match merged.get(&automation_id) {
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical store is authoritative. Alternates are read only to log counts.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        // Run every definition through the bundled-automation migration. `migrated` stays
        // true once any call returns true.
        for automation in merged.values_mut() {
            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
        }
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate || migrated {
            // Write the merged/migrated set back to the canonical path (best-effort).
            let _ = self.persist_automations_v2().await;
        } else if canonical_loaded {
            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
        }
        Ok(())
    }
1217
1218    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1219        let payload = {
1220            let guard = self.automations_v2.read().await;
1221            serde_json::to_string_pretty(&*guard)?
1222        };
1223        if let Some(parent) = self.automations_v2_path.parent() {
1224            fs::create_dir_all(parent).await?;
1225        }
1226        fs::write(&self.automations_v2_path, &payload).await?;
1227        let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1228        Ok(())
1229    }
1230
    /// Loads automation v2 run records from every candidate runs path and merges them.
    ///
    /// Merging:
    /// - Paths come from `candidate_automation_v2_runs_paths`; presumably the canonical
    ///   `automation_v2_runs_path` is among them. TODO confirm.
    /// - Missing files, and files that are blank or exactly `{}`, contribute nothing.
    /// - A run replaces an already-merged run with the same id unless the merged copy has
    ///   a strictly greater `updated_at_ms`.
    ///
    /// After the merged map replaces the in-memory runs:
    /// - Definitions are recovered from run snapshots via
    ///   `recover_automation_definitions_from_run_snapshots`.
    /// - A warning is logged if there are runs but no definitions.
    /// - The runs file is rewritten (best-effort) when data came from a non-canonical path
    ///   or definitions were recovered.
    ///
    /// # Errors
    /// Propagates I/O errors from reading any existing candidate file, and errors from the
    /// snapshot recovery step.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        // (path, parsed row count) pairs, collected only for the diagnostic log below.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                // Keep the existing copy only if it is strictly newer; ties go to the run
                // being merged in.
                match merged.get(&run_id) {
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Rebuild any definitions that only survive as snapshots embedded in runs.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            // Consolidate the merged runs into the canonical runs file (best-effort).
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
1291
1292    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1293        let payload = {
1294            let guard = self.automation_v2_runs.read().await;
1295            serde_json::to_string_pretty(&*guard)?
1296        };
1297        if let Some(parent) = self.automation_v2_runs_path.parent() {
1298            fs::create_dir_all(parent).await?;
1299        }
1300        fs::write(&self.automation_v2_runs_path, &payload).await?;
1301        Ok(())
1302    }
1303
1304    pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1305        if !self.optimization_campaigns_path.exists() {
1306            return Ok(());
1307        }
1308        let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1309        let parsed = parse_optimization_campaigns_file(&raw);
1310        *self.optimization_campaigns.write().await = parsed;
1311        Ok(())
1312    }
1313
1314    pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1315        let payload = {
1316            let guard = self.optimization_campaigns.read().await;
1317            serde_json::to_string_pretty(&*guard)?
1318        };
1319        if let Some(parent) = self.optimization_campaigns_path.parent() {
1320            fs::create_dir_all(parent).await?;
1321        }
1322        fs::write(&self.optimization_campaigns_path, payload).await?;
1323        Ok(())
1324    }
1325
1326    pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1327        if !self.optimization_experiments_path.exists() {
1328            return Ok(());
1329        }
1330        let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1331        let parsed = parse_optimization_experiments_file(&raw);
1332        *self.optimization_experiments.write().await = parsed;
1333        Ok(())
1334    }
1335
1336    pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1337        let payload = {
1338            let guard = self.optimization_experiments.read().await;
1339            serde_json::to_string_pretty(&*guard)?
1340        };
1341        if let Some(parent) = self.optimization_experiments_path.parent() {
1342            fs::create_dir_all(parent).await?;
1343        }
1344        fs::write(&self.optimization_experiments_path, payload).await?;
1345        Ok(())
1346    }
1347
1348    async fn verify_automation_v2_persisted(
1349        &self,
1350        automation_id: &str,
1351        expected_present: bool,
1352    ) -> anyhow::Result<()> {
1353        let active_raw = if self.automations_v2_path.exists() {
1354            fs::read_to_string(&self.automations_v2_path).await?
1355        } else {
1356            String::new()
1357        };
1358        let active_parsed = parse_automation_v2_file(&active_raw);
1359        let active_present = active_parsed.contains_key(automation_id);
1360        if active_present != expected_present {
1361            let active_path = self.automations_v2_path.display().to_string();
1362            tracing::error!(
1363                automation_id,
1364                expected_present,
1365                actual_present = active_present,
1366                count = active_parsed.len(),
1367                active_path,
1368                "automation v2 persistence verification failed"
1369            );
1370            anyhow::bail!(
1371                "automation v2 persistence verification failed for `{}`",
1372                automation_id
1373            );
1374        }
1375        let mut alternate_mismatches = Vec::new();
1376        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1377            if path == self.automations_v2_path {
1378                continue;
1379            }
1380            let raw = if path.exists() {
1381                fs::read_to_string(&path).await?
1382            } else {
1383                String::new()
1384            };
1385            let parsed = parse_automation_v2_file(&raw);
1386            let present = parsed.contains_key(automation_id);
1387            if present != expected_present {
1388                alternate_mismatches.push(format!(
1389                    "{} expected_present={} actual_present={} count={}",
1390                    path.display(),
1391                    expected_present,
1392                    present,
1393                    parsed.len()
1394                ));
1395            }
1396        }
1397        if !alternate_mismatches.is_empty() {
1398            let active_path = self.automations_v2_path.display().to_string();
1399            tracing::warn!(
1400                automation_id,
1401                expected_present,
1402                mismatches = ?alternate_mismatches,
1403                active_path,
1404                "automation v2 alternate persistence paths are stale"
1405            );
1406        }
1407        Ok(())
1408    }
1409
1410    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
1411        let runs = self
1412            .automation_v2_runs
1413            .read()
1414            .await
1415            .values()
1416            .cloned()
1417            .collect::<Vec<_>>();
1418        let mut guard = self.automations_v2.write().await;
1419        let mut recovered = 0usize;
1420        for run in runs {
1421            let Some(snapshot) = run.automation_snapshot.clone() else {
1422                continue;
1423            };
1424            let should_replace = match guard.get(&run.automation_id) {
1425                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
1426                None => true,
1427            };
1428            if should_replace {
1429                if !guard.contains_key(&run.automation_id) {
1430                    recovered += 1;
1431                }
1432                guard.insert(run.automation_id.clone(), snapshot);
1433            }
1434        }
1435        drop(guard);
1436        if recovered > 0 {
1437            let active_path = self.automations_v2_path.display().to_string();
1438            tracing::warn!(
1439                recovered,
1440                active_path,
1441                "recovered automation v2 definitions from run snapshots"
1442            );
1443            self.persist_automations_v2().await?;
1444        }
1445        Ok(recovered)
1446    }
1447
1448    pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1449        let path = if self.bug_monitor_config_path.exists() {
1450            self.bug_monitor_config_path.clone()
1451        } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1452            .exists()
1453        {
1454            config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1455        } else {
1456            return Ok(());
1457        };
1458        let raw = fs::read_to_string(path).await?;
1459        let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1460            .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1461        *self.bug_monitor_config.write().await = parsed;
1462        Ok(())
1463    }
1464
1465    pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1466        if let Some(parent) = self.bug_monitor_config_path.parent() {
1467            fs::create_dir_all(parent).await?;
1468        }
1469        let payload = {
1470            let guard = self.bug_monitor_config.read().await;
1471            serde_json::to_string_pretty(&*guard)?
1472        };
1473        fs::write(&self.bug_monitor_config_path, payload).await?;
1474        Ok(())
1475    }
1476
1477    pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1478        self.bug_monitor_config.read().await.clone()
1479    }
1480
1481    pub async fn put_bug_monitor_config(
1482        &self,
1483        mut config: BugMonitorConfig,
1484    ) -> anyhow::Result<BugMonitorConfig> {
1485        config.workspace_root = config
1486            .workspace_root
1487            .as_ref()
1488            .map(|v| v.trim().to_string())
1489            .filter(|v| !v.is_empty());
1490        if let Some(repo) = config.repo.as_ref() {
1491            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
1492                anyhow::bail!("repo must be in owner/repo format");
1493            }
1494        }
1495        if let Some(server) = config.mcp_server.as_ref() {
1496            let servers = self.mcp.list().await;
1497            if !servers.contains_key(server) {
1498                anyhow::bail!("unknown mcp server `{server}`");
1499            }
1500        }
1501        if let Some(model_policy) = config.model_policy.as_ref() {
1502            crate::http::routines_automations::validate_model_policy(model_policy)
1503                .map_err(anyhow::Error::msg)?;
1504        }
1505        config.updated_at_ms = now_ms();
1506        *self.bug_monitor_config.write().await = config.clone();
1507        self.persist_bug_monitor_config().await?;
1508        Ok(config)
1509    }
1510
1511    pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1512        let path = if self.bug_monitor_drafts_path.exists() {
1513            self.bug_monitor_drafts_path.clone()
1514        } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1515            .exists()
1516        {
1517            config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1518        } else {
1519            return Ok(());
1520        };
1521        let raw = fs::read_to_string(path).await?;
1522        let parsed =
1523            serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1524                .unwrap_or_default();
1525        *self.bug_monitor_drafts.write().await = parsed;
1526        Ok(())
1527    }
1528
1529    pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1530        if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1531            fs::create_dir_all(parent).await?;
1532        }
1533        let payload = {
1534            let guard = self.bug_monitor_drafts.read().await;
1535            serde_json::to_string_pretty(&*guard)?
1536        };
1537        fs::write(&self.bug_monitor_drafts_path, payload).await?;
1538        Ok(())
1539    }
1540
1541    pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1542        let path = if self.bug_monitor_incidents_path.exists() {
1543            self.bug_monitor_incidents_path.clone()
1544        } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1545            .exists()
1546        {
1547            config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1548        } else {
1549            return Ok(());
1550        };
1551        let raw = fs::read_to_string(path).await?;
1552        let parsed = serde_json::from_str::<
1553            std::collections::HashMap<String, BugMonitorIncidentRecord>,
1554        >(&raw)
1555        .unwrap_or_default();
1556        *self.bug_monitor_incidents.write().await = parsed;
1557        Ok(())
1558    }
1559
1560    pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1561        if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1562            fs::create_dir_all(parent).await?;
1563        }
1564        let payload = {
1565            let guard = self.bug_monitor_incidents.read().await;
1566            serde_json::to_string_pretty(&*guard)?
1567        };
1568        fs::write(&self.bug_monitor_incidents_path, payload).await?;
1569        Ok(())
1570    }
1571
1572    pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1573        let path = if self.bug_monitor_posts_path.exists() {
1574            self.bug_monitor_posts_path.clone()
1575        } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1576            .exists()
1577        {
1578            config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1579        } else {
1580            return Ok(());
1581        };
1582        let raw = fs::read_to_string(path).await?;
1583        let parsed =
1584            serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1585                .unwrap_or_default();
1586        *self.bug_monitor_posts.write().await = parsed;
1587        Ok(())
1588    }
1589
1590    pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1591        if let Some(parent) = self.bug_monitor_posts_path.parent() {
1592            fs::create_dir_all(parent).await?;
1593        }
1594        let payload = {
1595            let guard = self.bug_monitor_posts.read().await;
1596            serde_json::to_string_pretty(&*guard)?
1597        };
1598        fs::write(&self.bug_monitor_posts_path, payload).await?;
1599        Ok(())
1600    }
1601
1602    pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1603        if !self.external_actions_path.exists() {
1604            return Ok(());
1605        }
1606        let raw = fs::read_to_string(&self.external_actions_path).await?;
1607        let parsed =
1608            serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1609                .unwrap_or_default();
1610        *self.external_actions.write().await = parsed;
1611        Ok(())
1612    }
1613
1614    pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1615        if let Some(parent) = self.external_actions_path.parent() {
1616            fs::create_dir_all(parent).await?;
1617        }
1618        let payload = {
1619            let guard = self.external_actions.read().await;
1620            serde_json::to_string_pretty(&*guard)?
1621        };
1622        fs::write(&self.external_actions_path, payload).await?;
1623        Ok(())
1624    }
1625
1626    pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1627        let mut rows = self
1628            .bug_monitor_incidents
1629            .read()
1630            .await
1631            .values()
1632            .cloned()
1633            .collect::<Vec<_>>();
1634        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1635        rows.truncate(limit.clamp(1, 200));
1636        rows
1637    }
1638
1639    pub async fn get_bug_monitor_incident(
1640        &self,
1641        incident_id: &str,
1642    ) -> Option<BugMonitorIncidentRecord> {
1643        self.bug_monitor_incidents
1644            .read()
1645            .await
1646            .get(incident_id)
1647            .cloned()
1648    }
1649
1650    pub async fn put_bug_monitor_incident(
1651        &self,
1652        incident: BugMonitorIncidentRecord,
1653    ) -> anyhow::Result<BugMonitorIncidentRecord> {
1654        self.bug_monitor_incidents
1655            .write()
1656            .await
1657            .insert(incident.incident_id.clone(), incident.clone());
1658        self.persist_bug_monitor_incidents().await?;
1659        Ok(incident)
1660    }
1661
1662    pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1663        let mut rows = self
1664            .bug_monitor_posts
1665            .read()
1666            .await
1667            .values()
1668            .cloned()
1669            .collect::<Vec<_>>();
1670        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1671        rows.truncate(limit.clamp(1, 200));
1672        rows
1673    }
1674
1675    pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1676        self.bug_monitor_posts.read().await.get(post_id).cloned()
1677    }
1678
1679    pub async fn put_bug_monitor_post(
1680        &self,
1681        post: BugMonitorPostRecord,
1682    ) -> anyhow::Result<BugMonitorPostRecord> {
1683        self.bug_monitor_posts
1684            .write()
1685            .await
1686            .insert(post.post_id.clone(), post.clone());
1687        self.persist_bug_monitor_posts().await?;
1688        Ok(post)
1689    }
1690
1691    pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1692        let mut rows = self
1693            .external_actions
1694            .read()
1695            .await
1696            .values()
1697            .cloned()
1698            .collect::<Vec<_>>();
1699        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1700        rows.truncate(limit.clamp(1, 200));
1701        rows
1702    }
1703
1704    pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1705        self.external_actions.read().await.get(action_id).cloned()
1706    }
1707
1708    pub async fn get_external_action_by_idempotency_key(
1709        &self,
1710        idempotency_key: &str,
1711    ) -> Option<ExternalActionRecord> {
1712        let normalized = idempotency_key.trim();
1713        if normalized.is_empty() {
1714            return None;
1715        }
1716        self.external_actions
1717            .read()
1718            .await
1719            .values()
1720            .find(|action| {
1721                action
1722                    .idempotency_key
1723                    .as_deref()
1724                    .map(str::trim)
1725                    .filter(|value| !value.is_empty())
1726                    == Some(normalized)
1727            })
1728            .cloned()
1729    }
1730
1731    pub async fn put_external_action(
1732        &self,
1733        action: ExternalActionRecord,
1734    ) -> anyhow::Result<ExternalActionRecord> {
1735        self.external_actions
1736            .write()
1737            .await
1738            .insert(action.action_id.clone(), action.clone());
1739        self.persist_external_actions().await?;
1740        Ok(action)
1741    }
1742
1743    pub async fn record_external_action(
1744        &self,
1745        action: ExternalActionRecord,
1746    ) -> anyhow::Result<ExternalActionRecord> {
1747        let action = {
1748            let mut guard = self.external_actions.write().await;
1749            if let Some(idempotency_key) = action
1750                .idempotency_key
1751                .as_deref()
1752                .map(str::trim)
1753                .filter(|value| !value.is_empty())
1754            {
1755                if let Some(existing) = guard
1756                    .values()
1757                    .find(|existing| {
1758                        existing
1759                            .idempotency_key
1760                            .as_deref()
1761                            .map(str::trim)
1762                            .filter(|value| !value.is_empty())
1763                            == Some(idempotency_key)
1764                    })
1765                    .cloned()
1766                {
1767                    return Ok(existing);
1768                }
1769            }
1770            guard.insert(action.action_id.clone(), action.clone());
1771            action
1772        };
1773        self.persist_external_actions().await?;
1774        if let Some(run_id) = action.routine_run_id.as_deref() {
1775            let artifact = RoutineRunArtifact {
1776                artifact_id: format!("external-action-{}", action.action_id),
1777                uri: format!("external-action://{}", action.action_id),
1778                kind: "external_action_receipt".to_string(),
1779                label: Some(format!("external action receipt: {}", action.operation)),
1780                created_at_ms: action.updated_at_ms,
1781                metadata: Some(json!({
1782                    "actionID": action.action_id,
1783                    "operation": action.operation,
1784                    "status": action.status,
1785                    "sourceKind": action.source_kind,
1786                    "sourceID": action.source_id,
1787                    "capabilityID": action.capability_id,
1788                    "target": action.target,
1789                })),
1790            };
1791            let _ = self
1792                .append_routine_run_artifact(run_id, artifact.clone())
1793                .await;
1794            if let Some(runtime) = self.runtime.get() {
1795                runtime.event_bus.publish(EngineEvent::new(
1796                    "routine.run.artifact_added",
1797                    json!({
1798                        "runID": run_id,
1799                        "artifact": artifact,
1800                    }),
1801                ));
1802            }
1803        }
1804        if let Some(context_run_id) = action.context_run_id.as_deref() {
1805            let payload = serde_json::to_value(&action)?;
1806            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
1807                self,
1808                context_run_id,
1809                &format!("external-action-{}", action.action_id),
1810                "external_action_receipt",
1811                &format!("external-actions/{}.json", action.action_id),
1812                &payload,
1813            )
1814            .await
1815            {
1816                tracing::warn!(
1817                    "failed to append external action artifact {} to context run {}: {}",
1818                    action.action_id,
1819                    context_run_id,
1820                    error
1821                );
1822            }
1823        }
1824        Ok(action)
1825    }
1826
1827    pub async fn update_bug_monitor_runtime_status(
1828        &self,
1829        update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1830    ) -> BugMonitorRuntimeStatus {
1831        let mut guard = self.bug_monitor_runtime_status.write().await;
1832        update(&mut guard);
1833        guard.clone()
1834    }
1835
1836    pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1837        let mut rows = self
1838            .bug_monitor_drafts
1839            .read()
1840            .await
1841            .values()
1842            .cloned()
1843            .collect::<Vec<_>>();
1844        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1845        rows.truncate(limit.clamp(1, 200));
1846        rows
1847    }
1848
1849    pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1850        self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1851    }
1852
1853    pub async fn put_bug_monitor_draft(
1854        &self,
1855        draft: BugMonitorDraftRecord,
1856    ) -> anyhow::Result<BugMonitorDraftRecord> {
1857        self.bug_monitor_drafts
1858            .write()
1859            .await
1860            .insert(draft.draft_id.clone(), draft.clone());
1861        self.persist_bug_monitor_drafts().await?;
1862        Ok(draft)
1863    }
1864
1865    pub async fn submit_bug_monitor_draft(
1866        &self,
1867        mut submission: BugMonitorSubmission,
1868    ) -> anyhow::Result<BugMonitorDraftRecord> {
1869        fn normalize_optional(value: Option<String>) -> Option<String> {
1870            value
1871                .map(|v| v.trim().to_string())
1872                .filter(|v| !v.is_empty())
1873        }
1874
1875        fn compute_fingerprint(parts: &[&str]) -> String {
1876            use std::hash::{Hash, Hasher};
1877
1878            let mut hasher = std::collections::hash_map::DefaultHasher::new();
1879            for part in parts {
1880                part.hash(&mut hasher);
1881            }
1882            format!("{:016x}", hasher.finish())
1883        }
1884
1885        submission.repo = normalize_optional(submission.repo);
1886        submission.title = normalize_optional(submission.title);
1887        submission.detail = normalize_optional(submission.detail);
1888        submission.source = normalize_optional(submission.source);
1889        submission.run_id = normalize_optional(submission.run_id);
1890        submission.session_id = normalize_optional(submission.session_id);
1891        submission.correlation_id = normalize_optional(submission.correlation_id);
1892        submission.file_name = normalize_optional(submission.file_name);
1893        submission.process = normalize_optional(submission.process);
1894        submission.component = normalize_optional(submission.component);
1895        submission.event = normalize_optional(submission.event);
1896        submission.level = normalize_optional(submission.level);
1897        submission.fingerprint = normalize_optional(submission.fingerprint);
1898        submission.excerpt = submission
1899            .excerpt
1900            .into_iter()
1901            .map(|line| line.trim_end().to_string())
1902            .filter(|line| !line.is_empty())
1903            .take(50)
1904            .collect();
1905
1906        let config = self.bug_monitor_config().await;
1907        let repo = submission
1908            .repo
1909            .clone()
1910            .or(config.repo.clone())
1911            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
1912        if !is_valid_owner_repo_slug(&repo) {
1913            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
1914        }
1915
1916        let title = submission.title.clone().unwrap_or_else(|| {
1917            if let Some(event) = submission.event.as_ref() {
1918                format!("Failure detected in {event}")
1919            } else if let Some(component) = submission.component.as_ref() {
1920                format!("Failure detected in {component}")
1921            } else if let Some(process) = submission.process.as_ref() {
1922                format!("Failure detected in {process}")
1923            } else if let Some(source) = submission.source.as_ref() {
1924                format!("Failure report from {source}")
1925            } else {
1926                "Failure report".to_string()
1927            }
1928        });
1929
1930        let mut detail_lines = Vec::new();
1931        if let Some(source) = submission.source.as_ref() {
1932            detail_lines.push(format!("source: {source}"));
1933        }
1934        if let Some(file_name) = submission.file_name.as_ref() {
1935            detail_lines.push(format!("file: {file_name}"));
1936        }
1937        if let Some(level) = submission.level.as_ref() {
1938            detail_lines.push(format!("level: {level}"));
1939        }
1940        if let Some(process) = submission.process.as_ref() {
1941            detail_lines.push(format!("process: {process}"));
1942        }
1943        if let Some(component) = submission.component.as_ref() {
1944            detail_lines.push(format!("component: {component}"));
1945        }
1946        if let Some(event) = submission.event.as_ref() {
1947            detail_lines.push(format!("event: {event}"));
1948        }
1949        if let Some(run_id) = submission.run_id.as_ref() {
1950            detail_lines.push(format!("run_id: {run_id}"));
1951        }
1952        if let Some(session_id) = submission.session_id.as_ref() {
1953            detail_lines.push(format!("session_id: {session_id}"));
1954        }
1955        if let Some(correlation_id) = submission.correlation_id.as_ref() {
1956            detail_lines.push(format!("correlation_id: {correlation_id}"));
1957        }
1958        if let Some(detail) = submission.detail.as_ref() {
1959            detail_lines.push(String::new());
1960            detail_lines.push(detail.clone());
1961        }
1962        if !submission.excerpt.is_empty() {
1963            if !detail_lines.is_empty() {
1964                detail_lines.push(String::new());
1965            }
1966            detail_lines.push("excerpt:".to_string());
1967            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
1968        }
1969        let detail = if detail_lines.is_empty() {
1970            None
1971        } else {
1972            Some(detail_lines.join("\n"))
1973        };
1974
1975        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
1976            compute_fingerprint(&[
1977                repo.as_str(),
1978                title.as_str(),
1979                detail.as_deref().unwrap_or(""),
1980                submission.source.as_deref().unwrap_or(""),
1981                submission.run_id.as_deref().unwrap_or(""),
1982                submission.session_id.as_deref().unwrap_or(""),
1983                submission.correlation_id.as_deref().unwrap_or(""),
1984            ])
1985        });
1986
1987        let mut drafts = self.bug_monitor_drafts.write().await;
1988        if let Some(existing) = drafts
1989            .values()
1990            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
1991            .cloned()
1992        {
1993            return Ok(existing);
1994        }
1995
1996        let draft = BugMonitorDraftRecord {
1997            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
1998            fingerprint,
1999            repo,
2000            status: if config.require_approval_for_new_issues {
2001                "approval_required".to_string()
2002            } else {
2003                "draft_ready".to_string()
2004            },
2005            created_at_ms: now_ms(),
2006            triage_run_id: None,
2007            issue_number: None,
2008            title: Some(title),
2009            detail,
2010            github_status: None,
2011            github_issue_url: None,
2012            github_comment_url: None,
2013            github_posted_at_ms: None,
2014            matched_issue_number: None,
2015            matched_issue_state: None,
2016            evidence_digest: None,
2017            last_post_error: None,
2018        };
2019        drafts.insert(draft.draft_id.clone(), draft.clone());
2020        drop(drafts);
2021        self.persist_bug_monitor_drafts().await?;
2022        Ok(draft)
2023    }
2024
2025    pub async fn update_bug_monitor_draft_status(
2026        &self,
2027        draft_id: &str,
2028        next_status: &str,
2029        reason: Option<&str>,
2030    ) -> anyhow::Result<BugMonitorDraftRecord> {
2031        let normalized_status = next_status.trim().to_ascii_lowercase();
2032        if normalized_status != "draft_ready" && normalized_status != "denied" {
2033            anyhow::bail!("unsupported Bug Monitor draft status");
2034        }
2035
2036        let mut drafts = self.bug_monitor_drafts.write().await;
2037        let Some(draft) = drafts.get_mut(draft_id) else {
2038            anyhow::bail!("Bug Monitor draft not found");
2039        };
2040        if !draft.status.eq_ignore_ascii_case("approval_required") {
2041            anyhow::bail!("Bug Monitor draft is not waiting for approval");
2042        }
2043        draft.status = normalized_status.clone();
2044        if let Some(reason) = reason
2045            .map(|value| value.trim())
2046            .filter(|value| !value.is_empty())
2047        {
2048            let next_detail = if let Some(detail) = draft.detail.as_ref() {
2049                format!("{detail}\n\noperator_note: {reason}")
2050            } else {
2051                format!("operator_note: {reason}")
2052            };
2053            draft.detail = Some(next_detail);
2054        }
2055        let updated = draft.clone();
2056        drop(drafts);
2057        self.persist_bug_monitor_drafts().await?;
2058
2059        let event_name = if normalized_status == "draft_ready" {
2060            "bug_monitor.draft.approved"
2061        } else {
2062            "bug_monitor.draft.denied"
2063        };
2064        self.event_bus.publish(EngineEvent::new(
2065            event_name,
2066            serde_json::json!({
2067                "draft_id": updated.draft_id,
2068                "repo": updated.repo,
2069                "status": updated.status,
2070                "reason": reason,
2071            }),
2072        ));
2073        Ok(updated)
2074    }
2075
2076    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
2077        let required_capabilities = vec![
2078            "github.list_issues".to_string(),
2079            "github.get_issue".to_string(),
2080            "github.create_issue".to_string(),
2081            "github.comment_on_issue".to_string(),
2082        ];
2083        let config = self.bug_monitor_config().await;
2084        let drafts = self.bug_monitor_drafts.read().await;
2085        let incidents = self.bug_monitor_incidents.read().await;
2086        let posts = self.bug_monitor_posts.read().await;
2087        let total_incidents = incidents.len();
2088        let pending_incidents = incidents
2089            .values()
2090            .filter(|row| {
2091                matches!(
2092                    row.status.as_str(),
2093                    "queued"
2094                        | "draft_created"
2095                        | "triage_queued"
2096                        | "analysis_queued"
2097                        | "triage_pending"
2098                        | "issue_draft_pending"
2099                )
2100            })
2101            .count();
2102        let pending_drafts = drafts
2103            .values()
2104            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
2105            .count();
2106        let pending_posts = posts
2107            .values()
2108            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
2109            .count();
2110        let last_activity_at_ms = drafts
2111            .values()
2112            .map(|row| row.created_at_ms)
2113            .chain(posts.values().map(|row| row.updated_at_ms))
2114            .max();
2115        drop(drafts);
2116        drop(incidents);
2117        drop(posts);
2118        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
2119        runtime.paused = config.paused;
2120        runtime.total_incidents = total_incidents;
2121        runtime.pending_incidents = pending_incidents;
2122        runtime.pending_posts = pending_posts;
2123
2124        let mut status = BugMonitorStatus {
2125            config: config.clone(),
2126            runtime,
2127            pending_drafts,
2128            pending_posts,
2129            last_activity_at_ms,
2130            ..BugMonitorStatus::default()
2131        };
2132        let repo_valid = config
2133            .repo
2134            .as_ref()
2135            .map(|repo| is_valid_owner_repo_slug(repo))
2136            .unwrap_or(false);
2137        let servers = self.mcp.list().await;
2138        let selected_server = config
2139            .mcp_server
2140            .as_ref()
2141            .and_then(|name| servers.get(name))
2142            .cloned();
2143        let provider_catalog = self.providers.list().await;
2144        let selected_model = config
2145            .model_policy
2146            .as_ref()
2147            .and_then(|policy| policy.get("default_model"))
2148            .and_then(crate::app::routines::parse_model_spec);
2149        let selected_model_ready = selected_model
2150            .as_ref()
2151            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
2152            .unwrap_or(false);
2153        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
2154            self.mcp.server_tools(server_name).await
2155        } else {
2156            Vec::new()
2157        };
2158        let discovered_tools = self
2159            .capability_resolver
2160            .discover_from_runtime(selected_server_tools, Vec::new())
2161            .await;
2162        status.discovered_mcp_tools = discovered_tools
2163            .iter()
2164            .map(|row| row.tool_name.clone())
2165            .collect();
2166        let discovered_providers = discovered_tools
2167            .iter()
2168            .map(|row| row.provider.to_ascii_lowercase())
2169            .collect::<std::collections::HashSet<_>>();
2170        let provider_preference = match config.provider_preference {
2171            BugMonitorProviderPreference::OfficialGithub => {
2172                vec![
2173                    "mcp".to_string(),
2174                    "composio".to_string(),
2175                    "arcade".to_string(),
2176                ]
2177            }
2178            BugMonitorProviderPreference::Composio => {
2179                vec![
2180                    "composio".to_string(),
2181                    "mcp".to_string(),
2182                    "arcade".to_string(),
2183                ]
2184            }
2185            BugMonitorProviderPreference::Arcade => {
2186                vec![
2187                    "arcade".to_string(),
2188                    "mcp".to_string(),
2189                    "composio".to_string(),
2190                ]
2191            }
2192            BugMonitorProviderPreference::Auto => {
2193                vec![
2194                    "mcp".to_string(),
2195                    "composio".to_string(),
2196                    "arcade".to_string(),
2197                ]
2198            }
2199        };
2200        let capability_resolution = self
2201            .capability_resolver
2202            .resolve(
2203                crate::capability_resolver::CapabilityResolveInput {
2204                    workflow_id: Some("bug_monitor".to_string()),
2205                    required_capabilities: required_capabilities.clone(),
2206                    optional_capabilities: Vec::new(),
2207                    provider_preference,
2208                    available_tools: discovered_tools,
2209                },
2210                Vec::new(),
2211            )
2212            .await
2213            .ok();
2214        let bindings_file = self.capability_resolver.list_bindings().await.ok();
2215        if let Some(bindings) = bindings_file.as_ref() {
2216            status.binding_source_version = bindings.builtin_version.clone();
2217            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
2218            status.selected_server_binding_candidates = bindings
2219                .bindings
2220                .iter()
2221                .filter(|binding| required_capabilities.contains(&binding.capability_id))
2222                .filter(|binding| {
2223                    discovered_providers.is_empty()
2224                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
2225                })
2226                .map(|binding| {
2227                    let binding_key = format!(
2228                        "{}::{}",
2229                        binding.capability_id,
2230                        binding.tool_name.to_ascii_lowercase()
2231                    );
2232                    let matched = capability_resolution
2233                        .as_ref()
2234                        .map(|resolution| {
2235                            resolution.resolved.iter().any(|row| {
2236                                row.capability_id == binding.capability_id
2237                                    && format!(
2238                                        "{}::{}",
2239                                        row.capability_id,
2240                                        row.tool_name.to_ascii_lowercase()
2241                                    ) == binding_key
2242                            })
2243                        })
2244                        .unwrap_or(false);
2245                    BugMonitorBindingCandidate {
2246                        capability_id: binding.capability_id.clone(),
2247                        binding_tool_name: binding.tool_name.clone(),
2248                        aliases: binding.tool_name_aliases.clone(),
2249                        matched,
2250                    }
2251                })
2252                .collect();
2253            status.selected_server_binding_candidates.sort_by(|a, b| {
2254                a.capability_id
2255                    .cmp(&b.capability_id)
2256                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
2257            });
2258        }
2259        let capability_ready = |capability_id: &str| -> bool {
2260            capability_resolution
2261                .as_ref()
2262                .map(|resolved| {
2263                    resolved
2264                        .resolved
2265                        .iter()
2266                        .any(|row| row.capability_id == capability_id)
2267                })
2268                .unwrap_or(false)
2269        };
2270        if let Some(resolution) = capability_resolution.as_ref() {
2271            status.missing_required_capabilities = resolution.missing_required.clone();
2272            status.resolved_capabilities = resolution
2273                .resolved
2274                .iter()
2275                .map(|row| BugMonitorCapabilityMatch {
2276                    capability_id: row.capability_id.clone(),
2277                    provider: row.provider.clone(),
2278                    tool_name: row.tool_name.clone(),
2279                    binding_index: row.binding_index,
2280                })
2281                .collect();
2282        } else {
2283            status.missing_required_capabilities = required_capabilities.clone();
2284        }
2285        status.required_capabilities = BugMonitorCapabilityReadiness {
2286            github_list_issues: capability_ready("github.list_issues"),
2287            github_get_issue: capability_ready("github.get_issue"),
2288            github_create_issue: capability_ready("github.create_issue"),
2289            github_comment_on_issue: capability_ready("github.comment_on_issue"),
2290        };
2291        status.selected_model = selected_model;
2292        status.readiness = BugMonitorReadiness {
2293            config_valid: repo_valid
2294                && selected_server.is_some()
2295                && status.required_capabilities.github_list_issues
2296                && status.required_capabilities.github_get_issue
2297                && status.required_capabilities.github_create_issue
2298                && status.required_capabilities.github_comment_on_issue
2299                && selected_model_ready,
2300            repo_valid,
2301            mcp_server_present: selected_server.is_some(),
2302            mcp_connected: selected_server
2303                .as_ref()
2304                .map(|row| row.connected)
2305                .unwrap_or(false),
2306            github_read_ready: status.required_capabilities.github_list_issues
2307                && status.required_capabilities.github_get_issue,
2308            github_write_ready: status.required_capabilities.github_create_issue
2309                && status.required_capabilities.github_comment_on_issue,
2310            selected_model_ready,
2311            ingest_ready: config.enabled && !config.paused && repo_valid,
2312            publish_ready: config.enabled
2313                && !config.paused
2314                && repo_valid
2315                && selected_server
2316                    .as_ref()
2317                    .map(|row| row.connected)
2318                    .unwrap_or(false)
2319                && status.required_capabilities.github_list_issues
2320                && status.required_capabilities.github_get_issue
2321                && status.required_capabilities.github_create_issue
2322                && status.required_capabilities.github_comment_on_issue
2323                && selected_model_ready,
2324            runtime_ready: config.enabled
2325                && !config.paused
2326                && repo_valid
2327                && selected_server
2328                    .as_ref()
2329                    .map(|row| row.connected)
2330                    .unwrap_or(false)
2331                && status.required_capabilities.github_list_issues
2332                && status.required_capabilities.github_get_issue
2333                && status.required_capabilities.github_create_issue
2334                && status.required_capabilities.github_comment_on_issue
2335                && selected_model_ready,
2336        };
2337        if config.enabled {
2338            if config.paused {
2339                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
2340            } else if !repo_valid {
2341                status.last_error = Some("Target repo is missing or invalid.".to_string());
2342            } else if selected_server.is_none() {
2343                status.last_error = Some("Selected MCP server is missing.".to_string());
2344            } else if !status.readiness.mcp_connected {
2345                status.last_error = Some("Selected MCP server is disconnected.".to_string());
2346            } else if !selected_model_ready {
2347                status.last_error = Some(
2348                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
2349                        .to_string(),
2350                );
2351            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
2352                let missing = if status.missing_required_capabilities.is_empty() {
2353                    "unknown".to_string()
2354                } else {
2355                    status.missing_required_capabilities.join(", ")
2356                };
2357                status.last_error = Some(format!(
2358                    "Selected MCP server is missing required GitHub capabilities: {missing}"
2359                ));
2360            }
2361        }
2362        status.runtime.monitoring_active = status.readiness.ingest_ready;
2363        status
2364    }
2365
2366    pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2367        if !self.workflow_runs_path.exists() {
2368            return Ok(());
2369        }
2370        let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2371        let parsed =
2372            serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2373                .unwrap_or_default();
2374        *self.workflow_runs.write().await = parsed;
2375        Ok(())
2376    }
2377
2378    pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2379        if let Some(parent) = self.workflow_runs_path.parent() {
2380            fs::create_dir_all(parent).await?;
2381        }
2382        let payload = {
2383            let guard = self.workflow_runs.read().await;
2384            serde_json::to_string_pretty(&*guard)?
2385        };
2386        fs::write(&self.workflow_runs_path, payload).await?;
2387        Ok(())
2388    }
2389
2390    pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2391        if !self.workflow_hook_overrides_path.exists() {
2392            return Ok(());
2393        }
2394        let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2395        let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2396            .unwrap_or_default();
2397        *self.workflow_hook_overrides.write().await = parsed;
2398        Ok(())
2399    }
2400
2401    pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2402        if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2403            fs::create_dir_all(parent).await?;
2404        }
2405        let payload = {
2406            let guard = self.workflow_hook_overrides.read().await;
2407            serde_json::to_string_pretty(&*guard)?
2408        };
2409        fs::write(&self.workflow_hook_overrides_path, payload).await?;
2410        Ok(())
2411    }
2412
2413    pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
2414        let mut sources = Vec::new();
2415        sources.push(WorkflowLoadSource {
2416            root: config::paths::resolve_builtin_workflows_dir(),
2417            kind: WorkflowSourceKind::BuiltIn,
2418            pack_id: None,
2419        });
2420
2421        let workspace_root = self.workspace_index.snapshot().await.root;
2422        sources.push(WorkflowLoadSource {
2423            root: PathBuf::from(workspace_root).join(".tandem"),
2424            kind: WorkflowSourceKind::Workspace,
2425            pack_id: None,
2426        });
2427
2428        if let Ok(packs) = self.pack_manager.list().await {
2429            for pack in packs {
2430                sources.push(WorkflowLoadSource {
2431                    root: PathBuf::from(pack.install_path),
2432                    kind: WorkflowSourceKind::Pack,
2433                    pack_id: Some(pack.pack_id),
2434                });
2435            }
2436        }
2437
2438        let mut registry = load_workflow_registry(&sources)?;
2439        let overrides = self.workflow_hook_overrides.read().await.clone();
2440        for hook in &mut registry.hooks {
2441            if let Some(enabled) = overrides.get(&hook.binding_id) {
2442                hook.enabled = *enabled;
2443            }
2444        }
2445        for workflow in registry.workflows.values_mut() {
2446            workflow.hooks = registry
2447                .hooks
2448                .iter()
2449                .filter(|hook| hook.workflow_id == workflow.workflow_id)
2450                .cloned()
2451                .collect();
2452        }
2453        let messages = validate_workflow_registry(&registry);
2454        *self.workflows.write().await = registry;
2455        Ok(messages)
2456    }
2457
2458    pub async fn workflow_registry(&self) -> WorkflowRegistry {
2459        self.workflows.read().await.clone()
2460    }
2461
2462    pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2463        let mut rows = self
2464            .workflows
2465            .read()
2466            .await
2467            .workflows
2468            .values()
2469            .cloned()
2470            .collect::<Vec<_>>();
2471        rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2472        rows
2473    }
2474
2475    pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2476        self.workflows
2477            .read()
2478            .await
2479            .workflows
2480            .get(workflow_id)
2481            .cloned()
2482    }
2483
2484    pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2485        let mut rows = self
2486            .workflows
2487            .read()
2488            .await
2489            .hooks
2490            .iter()
2491            .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2492            .cloned()
2493            .collect::<Vec<_>>();
2494        rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2495        rows
2496    }
2497
2498    pub async fn set_workflow_hook_enabled(
2499        &self,
2500        binding_id: &str,
2501        enabled: bool,
2502    ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2503        self.workflow_hook_overrides
2504            .write()
2505            .await
2506            .insert(binding_id.to_string(), enabled);
2507        self.persist_workflow_hook_overrides().await?;
2508        let _ = self.reload_workflows().await?;
2509        Ok(self
2510            .workflows
2511            .read()
2512            .await
2513            .hooks
2514            .iter()
2515            .find(|hook| hook.binding_id == binding_id)
2516            .cloned())
2517    }
2518
2519    pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2520        self.workflow_runs
2521            .write()
2522            .await
2523            .insert(run.run_id.clone(), run);
2524        self.persist_workflow_runs().await
2525    }
2526
2527    pub async fn update_workflow_run(
2528        &self,
2529        run_id: &str,
2530        update: impl FnOnce(&mut WorkflowRunRecord),
2531    ) -> Option<WorkflowRunRecord> {
2532        let mut guard = self.workflow_runs.write().await;
2533        let row = guard.get_mut(run_id)?;
2534        update(row);
2535        row.updated_at_ms = now_ms();
2536        if matches!(
2537            row.status,
2538            WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
2539        ) {
2540            row.finished_at_ms.get_or_insert_with(now_ms);
2541        }
2542        let out = row.clone();
2543        drop(guard);
2544        let _ = self.persist_workflow_runs().await;
2545        Some(out)
2546    }
2547
2548    pub async fn list_workflow_runs(
2549        &self,
2550        workflow_id: Option<&str>,
2551        limit: usize,
2552    ) -> Vec<WorkflowRunRecord> {
2553        let mut rows = self
2554            .workflow_runs
2555            .read()
2556            .await
2557            .values()
2558            .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2559            .cloned()
2560            .collect::<Vec<_>>();
2561        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2562        rows.truncate(limit.clamp(1, 500));
2563        rows
2564    }
2565
2566    pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2567        self.workflow_runs.read().await.get(run_id).cloned()
2568    }
2569
2570    pub async fn put_automation_v2(
2571        &self,
2572        mut automation: AutomationV2Spec,
2573    ) -> anyhow::Result<AutomationV2Spec> {
2574        if automation.automation_id.trim().is_empty() {
2575            anyhow::bail!("automation_id is required");
2576        }
2577        for agent in &mut automation.agents {
2578            if agent.display_name.trim().is_empty() {
2579                agent.display_name = auto_generated_agent_name(&agent.agent_id);
2580            }
2581            agent.tool_policy.allowlist =
2582                config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
2583            agent.tool_policy.denylist =
2584                config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
2585            agent.mcp_policy.allowed_servers =
2586                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
2587            agent.mcp_policy.allowed_tools = agent
2588                .mcp_policy
2589                .allowed_tools
2590                .take()
2591                .map(normalize_allowed_tools);
2592        }
2593        let now = now_ms();
2594        if automation.created_at_ms == 0 {
2595            automation.created_at_ms = now;
2596        }
2597        automation.updated_at_ms = now;
2598        if automation.next_fire_at_ms.is_none() {
2599            automation.next_fire_at_ms =
2600                automation_schedule_next_fire_at_ms(&automation.schedule, now);
2601        }
2602        migrate_bundled_studio_research_split_automation(&mut automation);
2603        self.automations_v2
2604            .write()
2605            .await
2606            .insert(automation.automation_id.clone(), automation.clone());
2607        self.persist_automations_v2().await?;
2608        self.verify_automation_v2_persisted(&automation.automation_id, true)
2609            .await?;
2610        Ok(automation)
2611    }
2612
2613    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2614        self.automations_v2.read().await.get(automation_id).cloned()
2615    }
2616
2617    pub fn automation_v2_runtime_context(
2618        &self,
2619        run: &AutomationV2RunRecord,
2620    ) -> Option<AutomationRuntimeContextMaterialization> {
2621        run.runtime_context.clone().or_else(|| {
2622            run.automation_snapshot.as_ref().and_then(|automation| {
2623                automation
2624                    .runtime_context_materialization()
2625                    .or_else(|| automation.approved_plan_runtime_context_materialization())
2626            })
2627        })
2628    }
2629
2630    pub(crate) fn automation_v2_approved_plan_materialization(
2631        &self,
2632        run: &AutomationV2RunRecord,
2633    ) -> Option<tandem_plan_compiler::api::ApprovedPlanMaterialization> {
2634        run.automation_snapshot
2635            .as_ref()
2636            .and_then(AutomationV2Spec::approved_plan_materialization)
2637    }
2638
2639    pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2640        self.workflow_plans
2641            .write()
2642            .await
2643            .insert(plan.plan_id.clone(), plan);
2644    }
2645
2646    pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2647        self.workflow_plans.read().await.get(plan_id).cloned()
2648    }
2649
2650    pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2651        self.workflow_plan_drafts
2652            .write()
2653            .await
2654            .insert(draft.current_plan.plan_id.clone(), draft.clone());
2655        self.put_workflow_plan(draft.current_plan).await;
2656    }
2657
2658    pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2659        self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2660    }
2661
2662    pub async fn put_optimization_campaign(
2663        &self,
2664        mut campaign: OptimizationCampaignRecord,
2665    ) -> anyhow::Result<OptimizationCampaignRecord> {
2666        if campaign.optimization_id.trim().is_empty() {
2667            anyhow::bail!("optimization_id is required");
2668        }
2669        if campaign.source_workflow_id.trim().is_empty() {
2670            anyhow::bail!("source_workflow_id is required");
2671        }
2672        if campaign.name.trim().is_empty() {
2673            anyhow::bail!("name is required");
2674        }
2675        let now = now_ms();
2676        if campaign.created_at_ms == 0 {
2677            campaign.created_at_ms = now;
2678        }
2679        campaign.updated_at_ms = now;
2680        campaign.source_workflow_snapshot_hash =
2681            optimization_snapshot_hash(&campaign.source_workflow_snapshot);
2682        campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
2683        self.optimization_campaigns
2684            .write()
2685            .await
2686            .insert(campaign.optimization_id.clone(), campaign.clone());
2687        self.persist_optimization_campaigns().await?;
2688        Ok(campaign)
2689    }
2690
2691    pub async fn get_optimization_campaign(
2692        &self,
2693        optimization_id: &str,
2694    ) -> Option<OptimizationCampaignRecord> {
2695        self.optimization_campaigns
2696            .read()
2697            .await
2698            .get(optimization_id)
2699            .cloned()
2700    }
2701
2702    pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
2703        let mut rows = self
2704            .optimization_campaigns
2705            .read()
2706            .await
2707            .values()
2708            .cloned()
2709            .collect::<Vec<_>>();
2710        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2711        rows
2712    }
2713
2714    pub async fn put_optimization_experiment(
2715        &self,
2716        mut experiment: OptimizationExperimentRecord,
2717    ) -> anyhow::Result<OptimizationExperimentRecord> {
2718        if experiment.experiment_id.trim().is_empty() {
2719            anyhow::bail!("experiment_id is required");
2720        }
2721        if experiment.optimization_id.trim().is_empty() {
2722            anyhow::bail!("optimization_id is required");
2723        }
2724        let now = now_ms();
2725        if experiment.created_at_ms == 0 {
2726            experiment.created_at_ms = now;
2727        }
2728        experiment.updated_at_ms = now;
2729        experiment.candidate_snapshot_hash =
2730            optimization_snapshot_hash(&experiment.candidate_snapshot);
2731        self.optimization_experiments
2732            .write()
2733            .await
2734            .insert(experiment.experiment_id.clone(), experiment.clone());
2735        self.persist_optimization_experiments().await?;
2736        Ok(experiment)
2737    }
2738
2739    pub async fn get_optimization_experiment(
2740        &self,
2741        optimization_id: &str,
2742        experiment_id: &str,
2743    ) -> Option<OptimizationExperimentRecord> {
2744        self.optimization_experiments
2745            .read()
2746            .await
2747            .get(experiment_id)
2748            .filter(|row| row.optimization_id == optimization_id)
2749            .cloned()
2750    }
2751
2752    pub async fn list_optimization_experiments(
2753        &self,
2754        optimization_id: &str,
2755    ) -> Vec<OptimizationExperimentRecord> {
2756        let mut rows = self
2757            .optimization_experiments
2758            .read()
2759            .await
2760            .values()
2761            .filter(|row| row.optimization_id == optimization_id)
2762            .cloned()
2763            .collect::<Vec<_>>();
2764        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2765        rows
2766    }
2767
2768    pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
2769        self.optimization_experiments
2770            .read()
2771            .await
2772            .values()
2773            .filter(|row| row.optimization_id == optimization_id)
2774            .count()
2775    }
2776
2777    fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
2778        matches!(
2779            status,
2780            crate::AutomationRunStatus::Completed
2781                | crate::AutomationRunStatus::Blocked
2782                | crate::AutomationRunStatus::Failed
2783                | crate::AutomationRunStatus::Cancelled
2784        )
2785    }
2786
2787    fn optimization_consecutive_failure_count(
2788        experiments: &[OptimizationExperimentRecord],
2789    ) -> usize {
2790        let mut ordered = experiments.to_vec();
2791        ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
2792        ordered
2793            .iter()
2794            .rev()
2795            .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
2796            .count()
2797    }
2798
2799    fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
2800        match field {
2801            OptimizationMutableField::Objective => "objective",
2802            OptimizationMutableField::OutputContractSummaryGuidance => {
2803                "output_contract.summary_guidance"
2804            }
2805            OptimizationMutableField::TimeoutMs => "timeout_ms",
2806            OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
2807            OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
2808        }
2809    }
2810
2811    fn optimization_node_field_value(
2812        node: &crate::AutomationFlowNode,
2813        field: OptimizationMutableField,
2814    ) -> Result<Value, String> {
2815        match field {
2816            OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
2817            OptimizationMutableField::OutputContractSummaryGuidance => node
2818                .output_contract
2819                .as_ref()
2820                .and_then(|contract| contract.summary_guidance.clone())
2821                .map(Value::String)
2822                .ok_or_else(|| {
2823                    format!(
2824                        "node `{}` is missing output_contract.summary_guidance",
2825                        node.node_id
2826                    )
2827                }),
2828            OptimizationMutableField::TimeoutMs => node
2829                .timeout_ms
2830                .map(|value| json!(value))
2831                .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
2832            OptimizationMutableField::RetryPolicyMaxAttempts => node
2833                .retry_policy
2834                .as_ref()
2835                .and_then(Value::as_object)
2836                .and_then(|policy| policy.get("max_attempts"))
2837                .cloned()
2838                .ok_or_else(|| {
2839                    format!(
2840                        "node `{}` is missing retry_policy.max_attempts",
2841                        node.node_id
2842                    )
2843                }),
2844            OptimizationMutableField::RetryPolicyRetries => node
2845                .retry_policy
2846                .as_ref()
2847                .and_then(Value::as_object)
2848                .and_then(|policy| policy.get("retries"))
2849                .cloned()
2850                .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
2851        }
2852    }
2853
2854    fn set_optimization_node_field_value(
2855        node: &mut crate::AutomationFlowNode,
2856        field: OptimizationMutableField,
2857        value: &Value,
2858    ) -> Result<(), String> {
2859        match field {
2860            OptimizationMutableField::Objective => {
2861                node.objective = value
2862                    .as_str()
2863                    .ok_or_else(|| "objective apply value must be a string".to_string())?
2864                    .to_string();
2865            }
2866            OptimizationMutableField::OutputContractSummaryGuidance => {
2867                let guidance = value
2868                    .as_str()
2869                    .ok_or_else(|| {
2870                        "output_contract.summary_guidance apply value must be a string".to_string()
2871                    })?
2872                    .to_string();
2873                let contract = node.output_contract.as_mut().ok_or_else(|| {
2874                    format!(
2875                        "node `{}` is missing output_contract for apply",
2876                        node.node_id
2877                    )
2878                })?;
2879                contract.summary_guidance = Some(guidance);
2880            }
2881            OptimizationMutableField::TimeoutMs => {
2882                node.timeout_ms = Some(
2883                    value
2884                        .as_u64()
2885                        .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
2886                );
2887            }
2888            OptimizationMutableField::RetryPolicyMaxAttempts => {
2889                let next = value.as_i64().ok_or_else(|| {
2890                    "retry_policy.max_attempts apply value must be an integer".to_string()
2891                })?;
2892                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
2893                let object = policy.as_object_mut().ok_or_else(|| {
2894                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
2895                })?;
2896                object.insert("max_attempts".to_string(), json!(next));
2897            }
2898            OptimizationMutableField::RetryPolicyRetries => {
2899                let next = value.as_i64().ok_or_else(|| {
2900                    "retry_policy.retries apply value must be an integer".to_string()
2901                })?;
2902                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
2903                let object = policy.as_object_mut().ok_or_else(|| {
2904                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
2905                })?;
2906                object.insert("retries".to_string(), json!(next));
2907            }
2908        }
2909        Ok(())
2910    }
2911
2912    fn append_optimization_apply_metadata(
2913        metadata: Option<Value>,
2914        record: Value,
2915    ) -> Result<Option<Value>, String> {
2916        let mut root = match metadata {
2917            Some(Value::Object(map)) => map,
2918            Some(_) => return Err("automation metadata must be a JSON object".to_string()),
2919            None => serde_json::Map::new(),
2920        };
2921        let history = root
2922            .entry("optimization_apply_history".to_string())
2923            .or_insert_with(|| Value::Array(Vec::new()));
2924        let Some(entries) = history.as_array_mut() else {
2925            return Err("optimization_apply_history metadata must be an array".to_string());
2926        };
2927        entries.push(record.clone());
2928        root.insert("last_optimization_apply".to_string(), record);
2929        Ok(Some(Value::Object(root)))
2930    }
2931
2932    fn build_optimization_apply_patch(
2933        baseline: &crate::AutomationV2Spec,
2934        candidate: &crate::AutomationV2Spec,
2935        mutation: &crate::OptimizationValidatedMutation,
2936        approved_at_ms: u64,
2937    ) -> Result<Value, String> {
2938        let baseline_node = baseline
2939            .flow
2940            .nodes
2941            .iter()
2942            .find(|node| node.node_id == mutation.node_id)
2943            .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
2944        let candidate_node = candidate
2945            .flow
2946            .nodes
2947            .iter()
2948            .find(|node| node.node_id == mutation.node_id)
2949            .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
2950        let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
2951        let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
2952        Ok(json!({
2953            "node_id": mutation.node_id,
2954            "field": mutation.field,
2955            "field_path": Self::optimization_mutation_field_path(mutation.field),
2956            "expected_before": before,
2957            "apply_value": after,
2958            "approved_at_ms": approved_at_ms,
2959        }))
2960    }
2961
2962    pub async fn apply_optimization_winner(
2963        &self,
2964        optimization_id: &str,
2965        experiment_id: &str,
2966    ) -> Result<
2967        (
2968            OptimizationCampaignRecord,
2969            OptimizationExperimentRecord,
2970            crate::AutomationV2Spec,
2971        ),
2972        String,
2973    > {
2974        let campaign = self
2975            .get_optimization_campaign(optimization_id)
2976            .await
2977            .ok_or_else(|| "optimization not found".to_string())?;
2978        let mut experiment = self
2979            .get_optimization_experiment(optimization_id, experiment_id)
2980            .await
2981            .ok_or_else(|| "experiment not found".to_string())?;
2982        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
2983            return Err("only approved winner experiments may be applied".to_string());
2984        }
2985        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
2986            return Err(
2987                "only the latest approved winner may be applied to the live workflow".to_string(),
2988            );
2989        }
2990        let patch = experiment
2991            .metadata
2992            .as_ref()
2993            .and_then(|metadata| metadata.get("apply_patch"))
2994            .cloned()
2995            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
2996        let node_id = patch
2997            .get("node_id")
2998            .and_then(Value::as_str)
2999            .map(str::to_string)
3000            .filter(|value| !value.is_empty())
3001            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
3002        let field: OptimizationMutableField = serde_json::from_value(
3003            patch
3004                .get("field")
3005                .cloned()
3006                .ok_or_else(|| "apply_patch.field is required".to_string())?,
3007        )
3008        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
3009        let expected_before = patch
3010            .get("expected_before")
3011            .cloned()
3012            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
3013        let apply_value = patch
3014            .get("apply_value")
3015            .cloned()
3016            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
3017        let mut live = self
3018            .get_automation_v2(&campaign.source_workflow_id)
3019            .await
3020            .ok_or_else(|| "source workflow not found".to_string())?;
3021        let current_value = {
3022            let live_node = live
3023                .flow
3024                .nodes
3025                .iter()
3026                .find(|node| node.node_id == node_id)
3027                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
3028            Self::optimization_node_field_value(live_node, field)?
3029        };
3030        if current_value != expected_before {
3031            return Err(format!(
3032                "live workflow drift detected for node `{node_id}` {}",
3033                Self::optimization_mutation_field_path(field)
3034            ));
3035        }
3036        let live_node = live
3037            .flow
3038            .nodes
3039            .iter_mut()
3040            .find(|node| node.node_id == node_id)
3041            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
3042        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
3043        let applied_at_ms = now_ms();
3044        let apply_record = json!({
3045            "optimization_id": campaign.optimization_id,
3046            "experiment_id": experiment.experiment_id,
3047            "node_id": node_id,
3048            "field": field,
3049            "field_path": Self::optimization_mutation_field_path(field),
3050            "previous_value": expected_before,
3051            "new_value": apply_value,
3052            "applied_at_ms": applied_at_ms,
3053        });
3054        live.metadata =
3055            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
3056        let stored_live = self
3057            .put_automation_v2(live)
3058            .await
3059            .map_err(|error| error.to_string())?;
3060        let mut metadata = match experiment.metadata.take() {
3061            Some(Value::Object(map)) => map,
3062            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
3063            None => serde_json::Map::new(),
3064        };
3065        metadata.insert(
3066            "applied_to_live".to_string(),
3067            json!({
3068                "automation_id": stored_live.automation_id,
3069                "applied_at_ms": applied_at_ms,
3070                "field": field,
3071                "node_id": node_id,
3072            }),
3073        );
3074        experiment.metadata = Some(Value::Object(metadata));
3075        let stored_experiment = self
3076            .put_optimization_experiment(experiment)
3077            .await
3078            .map_err(|error| error.to_string())?;
3079        Ok((campaign, stored_experiment, stored_live))
3080    }
3081
3082    fn optimization_objective_hint(text: &str) -> String {
3083        let cleaned = text
3084            .lines()
3085            .map(str::trim)
3086            .filter(|line| !line.is_empty() && !line.starts_with('#'))
3087            .collect::<Vec<_>>()
3088            .join(" ");
3089        let hint = if cleaned.is_empty() {
3090            "Prioritize validator-complete output with explicit evidence."
3091        } else {
3092            cleaned.as_str()
3093        };
3094        let trimmed = hint.trim();
3095        let clipped = if trimmed.len() > 140 {
3096            trimmed[..140].trim_end()
3097        } else {
3098            trimmed
3099        };
3100        let mut sentence = clipped.trim_end_matches('.').to_string();
3101        if sentence.is_empty() {
3102            sentence = "Prioritize validator-complete output with explicit evidence".to_string();
3103        }
3104        sentence.push('.');
3105        sentence
3106    }
3107
3108    fn build_phase1_candidate_options(
3109        baseline: &crate::AutomationV2Spec,
3110        phase1: &crate::OptimizationPhase1Config,
3111    ) -> Vec<(
3112        crate::AutomationV2Spec,
3113        crate::OptimizationValidatedMutation,
3114    )> {
3115        let mut options = Vec::new();
3116        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
3117        for (index, node) in baseline.flow.nodes.iter().enumerate() {
3118            if phase1
3119                .mutation_policy
3120                .allowed_text_fields
3121                .contains(&OptimizationMutableField::Objective)
3122            {
3123                let addition = if node.objective.contains(&hint) {
3124                    "Prioritize validator-complete output with concrete evidence."
3125                } else {
3126                    &hint
3127                };
3128                let mut candidate = baseline.clone();
3129                candidate.flow.nodes[index].objective =
3130                    format!("{} {}", node.objective.trim(), addition.trim())
3131                        .trim()
3132                        .to_string();
3133                if let Ok(validated) =
3134                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3135                {
3136                    options.push((candidate, validated));
3137                }
3138            }
3139            if phase1
3140                .mutation_policy
3141                .allowed_text_fields
3142                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
3143            {
3144                if let Some(summary_guidance) = node
3145                    .output_contract
3146                    .as_ref()
3147                    .and_then(|contract| contract.summary_guidance.as_ref())
3148                {
3149                    let addition = if summary_guidance.contains("Cite concrete evidence") {
3150                        "Keep evidence explicit."
3151                    } else {
3152                        "Cite concrete evidence in the summary."
3153                    };
3154                    let mut candidate = baseline.clone();
3155                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
3156                        contract.summary_guidance = Some(
3157                            format!("{} {}", summary_guidance.trim(), addition)
3158                                .trim()
3159                                .to_string(),
3160                        );
3161                    }
3162                    if let Ok(validated) =
3163                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3164                    {
3165                        options.push((candidate, validated));
3166                    }
3167                }
3168            }
3169            if phase1
3170                .mutation_policy
3171                .allowed_knob_fields
3172                .contains(&OptimizationMutableField::TimeoutMs)
3173            {
3174                if let Some(timeout_ms) = node.timeout_ms {
3175                    let delta_by_percent = ((timeout_ms as f64)
3176                        * phase1.mutation_policy.timeout_delta_percent)
3177                        .round() as u64;
3178                    let delta = delta_by_percent
3179                        .min(phase1.mutation_policy.timeout_delta_ms)
3180                        .max(1);
3181                    let next = timeout_ms
3182                        .saturating_add(delta)
3183                        .min(phase1.mutation_policy.timeout_max_ms);
3184                    if next != timeout_ms {
3185                        let mut candidate = baseline.clone();
3186                        candidate.flow.nodes[index].timeout_ms = Some(next);
3187                        if let Ok(validated) =
3188                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3189                        {
3190                            options.push((candidate, validated));
3191                        }
3192                    }
3193                }
3194            }
3195            if phase1
3196                .mutation_policy
3197                .allowed_knob_fields
3198                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
3199            {
3200                let current = node
3201                    .retry_policy
3202                    .as_ref()
3203                    .and_then(Value::as_object)
3204                    .and_then(|row| row.get("max_attempts"))
3205                    .and_then(Value::as_i64);
3206                if let Some(before) = current {
3207                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
3208                    if next != before {
3209                        let mut candidate = baseline.clone();
3210                        let policy = candidate.flow.nodes[index]
3211                            .retry_policy
3212                            .get_or_insert_with(|| json!({}));
3213                        if let Some(object) = policy.as_object_mut() {
3214                            object.insert("max_attempts".to_string(), json!(next));
3215                        }
3216                        if let Ok(validated) =
3217                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3218                        {
3219                            options.push((candidate, validated));
3220                        }
3221                    }
3222                }
3223            }
3224            if phase1
3225                .mutation_policy
3226                .allowed_knob_fields
3227                .contains(&OptimizationMutableField::RetryPolicyRetries)
3228            {
3229                let current = node
3230                    .retry_policy
3231                    .as_ref()
3232                    .and_then(Value::as_object)
3233                    .and_then(|row| row.get("retries"))
3234                    .and_then(Value::as_i64);
3235                if let Some(before) = current {
3236                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
3237                    if next != before {
3238                        let mut candidate = baseline.clone();
3239                        let policy = candidate.flow.nodes[index]
3240                            .retry_policy
3241                            .get_or_insert_with(|| json!({}));
3242                        if let Some(object) = policy.as_object_mut() {
3243                            object.insert("retries".to_string(), json!(next));
3244                        }
3245                        if let Ok(validated) =
3246                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3247                        {
3248                            options.push((candidate, validated));
3249                        }
3250                    }
3251                }
3252            }
3253        }
3254        options
3255    }
3256
3257    async fn maybe_queue_phase1_candidate_experiment(
3258        &self,
3259        campaign: &mut OptimizationCampaignRecord,
3260    ) -> Result<bool, String> {
3261        let Some(phase1) = campaign.phase1.as_ref() else {
3262            return Ok(false);
3263        };
3264        let experiment_count = self
3265            .count_optimization_experiments(&campaign.optimization_id)
3266            .await;
3267        if experiment_count >= phase1.budget.max_experiments as usize {
3268            campaign.status = OptimizationCampaignStatus::Completed;
3269            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
3270            campaign.updated_at_ms = now_ms();
3271            return Ok(true);
3272        }
3273        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
3274        {
3275            return Ok(false);
3276        }
3277        let existing = self
3278            .list_optimization_experiments(&campaign.optimization_id)
3279            .await;
3280        let active_eval_exists = existing.iter().any(|experiment| {
3281            matches!(experiment.status, OptimizationExperimentStatus::Draft)
3282                && experiment
3283                    .metadata
3284                    .as_ref()
3285                    .and_then(|metadata| metadata.get("eval_run_id"))
3286                    .and_then(Value::as_str)
3287                    .is_some()
3288        });
3289        if active_eval_exists {
3290            return Ok(false);
3291        }
3292        let existing_hashes = existing
3293            .iter()
3294            .map(|experiment| experiment.candidate_snapshot_hash.clone())
3295            .collect::<std::collections::HashSet<_>>();
3296        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
3297        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
3298            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
3299        }) else {
3300            campaign.status = OptimizationCampaignStatus::Completed;
3301            campaign.last_pause_reason = Some(
3302                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
3303            );
3304            campaign.updated_at_ms = now_ms();
3305            return Ok(true);
3306        };
3307        let eval_run = self
3308            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
3309            .await
3310            .map_err(|error| error.to_string())?;
3311        let now = now_ms();
3312        let experiment = OptimizationExperimentRecord {
3313            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
3314            optimization_id: campaign.optimization_id.clone(),
3315            status: OptimizationExperimentStatus::Draft,
3316            candidate_snapshot: candidate_snapshot.clone(),
3317            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
3318            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
3319            mutation_summary: Some(mutation.summary.clone()),
3320            metrics: None,
3321            phase1_metrics: None,
3322            promotion_recommendation: None,
3323            promotion_decision: None,
3324            created_at_ms: now,
3325            updated_at_ms: now,
3326            metadata: Some(json!({
3327                "generator": "phase1_deterministic_v1",
3328                "eval_run_id": eval_run.run_id,
3329                "mutation": mutation,
3330            })),
3331        };
3332        self.put_optimization_experiment(experiment)
3333            .await
3334            .map_err(|error| error.to_string())?;
3335        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
3336        campaign.updated_at_ms = now_ms();
3337        Ok(true)
3338    }
3339
3340    async fn reconcile_phase1_candidate_experiments(
3341        &self,
3342        campaign: &mut OptimizationCampaignRecord,
3343    ) -> Result<bool, String> {
3344        let Some(phase1) = campaign.phase1.as_ref() else {
3345            return Ok(false);
3346        };
3347        let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
3348            return Ok(false);
3349        };
3350        let experiments = self
3351            .list_optimization_experiments(&campaign.optimization_id)
3352            .await;
3353        let mut changed = false;
3354        for mut experiment in experiments {
3355            if experiment.status != OptimizationExperimentStatus::Draft {
3356                continue;
3357            }
3358            let Some(eval_run_id) = experiment
3359                .metadata
3360                .as_ref()
3361                .and_then(|metadata| metadata.get("eval_run_id"))
3362                .and_then(Value::as_str)
3363                .map(str::to_string)
3364            else {
3365                continue;
3366            };
3367            let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
3368                continue;
3369            };
3370            if !Self::automation_run_is_terminal(&run.status) {
3371                continue;
3372            }
3373            if run.status != crate::AutomationRunStatus::Completed {
3374                experiment.status = OptimizationExperimentStatus::Failed;
3375                let mut metadata = match experiment.metadata.take() {
3376                    Some(Value::Object(map)) => map,
3377                    Some(_) => serde_json::Map::new(),
3378                    None => serde_json::Map::new(),
3379                };
3380                metadata.insert(
3381                    "eval_failure".to_string(),
3382                    json!({
3383                        "run_id": run.run_id,
3384                        "status": run.status,
3385                    }),
3386                );
3387                experiment.metadata = Some(Value::Object(metadata));
3388                self.put_optimization_experiment(experiment)
3389                    .await
3390                    .map_err(|error| error.to_string())?;
3391                changed = true;
3392                continue;
3393            }
3394            if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
3395                experiment.status = OptimizationExperimentStatus::Failed;
3396                let mut metadata = match experiment.metadata.take() {
3397                    Some(Value::Object(map)) => map,
3398                    Some(_) => serde_json::Map::new(),
3399                    None => serde_json::Map::new(),
3400                };
3401                metadata.insert(
3402                    "eval_failure".to_string(),
3403                    json!({
3404                        "run_id": run.run_id,
3405                        "status": run.status,
3406                        "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
3407                    }),
3408                );
3409                experiment.metadata = Some(Value::Object(metadata));
3410                self.put_optimization_experiment(experiment)
3411                    .await
3412                    .map_err(|error| error.to_string())?;
3413                changed = true;
3414                continue;
3415            }
3416            let metrics =
3417                match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
3418                    Ok(metrics) => metrics,
3419                    Err(error) => {
3420                        experiment.status = OptimizationExperimentStatus::Failed;
3421                        let mut metadata = match experiment.metadata.take() {
3422                            Some(Value::Object(map)) => map,
3423                            Some(_) => serde_json::Map::new(),
3424                            None => serde_json::Map::new(),
3425                        };
3426                        metadata.insert(
3427                            "eval_failure".to_string(),
3428                            json!({
3429                                "run_id": run.run_id,
3430                                "status": run.status,
3431                                "reason": error,
3432                            }),
3433                        );
3434                        experiment.metadata = Some(Value::Object(metadata));
3435                        self.put_optimization_experiment(experiment)
3436                            .await
3437                            .map_err(|error| error.to_string())?;
3438                        changed = true;
3439                        continue;
3440                    }
3441                };
3442            let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
3443            experiment.phase1_metrics = Some(metrics.clone());
3444            experiment.metrics = Some(json!({
3445                "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
3446                "unmet_requirement_count": metrics.unmet_requirement_count,
3447                "blocked_node_rate": metrics.blocked_node_rate,
3448                "budget_within_limits": metrics.budget_within_limits,
3449            }));
3450            experiment.promotion_recommendation = Some(
3451                match decision.decision {
3452                    OptimizationPromotionDecisionKind::Promote => "promote",
3453                    OptimizationPromotionDecisionKind::Discard => "discard",
3454                    OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3455                        "needs_operator_review"
3456                    }
3457                }
3458                .to_string(),
3459            );
3460            experiment.promotion_decision = Some(decision.clone());
3461            match decision.decision {
3462                OptimizationPromotionDecisionKind::Promote
3463                | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3464                    experiment.status = OptimizationExperimentStatus::PromotionRecommended;
3465                    campaign.pending_promotion_experiment_id =
3466                        Some(experiment.experiment_id.clone());
3467                    campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
3468                    campaign.last_pause_reason = Some(decision.reason.clone());
3469                }
3470                OptimizationPromotionDecisionKind::Discard => {
3471                    experiment.status = OptimizationExperimentStatus::Discarded;
3472                    if campaign.status == OptimizationCampaignStatus::Running {
3473                        campaign.last_pause_reason = Some(decision.reason.clone());
3474                    }
3475                }
3476            }
3477            self.put_optimization_experiment(experiment)
3478                .await
3479                .map_err(|error| error.to_string())?;
3480            changed = true;
3481        }
3482        let refreshed = self
3483            .list_optimization_experiments(&campaign.optimization_id)
3484            .await;
3485        let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
3486        if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
3487            && phase1.budget.max_consecutive_failures > 0
3488        {
3489            campaign.status = OptimizationCampaignStatus::Failed;
3490            campaign.last_pause_reason = Some(format!(
3491                "phase 1 candidate evaluations reached {} consecutive failures",
3492                consecutive_failures
3493            ));
3494            changed = true;
3495        }
3496        Ok(changed)
3497    }
3498
3499    async fn reconcile_pending_baseline_replays(
3500        &self,
3501        campaign: &mut OptimizationCampaignRecord,
3502    ) -> Result<bool, String> {
3503        let Some(phase1) = campaign.phase1.as_ref() else {
3504            return Ok(false);
3505        };
3506        let mut changed = false;
3507        let mut remaining = Vec::new();
3508        for run_id in campaign.pending_baseline_run_ids.clone() {
3509            let Some(run) = self.get_automation_v2_run(&run_id).await else {
3510                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3511                campaign.last_pause_reason = Some(format!(
3512                    "baseline replay run `{run_id}` was not found during optimization reconciliation"
3513                ));
3514                changed = true;
3515                continue;
3516            };
3517            if !Self::automation_run_is_terminal(&run.status) {
3518                remaining.push(run_id);
3519                continue;
3520            }
3521            if run.status != crate::AutomationRunStatus::Completed {
3522                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3523                campaign.last_pause_reason = Some(format!(
3524                    "baseline replay run `{}` finished with status `{:?}`",
3525                    run.run_id, run.status
3526                ));
3527                changed = true;
3528                continue;
3529            }
3530            if run.automation_id != campaign.source_workflow_id {
3531                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3532                campaign.last_pause_reason = Some(
3533                    "baseline replay run must belong to the optimization source workflow"
3534                        .to_string(),
3535                );
3536                changed = true;
3537                continue;
3538            }
3539            let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
3540                "baseline replay run must include an automation snapshot".to_string()
3541            })?;
3542            if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
3543                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3544                campaign.last_pause_reason = Some(
3545                    "baseline replay run does not match the current campaign baseline snapshot"
3546                        .to_string(),
3547                );
3548                changed = true;
3549                continue;
3550            }
3551            let metrics =
3552                derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
3553            let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
3554            campaign
3555                .baseline_replays
3556                .push(OptimizationBaselineReplayRecord {
3557                    replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
3558                    automation_run_id: Some(run.run_id.clone()),
3559                    phase1_metrics: metrics,
3560                    validator_case_outcomes,
3561                    experiment_count_at_recording: self
3562                        .count_optimization_experiments(&campaign.optimization_id)
3563                        .await as u64,
3564                    recorded_at_ms: now_ms(),
3565                });
3566            changed = true;
3567        }
3568        if remaining != campaign.pending_baseline_run_ids {
3569            campaign.pending_baseline_run_ids = remaining;
3570            changed = true;
3571        }
3572        Ok(changed)
3573    }
3574
    /// Runs one reconciliation pass over every optimization campaign that has a
    /// phase 1 config.
    ///
    /// For each campaign it:
    /// 1. folds finished baseline replays and finished candidate evaluations
    ///    into the record;
    /// 2. either queues a due baseline replay, or (re)establishes baseline
    ///    metrics once enough replays exist, moving the campaign to `Running`
    ///    or `PausedEvaluatorUnstable`;
    /// 3. queues the next candidate experiment while `Running` with no pending
    ///    baseline replays.
    ///
    /// A campaign is persisted only when some step reported a change.
    ///
    /// Returns the number of campaigns that were persisted.
    ///
    /// # Errors
    /// Any error from a helper or from `put_optimization_campaign` is propagated
    /// with `?`, which ends the pass and leaves the remaining campaigns
    /// unreconciled.
    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
        let campaigns = self.list_optimization_campaigns().await;
        let mut updated = 0usize;
        for campaign in campaigns {
            // Re-read each campaign instead of using the listed copy;
            // presumably to pick up writes made after the listing.
            let Some(mut latest) = self
                .get_optimization_campaign(&campaign.optimization_id)
                .await
            else {
                continue;
            };
            // Cloned because `latest` is mutably borrowed by the helpers below.
            let Some(phase1) = latest.phase1.clone() else {
                continue;
            };
            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
            changed |= self
                .reconcile_phase1_candidate_experiments(&mut latest)
                .await?;
            let experiment_count = self
                .count_optimization_experiments(&latest.optimization_id)
                .await;
            if latest.pending_baseline_run_ids.is_empty() {
                // NOTE: inside this branch the pending-run count passed below is
                // always 0.
                if phase1_baseline_replay_due(
                    &latest.baseline_replays,
                    latest.pending_baseline_run_ids.len(),
                    &phase1,
                    experiment_count,
                    now_ms(),
                ) {
                    // NOTE(review): this resets the status to `Draft` whatever the
                    // previous status was (e.g. `AwaitingPromotionApproval`,
                    // `Completed`, `Failed`). Confirm that is intended.
                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
                        latest.status = OptimizationCampaignStatus::Draft;
                        changed = true;
                    }
                } else if latest.baseline_replays.len()
                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
                {
                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
                        Ok(metrics) => {
                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
                                latest.baseline_metrics = Some(metrics);
                                changed = true;
                            }
                            // A stable baseline (re)starts draft/paused campaigns
                            // and clears any pause reason left on a running one.
                            // NOTE(review): this also clears the "waiting for phase
                            // 1 candidate evaluation" reason set when a candidate
                            // is queued, so `changed` may be reported on each pass
                            // while an evaluation is in flight.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) || (latest.status == OptimizationCampaignStatus::Running
                                && latest.last_pause_reason.is_some())
                            {
                                latest.status = OptimizationCampaignStatus::Running;
                                latest.last_pause_reason = None;
                                changed = true;
                            }
                        }
                        Err(error) => {
                            // Pause only active campaigns, and only flag a change
                            // when status or reason actually differs.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::Running
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) && (latest.status
                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
                            {
                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                latest.last_pause_reason = Some(error);
                                changed = true;
                            }
                        }
                    }
                }
            } else if latest.last_pause_reason.as_deref()
                != Some("waiting for phase 1 baseline replay completion")
            {
                // Replays still pending: surface the waiting reason once.
                latest.last_pause_reason =
                    Some("waiting for phase 1 baseline replay completion".to_string());
                changed = true;
            }
            if latest.status == OptimizationCampaignStatus::Running
                && latest.pending_baseline_run_ids.is_empty()
            {
                changed |= self
                    .maybe_queue_phase1_candidate_experiment(&mut latest)
                    .await?;
            }
            if changed {
                self.put_optimization_campaign(latest)
                    .await
                    .map_err(|error| error.to_string())?;
                updated = updated.saturating_add(1);
            }
        }
        Ok(updated)
    }
3668
3669    async fn maybe_queue_phase1_baseline_replay(
3670        &self,
3671        campaign: &mut OptimizationCampaignRecord,
3672    ) -> Result<bool, String> {
3673        let Some(phase1) = campaign.phase1.as_ref() else {
3674            return Ok(false);
3675        };
3676        if !campaign.pending_baseline_run_ids.is_empty() {
3677            campaign.last_pause_reason =
3678                Some("waiting for phase 1 baseline replay completion".into());
3679            campaign.updated_at_ms = now_ms();
3680            return Ok(true);
3681        }
3682        let experiment_count = self
3683            .count_optimization_experiments(&campaign.optimization_id)
3684            .await;
3685        if !phase1_baseline_replay_due(
3686            &campaign.baseline_replays,
3687            campaign.pending_baseline_run_ids.len(),
3688            phase1,
3689            experiment_count,
3690            now_ms(),
3691        ) {
3692            return Ok(false);
3693        }
3694        let replay_run = self
3695            .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
3696            .await
3697            .map_err(|error| error.to_string())?;
3698        if !campaign
3699            .pending_baseline_run_ids
3700            .iter()
3701            .any(|value| value == &replay_run.run_id)
3702        {
3703            campaign
3704                .pending_baseline_run_ids
3705                .push(replay_run.run_id.clone());
3706        }
3707        campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
3708        campaign.updated_at_ms = now_ms();
3709        Ok(true)
3710    }
3711
3712    async fn maybe_queue_initial_phase1_baseline_replay(
3713        &self,
3714        campaign: &mut OptimizationCampaignRecord,
3715    ) -> Result<bool, String> {
3716        let Some(phase1) = campaign.phase1.as_ref() else {
3717            return Ok(false);
3718        };
3719        let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
3720        if campaign.baseline_replays.len() >= required_runs {
3721            return Ok(false);
3722        }
3723        self.maybe_queue_phase1_baseline_replay(campaign).await
3724    }
3725
    /// Applies an operator action to an optimization campaign and persists the result.
    ///
    /// `action` is trimmed and lower-cased before dispatch. Supported actions are
    /// `start`, `pause`, `resume`, `queue_baseline_replay`, `record_baseline_replay`,
    /// `approve_winner`, and `reject_winner`.
    ///
    /// * `experiment_id`: required (non-blank) for `approve_winner` / `reject_winner`.
    /// * `run_id`: required (non-blank) for `record_baseline_replay`.
    /// * `reason`: optional and trimmed. `pause` and `reject_winner` store it as
    ///   `last_pause_reason`; a blank value becomes `None`.
    ///
    /// On success, returns whatever `put_optimization_campaign` yields for the
    /// updated campaign.
    ///
    /// # Errors
    ///
    /// Returns `Err(String)` in these cases:
    /// * the campaign is missing or the action is unsupported;
    /// * a required argument is blank or missing;
    /// * run or experiment validation fails;
    /// * a store or run-creation call fails.
    ///
    /// The campaign is only written at the very end, so every error path leaves it
    /// unsaved. Note that `approve_winner` with a non-`Promote` decision still
    /// persists the *experiment* before returning the decision reason.
    pub async fn apply_optimization_action(
        &self,
        optimization_id: &str,
        action: &str,
        experiment_id: Option<&str>,
        run_id: Option<&str>,
        reason: Option<&str>,
    ) -> Result<OptimizationCampaignRecord, String> {
        let normalized = action.trim().to_ascii_lowercase();
        let mut campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        match normalized.as_str() {
            "start" => {
                if campaign.phase1.is_some() {
                    // Phase 1 campaigns stay in Draft while an initial baseline replay
                    // is pending or has just been queued.
                    if self
                        .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                        .await?
                    {
                        campaign.status = OptimizationCampaignStatus::Draft;
                    } else {
                        // No replay queued: derive baseline metrics from recorded replays.
                        // If that fails, pause as evaluator-unstable with the error as reason.
                        let phase1 = campaign
                            .phase1
                            .as_ref()
                            .ok_or_else(|| "phase 1 config is required".to_string())?;
                        match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
                            Ok(metrics) => {
                                campaign.baseline_metrics = Some(metrics);
                                campaign.status = OptimizationCampaignStatus::Running;
                                campaign.last_pause_reason = None;
                            }
                            Err(error) => {
                                campaign.status =
                                    OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                campaign.last_pause_reason = Some(error);
                            }
                        }
                    }
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "pause" => {
                campaign.status = OptimizationCampaignStatus::PausedManual;
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
            }
            "resume" => {
                // NOTE(review): unlike "start", this path does not (re)establish
                // baseline_metrics before switching to Running. Confirm this is intended
                // for phase 1 campaigns resumed from PausedEvaluatorUnstable.
                if self
                    .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                    .await?
                {
                    campaign.status = OptimizationCampaignStatus::Draft;
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "queue_baseline_replay" => {
                // Queues a replay of the current baseline snapshot. The campaign status
                // is left unchanged.
                let replay_run = self
                    .create_automation_v2_run(
                        &campaign.baseline_snapshot,
                        "optimization_baseline_replay",
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                if !campaign
                    .pending_baseline_run_ids
                    .iter()
                    .any(|value| value == &replay_run.run_id)
                {
                    campaign
                        .pending_baseline_run_ids
                        .push(replay_run.run_id.clone());
                }
                campaign.updated_at_ms = now_ms();
            }
            "record_baseline_replay" => {
                let run_id = run_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
                let phase1 = campaign
                    .phase1
                    .as_ref()
                    .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
                let run = self
                    .get_automation_v2_run(run_id)
                    .await
                    .ok_or_else(|| "automation run not found".to_string())?;
                // Only accept replays of this campaign's source workflow whose snapshot
                // hash matches the current baseline. Stale replays are rejected.
                if run.automation_id != campaign.source_workflow_id {
                    return Err(
                        "baseline replay run must belong to the optimization source workflow"
                            .to_string(),
                    );
                }
                let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                    "baseline replay run must include an automation snapshot".to_string()
                })?;
                if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                    return Err(
                        "baseline replay run does not match the current campaign baseline snapshot"
                            .to_string(),
                    );
                }
                let metrics =
                    derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
                let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
                campaign
                    .baseline_replays
                    .push(OptimizationBaselineReplayRecord {
                        replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                        automation_run_id: Some(run.run_id.clone()),
                        phase1_metrics: metrics,
                        validator_case_outcomes,
                        experiment_count_at_recording: self
                            .count_optimization_experiments(&campaign.optimization_id)
                            .await as u64,
                        recorded_at_ms: now_ms(),
                    });
                // The recorded run is no longer pending.
                campaign
                    .pending_baseline_run_ids
                    .retain(|value| value != run_id);
                campaign.updated_at_ms = now_ms();
            }
            "approve_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                // The experiment must have been generated against the current baseline.
                if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                    return Err(
                        "experiment baseline_snapshot_hash does not match current campaign baseline"
                            .to_string(),
                    );
                }
                if let Some(phase1) = campaign.phase1.as_ref() {
                    // Re-validate the mutation and attach the apply patch to the
                    // experiment metadata, which must be a JSON object or absent.
                    let validated = validate_phase1_candidate_mutation(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        phase1,
                    )?;
                    if experiment.mutation_summary.is_none() {
                        experiment.mutation_summary = Some(validated.summary.clone());
                    }
                    let approved_at_ms = now_ms();
                    let apply_patch = Self::build_optimization_apply_patch(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        &validated,
                        approved_at_ms,
                    )?;
                    let mut metadata = match experiment.metadata.take() {
                        Some(Value::Object(map)) => map,
                        Some(_) => {
                            return Err("experiment metadata must be a JSON object".to_string());
                        }
                        None => serde_json::Map::new(),
                    };
                    metadata.insert("apply_patch".to_string(), apply_patch);
                    experiment.metadata = Some(Value::Object(metadata));
                    // Promotion gating only runs when baseline metrics exist. Otherwise
                    // the candidate is adopted without a promotion decision.
                    if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
                        // Prefer typed phase 1 metrics, falling back to parsing the
                        // generic metrics value.
                        let candidate_metrics = experiment
                            .phase1_metrics
                            .clone()
                            .or_else(|| {
                                experiment
                                    .metrics
                                    .as_ref()
                                    .and_then(|metrics| parse_phase1_metrics(metrics).ok())
                            })
                            .ok_or_else(|| {
                                "phase 1 candidate is missing promotion metrics".to_string()
                            })?;
                        let decision =
                            evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
                        experiment.promotion_recommendation = Some(
                            match decision.decision {
                                OptimizationPromotionDecisionKind::Promote => "promote",
                                OptimizationPromotionDecisionKind::Discard => "discard",
                                OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                                    "needs_operator_review"
                                }
                            }
                            .to_string(),
                        );
                        experiment.promotion_decision = Some(decision.clone());
                        // Non-promote decisions are saved on the experiment. The campaign
                        // is NOT saved, and the decision reason is returned as the error.
                        if decision.decision != OptimizationPromotionDecisionKind::Promote {
                            let _ = self
                                .put_optimization_experiment(experiment)
                                .await
                                .map_err(|e| e.to_string())?;
                            return Err(decision.reason);
                        }
                        campaign.baseline_metrics = Some(candidate_metrics);
                    }
                }
                // Adopt the candidate as the new baseline. Earlier replays no longer
                // apply, so they are cleared and the campaign returns to Draft.
                campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
                campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
                campaign.baseline_replays.clear();
                campaign.pending_baseline_run_ids.clear();
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = None;
                experiment.status = OptimizationExperimentStatus::PromotionApproved;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            "reject_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
                experiment.status = OptimizationExperimentStatus::PromotionRejected;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            _ => return Err("unsupported optimization action".to_string()),
        }
        self.put_optimization_campaign(campaign)
            .await
            .map_err(|e| e.to_string())
    }
3971
3972    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3973        let mut rows = self
3974            .automations_v2
3975            .read()
3976            .await
3977            .values()
3978            .cloned()
3979            .collect::<Vec<_>>();
3980        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3981        rows
3982    }
3983
3984    pub async fn delete_automation_v2(
3985        &self,
3986        automation_id: &str,
3987    ) -> anyhow::Result<Option<AutomationV2Spec>> {
3988        let removed = self.automations_v2.write().await.remove(automation_id);
3989        let removed_run_count = {
3990            let mut runs = self.automation_v2_runs.write().await;
3991            let before = runs.len();
3992            runs.retain(|_, run| run.automation_id != automation_id);
3993            before.saturating_sub(runs.len())
3994        };
3995        self.persist_automations_v2().await?;
3996        if removed_run_count > 0 {
3997            self.persist_automation_v2_runs().await?;
3998        }
3999        self.verify_automation_v2_persisted(automation_id, false)
4000            .await?;
4001        Ok(removed)
4002    }
4003
4004    pub async fn create_automation_v2_run(
4005        &self,
4006        automation: &AutomationV2Spec,
4007        trigger_type: &str,
4008    ) -> anyhow::Result<AutomationV2RunRecord> {
4009        let now = now_ms();
4010        let pending_nodes = automation
4011            .flow
4012            .nodes
4013            .iter()
4014            .map(|n| n.node_id.clone())
4015            .collect::<Vec<_>>();
4016        let run = AutomationV2RunRecord {
4017            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
4018            automation_id: automation.automation_id.clone(),
4019            trigger_type: trigger_type.to_string(),
4020            status: AutomationRunStatus::Queued,
4021            created_at_ms: now,
4022            updated_at_ms: now,
4023            started_at_ms: None,
4024            finished_at_ms: None,
4025            active_session_ids: Vec::new(),
4026            latest_session_id: None,
4027            active_instance_ids: Vec::new(),
4028            checkpoint: AutomationRunCheckpoint {
4029                completed_nodes: Vec::new(),
4030                pending_nodes,
4031                node_outputs: std::collections::HashMap::new(),
4032                node_attempts: std::collections::HashMap::new(),
4033                blocked_nodes: Vec::new(),
4034                awaiting_gate: None,
4035                gate_history: Vec::new(),
4036                lifecycle_history: Vec::new(),
4037                last_failure: None,
4038            },
4039            runtime_context: automation
4040                .runtime_context_materialization()
4041                .or_else(|| automation.approved_plan_runtime_context_materialization()),
4042            automation_snapshot: Some(automation.clone()),
4043            pause_reason: None,
4044            resume_reason: None,
4045            detail: None,
4046            stop_kind: None,
4047            stop_reason: None,
4048            prompt_tokens: 0,
4049            completion_tokens: 0,
4050            total_tokens: 0,
4051            estimated_cost_usd: 0.0,
4052            scheduler: None,
4053        };
4054        self.automation_v2_runs
4055            .write()
4056            .await
4057            .insert(run.run_id.clone(), run.clone());
4058        self.persist_automation_v2_runs().await?;
4059        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
4060            .await
4061            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
4062        Ok(run)
4063    }
4064
4065    pub async fn create_automation_v2_dry_run(
4066        &self,
4067        automation: &AutomationV2Spec,
4068        trigger_type: &str,
4069    ) -> anyhow::Result<AutomationV2RunRecord> {
4070        let now = now_ms();
4071        let run = AutomationV2RunRecord {
4072            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
4073            automation_id: automation.automation_id.clone(),
4074            trigger_type: format!("{trigger_type}_dry_run"),
4075            status: AutomationRunStatus::Completed,
4076            created_at_ms: now,
4077            updated_at_ms: now,
4078            started_at_ms: Some(now),
4079            finished_at_ms: Some(now),
4080            active_session_ids: Vec::new(),
4081            latest_session_id: None,
4082            active_instance_ids: Vec::new(),
4083            checkpoint: AutomationRunCheckpoint {
4084                completed_nodes: Vec::new(),
4085                pending_nodes: Vec::new(),
4086                node_outputs: std::collections::HashMap::new(),
4087                node_attempts: std::collections::HashMap::new(),
4088                blocked_nodes: Vec::new(),
4089                awaiting_gate: None,
4090                gate_history: Vec::new(),
4091                lifecycle_history: Vec::new(),
4092                last_failure: None,
4093            },
4094            runtime_context: automation
4095                .runtime_context_materialization()
4096                .or_else(|| automation.approved_plan_runtime_context_materialization()),
4097            automation_snapshot: Some(automation.clone()),
4098            pause_reason: None,
4099            resume_reason: None,
4100            detail: Some("dry_run".to_string()),
4101            stop_kind: None,
4102            stop_reason: None,
4103            prompt_tokens: 0,
4104            completion_tokens: 0,
4105            total_tokens: 0,
4106            estimated_cost_usd: 0.0,
4107            scheduler: None,
4108        };
4109        self.automation_v2_runs
4110            .write()
4111            .await
4112            .insert(run.run_id.clone(), run.clone());
4113        self.persist_automation_v2_runs().await?;
4114        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
4115            .await
4116            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
4117        Ok(run)
4118    }
4119
4120    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
4121        self.automation_v2_runs.read().await.get(run_id).cloned()
4122    }
4123
4124    pub async fn list_automation_v2_runs(
4125        &self,
4126        automation_id: Option<&str>,
4127        limit: usize,
4128    ) -> Vec<AutomationV2RunRecord> {
4129        let mut rows = self
4130            .automation_v2_runs
4131            .read()
4132            .await
4133            .values()
4134            .filter(|row| {
4135                if let Some(id) = automation_id {
4136                    row.automation_id == id
4137                } else {
4138                    true
4139                }
4140            })
4141            .cloned()
4142            .collect::<Vec<_>>();
4143        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
4144        rows.truncate(limit.clamp(1, 500));
4145        rows
4146    }
4147
4148    async fn automation_v2_run_workspace_root(
4149        &self,
4150        run: &AutomationV2RunRecord,
4151    ) -> Option<String> {
4152        if let Some(root) = run
4153            .automation_snapshot
4154            .as_ref()
4155            .and_then(|automation| automation.workspace_root.as_ref())
4156            .map(|value| value.trim())
4157            .filter(|value| !value.is_empty())
4158        {
4159            return Some(root.to_string());
4160        }
4161        self.get_automation_v2(&run.automation_id)
4162            .await
4163            .and_then(|automation| automation.workspace_root)
4164            .map(|value| value.trim().to_string())
4165            .filter(|value| !value.is_empty())
4166    }
4167
    /// Reconciles the automation scheduler's bookkeeping with a run status change.
    ///
    /// Checks whether `previous_status` and the run's current status use scheduler
    /// capacity (`automation_status_uses_scheduler_capacity`) and/or hold a workspace
    /// lock (`automation_status_holds_workspace_lock`). It then issues the matching
    /// release / admit / reserve calls for `run.run_id`.
    async fn sync_automation_scheduler_for_run_transition(
        &self,
        previous_status: AutomationRunStatus,
        run: &AutomationV2RunRecord,
    ) {
        let had_capacity = automation_status_uses_scheduler_capacity(&previous_status);
        let has_capacity = automation_status_uses_scheduler_capacity(&run.status);
        let had_lock = automation_status_holds_workspace_lock(&previous_status);
        let has_lock = automation_status_holds_workspace_lock(&run.status);
        // Resolved before the scheduler write lock is taken, so that lock is not held
        // across this await.
        let workspace_root = self.automation_v2_run_workspace_root(run).await;
        let mut scheduler = self.automation_scheduler.write().await;

        // The run held capacity or the lock before and now holds neither: release everything.
        if (had_capacity || had_lock) && !has_capacity && !has_lock {
            scheduler.release_run(&run.run_id);
            return;
        }
        // Partial releases: only one of capacity / workspace lock was dropped.
        if had_capacity && !has_capacity {
            scheduler.release_capacity(&run.run_id);
        }
        if had_lock && !has_lock {
            scheduler.release_workspace(&run.run_id);
        }
        // Newly acquiring the workspace lock: admit fully when the run also needs
        // capacity, otherwise only reserve the workspace.
        if !had_lock && has_lock {
            if has_capacity {
                scheduler.admit_run(&run.run_id, workspace_root.as_deref());
            } else {
                scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
            }
            return;
        }
        // Newly needs capacity without newly acquiring the workspace lock.
        // NOTE(review): when the lock was already held, admit_run is called again with
        // the workspace root. Confirm admit_run tolerates an existing reservation.
        if !had_capacity && has_capacity {
            scheduler.admit_run(&run.run_id, workspace_root.as_deref());
        }
    }
4202
4203    pub async fn recover_in_flight_runs(&self) -> usize {
4204        let runs = self
4205            .automation_v2_runs
4206            .read()
4207            .await
4208            .values()
4209            .cloned()
4210            .collect::<Vec<_>>();
4211        let mut recovered = 0usize;
4212        for run in runs {
4213            match run.status {
4214                AutomationRunStatus::Running => {
4215                    let detail = "automation run interrupted by server restart".to_string();
4216                    if self
4217                        .update_automation_v2_run(&run.run_id, |row| {
4218                            row.status = AutomationRunStatus::Failed;
4219                            row.detail = Some(detail.clone());
4220                            row.stop_kind = Some(AutomationStopKind::ServerRestart);
4221                            row.stop_reason = Some(detail.clone());
4222                            automation::record_automation_lifecycle_event(
4223                                row,
4224                                "run_failed_server_restart",
4225                                Some(detail.clone()),
4226                                Some(AutomationStopKind::ServerRestart),
4227                            );
4228                        })
4229                        .await
4230                        .is_some()
4231                    {
4232                        recovered += 1;
4233                    }
4234                }
4235                AutomationRunStatus::Paused
4236                | AutomationRunStatus::Pausing
4237                | AutomationRunStatus::AwaitingApproval => {
4238                    let workspace_root = self.automation_v2_run_workspace_root(&run).await;
4239                    let mut scheduler = self.automation_scheduler.write().await;
4240                    scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
4241                    for (node_id, output) in &run.checkpoint.node_outputs {
4242                        if let Some((path, content_digest)) =
4243                            automation::node_output::automation_output_validated_artifact(output)
4244                        {
4245                            scheduler.preexisting_registry.register_validated(
4246                                &run.run_id,
4247                                node_id,
4248                                automation::scheduler::ValidatedArtifact {
4249                                    path,
4250                                    content_digest,
4251                                },
4252                            );
4253                        }
4254                    }
4255                }
4256                _ => {}
4257            }
4258        }
4259        recovered
4260    }
4261
4262    pub fn is_automation_scheduler_stopping(&self) -> bool {
4263        self.automation_scheduler_stopping.load(Ordering::Relaxed)
4264    }
4265
4266    pub fn set_automation_scheduler_stopping(&self, stopping: bool) {
4267        self.automation_scheduler_stopping
4268            .store(stopping, Ordering::Relaxed);
4269    }
4270
4271    pub async fn fail_running_automation_runs_for_shutdown(&self) -> usize {
4272        let run_ids = self
4273            .automation_v2_runs
4274            .read()
4275            .await
4276            .values()
4277            .filter(|run| matches!(run.status, AutomationRunStatus::Running))
4278            .map(|run| run.run_id.clone())
4279            .collect::<Vec<_>>();
4280        let mut failed = 0usize;
4281        for run_id in run_ids {
4282            let detail = "automation run stopped during server shutdown".to_string();
4283            if self
4284                .update_automation_v2_run(&run_id, |row| {
4285                    row.status = AutomationRunStatus::Failed;
4286                    row.detail = Some(detail.clone());
4287                    row.stop_kind = Some(AutomationStopKind::Shutdown);
4288                    row.stop_reason = Some(detail.clone());
4289                    automation::record_automation_lifecycle_event(
4290                        row,
4291                        "run_failed_shutdown",
4292                        Some(detail.clone()),
4293                        Some(AutomationStopKind::Shutdown),
4294                    );
4295                })
4296                .await
4297                .is_some()
4298            {
4299                failed += 1;
4300            }
4301        }
4302        failed
4303    }
4304
4305    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
4306        let run_id = self
4307            .automation_v2_runs
4308            .read()
4309            .await
4310            .values()
4311            .filter(|row| row.status == AutomationRunStatus::Queued)
4312            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
4313            .map(|row| row.run_id.clone())?;
4314        self.claim_specific_automation_v2_run(&run_id).await
4315    }
4316    pub async fn claim_specific_automation_v2_run(
4317        &self,
4318        run_id: &str,
4319    ) -> Option<AutomationV2RunRecord> {
4320        let mut guard = self.automation_v2_runs.write().await;
4321        let run = guard.get_mut(run_id)?;
4322        if run.status != AutomationRunStatus::Queued {
4323            return None;
4324        }
4325        let previous_status = run.status.clone();
4326        let now = now_ms();
4327        if run.runtime_context.is_none() {
4328            run.runtime_context = run.automation_snapshot.as_ref().and_then(|automation| {
4329                automation
4330                    .runtime_context_materialization()
4331                    .or_else(|| automation.approved_plan_runtime_context_materialization())
4332            });
4333        }
4334        if run.runtime_context.is_none() {
4335            let previous_status = run.status.clone();
4336            let now = now_ms();
4337            run.status = AutomationRunStatus::Failed;
4338            run.updated_at_ms = now;
4339            run.finished_at_ms.get_or_insert(now);
4340            run.scheduler = None;
4341            run.detail = Some("runtime context partition missing for automation run".to_string());
4342            let claimed = run.clone();
4343            drop(guard);
4344            self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
4345                .await;
4346            let _ = self.persist_automation_v2_runs().await;
4347            return None;
4348        }
4349        run.status = AutomationRunStatus::Running;
4350        run.updated_at_ms = now;
4351        run.started_at_ms.get_or_insert(now);
4352        run.scheduler = None;
4353        let claimed = run.clone();
4354        drop(guard);
4355        self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
4356            .await;
4357        let _ = self.persist_automation_v2_runs().await;
4358        Some(claimed)
4359    }
4360    pub async fn update_automation_v2_run(
4361        &self,
4362        run_id: &str,
4363        update: impl FnOnce(&mut AutomationV2RunRecord),
4364    ) -> Option<AutomationV2RunRecord> {
4365        let mut guard = self.automation_v2_runs.write().await;
4366        let run = guard.get_mut(run_id)?;
4367        let previous_status = run.status.clone();
4368        update(run);
4369        if run.status != AutomationRunStatus::Queued {
4370            run.scheduler = None;
4371        }
4372        run.updated_at_ms = now_ms();
4373        if matches!(
4374            run.status,
4375            AutomationRunStatus::Completed
4376                | AutomationRunStatus::Blocked
4377                | AutomationRunStatus::Failed
4378                | AutomationRunStatus::Cancelled
4379        ) {
4380            run.finished_at_ms.get_or_insert_with(now_ms);
4381        }
4382        let out = run.clone();
4383        drop(guard);
4384        self.sync_automation_scheduler_for_run_transition(previous_status, &out)
4385            .await;
4386        let _ = self.persist_automation_v2_runs().await;
4387        Some(out)
4388    }
4389
4390    pub async fn set_automation_v2_run_scheduler_metadata(
4391        &self,
4392        run_id: &str,
4393        meta: automation::SchedulerMetadata,
4394    ) -> Option<AutomationV2RunRecord> {
4395        self.update_automation_v2_run(run_id, |row| {
4396            row.scheduler = Some(meta);
4397        })
4398        .await
4399    }
4400
4401    pub async fn clear_automation_v2_run_scheduler_metadata(
4402        &self,
4403        run_id: &str,
4404    ) -> Option<AutomationV2RunRecord> {
4405        self.update_automation_v2_run(run_id, |row| {
4406            row.scheduler = None;
4407        })
4408        .await
4409    }
4410
4411    pub async fn add_automation_v2_session(
4412        &self,
4413        run_id: &str,
4414        session_id: &str,
4415    ) -> Option<AutomationV2RunRecord> {
4416        let updated = self
4417            .update_automation_v2_run(run_id, |row| {
4418                if !row.active_session_ids.iter().any(|id| id == session_id) {
4419                    row.active_session_ids.push(session_id.to_string());
4420                }
4421                row.latest_session_id = Some(session_id.to_string());
4422            })
4423            .await;
4424        self.automation_v2_session_runs
4425            .write()
4426            .await
4427            .insert(session_id.to_string(), run_id.to_string());
4428        updated
4429    }
4430
4431    pub async fn clear_automation_v2_session(
4432        &self,
4433        run_id: &str,
4434        session_id: &str,
4435    ) -> Option<AutomationV2RunRecord> {
4436        self.automation_v2_session_runs
4437            .write()
4438            .await
4439            .remove(session_id);
4440        self.update_automation_v2_run(run_id, |row| {
4441            row.active_session_ids.retain(|id| id != session_id);
4442        })
4443        .await
4444    }
4445
4446    pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
4447        let mut guard = self.automation_v2_session_runs.write().await;
4448        for session_id in session_ids {
4449            guard.remove(session_id);
4450        }
4451    }
4452
4453    pub async fn add_automation_v2_instance(
4454        &self,
4455        run_id: &str,
4456        instance_id: &str,
4457    ) -> Option<AutomationV2RunRecord> {
4458        self.update_automation_v2_run(run_id, |row| {
4459            if !row.active_instance_ids.iter().any(|id| id == instance_id) {
4460                row.active_instance_ids.push(instance_id.to_string());
4461            }
4462        })
4463        .await
4464    }
4465
4466    pub async fn clear_automation_v2_instance(
4467        &self,
4468        run_id: &str,
4469        instance_id: &str,
4470    ) -> Option<AutomationV2RunRecord> {
4471        self.update_automation_v2_run(run_id, |row| {
4472            row.active_instance_ids.retain(|id| id != instance_id);
4473        })
4474        .await
4475    }
4476
4477    pub async fn apply_provider_usage_to_runs(
4478        &self,
4479        session_id: &str,
4480        prompt_tokens: u64,
4481        completion_tokens: u64,
4482        total_tokens: u64,
4483    ) {
4484        if let Some(policy) = self.routine_session_policy(session_id).await {
4485            let rate = self.token_cost_per_1k_usd.max(0.0);
4486            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
4487            let mut guard = self.routine_runs.write().await;
4488            if let Some(run) = guard.get_mut(&policy.run_id) {
4489                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
4490                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
4491                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
4492                run.estimated_cost_usd += delta_cost;
4493                run.updated_at_ms = now_ms();
4494            }
4495            drop(guard);
4496            let _ = self.persist_routine_runs().await;
4497        }
4498
4499        let maybe_v2_run_id = self
4500            .automation_v2_session_runs
4501            .read()
4502            .await
4503            .get(session_id)
4504            .cloned();
4505        if let Some(run_id) = maybe_v2_run_id {
4506            let rate = self.token_cost_per_1k_usd.max(0.0);
4507            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
4508            let mut guard = self.automation_v2_runs.write().await;
4509            if let Some(run) = guard.get_mut(&run_id) {
4510                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
4511                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
4512                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
4513                run.estimated_cost_usd += delta_cost;
4514                run.updated_at_ms = now_ms();
4515            }
4516            drop(guard);
4517            let _ = self.persist_automation_v2_runs().await;
4518        }
4519    }
4520
4521    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
4522        let mut fired = Vec::new();
4523        let mut guard = self.automations_v2.write().await;
4524        for automation in guard.values_mut() {
4525            if automation.status != AutomationV2Status::Active {
4526                continue;
4527            }
4528            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
4529                automation.next_fire_at_ms =
4530                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
4531                continue;
4532            };
4533            if now_ms < next_fire_at_ms {
4534                continue;
4535            }
4536            let run_count =
4537                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
4538            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
4539            automation.next_fire_at_ms = next;
4540            automation.last_fired_at_ms = Some(now_ms);
4541            for _ in 0..run_count {
4542                fired.push(automation.automation_id.clone());
4543            }
4544        }
4545        drop(guard);
4546        let _ = self.persist_automations_v2().await;
4547        fired
4548    }
4549}
4550
4551async fn build_channels_config(
4552    state: &AppState,
4553    channels: &ChannelsConfigFile,
4554) -> Option<ChannelsConfig> {
4555    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
4556        return None;
4557    }
4558    Some(ChannelsConfig {
4559        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
4560            bot_token: cfg.bot_token,
4561            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4562            mention_only: cfg.mention_only,
4563            style_profile: cfg.style_profile,
4564            security_profile: cfg.security_profile,
4565        }),
4566        discord: channels.discord.clone().map(|cfg| DiscordConfig {
4567            bot_token: cfg.bot_token,
4568            guild_id: cfg.guild_id,
4569            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4570            mention_only: cfg.mention_only,
4571            security_profile: cfg.security_profile,
4572        }),
4573        slack: channels.slack.clone().map(|cfg| SlackConfig {
4574            bot_token: cfg.bot_token,
4575            channel_id: cfg.channel_id,
4576            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4577            mention_only: cfg.mention_only,
4578            security_profile: cfg.security_profile,
4579        }),
4580        server_base_url: state.server_base_url(),
4581        api_token: state.api_token().await.unwrap_or_default(),
4582        tool_policy: channels.tool_policy.clone(),
4583    })
4584}
4585
4586// channel config normalization moved to crate::config::channels
4587
/// Returns `true` when `value` is an `owner/repo` slug.
///
/// Surrounding whitespace is ignored. The slug must contain exactly one `/`, and both the
/// owner and repository segments must be non-blank.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let Some((owner, repo)) = value.trim().split_once('/') else {
        return false;
    };
    // A second separator, or a leading/trailing one, leaves a blank or multi-part segment.
    !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
}
4602
4603fn legacy_automations_v2_path() -> Option<PathBuf> {
4604    config::paths::resolve_legacy_root_file_path("automations_v2.json")
4605        .filter(|path| path != &config::paths::resolve_automations_v2_path())
4606}
4607
4608fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4609    let mut candidates = vec![active_path.clone()];
4610    if let Some(legacy_path) = legacy_automations_v2_path() {
4611        if !candidates.contains(&legacy_path) {
4612            candidates.push(legacy_path);
4613        }
4614    }
4615    let default_path = config::paths::default_state_dir().join("automations_v2.json");
4616    if !candidates.contains(&default_path) {
4617        candidates.push(default_path);
4618    }
4619    candidates
4620}
4621
4622async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
4623    let Some(legacy_path) = legacy_automations_v2_path() else {
4624        return Ok(());
4625    };
4626    if legacy_path == *active_path || !legacy_path.exists() {
4627        return Ok(());
4628    }
4629    fs::remove_file(&legacy_path).await?;
4630    tracing::info!(
4631        active_path = active_path.display().to_string(),
4632        removed_path = legacy_path.display().to_string(),
4633        "removed stale legacy automation v2 file after canonical persistence"
4634    );
4635    Ok(())
4636}
4637
4638fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
4639    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
4640        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
4641}
4642
4643fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4644    let mut candidates = vec![active_path.clone()];
4645    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
4646        if !candidates.contains(&legacy_path) {
4647            candidates.push(legacy_path);
4648        }
4649    }
4650    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
4651    if !candidates.contains(&default_path) {
4652        candidates.push(default_path);
4653    }
4654    candidates
4655}
4656
4657fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4658    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4659        .unwrap_or_default()
4660}
4661
4662fn parse_automation_v2_runs_file(
4663    raw: &str,
4664) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4665    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4666        .unwrap_or_default()
4667}
4668
4669fn parse_optimization_campaigns_file(
4670    raw: &str,
4671) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
4672    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
4673        .unwrap_or_default()
4674}
4675
4676fn parse_optimization_experiments_file(
4677    raw: &str,
4678) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
4679    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
4680        .unwrap_or_default()
4681}
4682
4683fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4684    match schedule {
4685        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4686        RoutineSchedule::Cron { .. } => None,
4687    }
4688}
4689
4690fn parse_timezone(timezone: &str) -> Option<Tz> {
4691    timezone.trim().parse::<Tz>().ok()
4692}
4693
4694fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
4695    let tz = parse_timezone(timezone)?;
4696    let schedule = Schedule::from_str(expression).ok()?;
4697    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
4698    let local_from = from_dt.with_timezone(&tz);
4699    let next = schedule.after(&local_from).next()?;
4700    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
4701}
4702
4703fn compute_next_schedule_fire_at_ms(
4704    schedule: &RoutineSchedule,
4705    timezone: &str,
4706    from_ms: u64,
4707) -> Option<u64> {
4708    let _ = parse_timezone(timezone)?;
4709    match schedule {
4710        RoutineSchedule::IntervalSeconds { seconds } => {
4711            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4712        }
4713        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4714    }
4715}
4716
4717fn compute_misfire_plan_for_schedule(
4718    now_ms: u64,
4719    next_fire_at_ms: u64,
4720    schedule: &RoutineSchedule,
4721    timezone: &str,
4722    policy: &RoutineMisfirePolicy,
4723) -> (u32, u64) {
4724    match schedule {
4725        RoutineSchedule::IntervalSeconds { .. } => {
4726            let Some(interval_ms) = routine_interval_ms(schedule) else {
4727                return (0, next_fire_at_ms);
4728            };
4729            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
4730        }
4731        RoutineSchedule::Cron { expression } => {
4732            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
4733                .unwrap_or_else(|| now_ms.saturating_add(60_000));
4734            match policy {
4735                RoutineMisfirePolicy::Skip => (0, aligned_next),
4736                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4737                RoutineMisfirePolicy::CatchUp { max_runs } => {
4738                    let mut count = 0u32;
4739                    let mut cursor = next_fire_at_ms;
4740                    while cursor <= now_ms && count < *max_runs {
4741                        count = count.saturating_add(1);
4742                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
4743                            break;
4744                        };
4745                        if next <= cursor {
4746                            break;
4747                        }
4748                        cursor = next;
4749                    }
4750                    (count, aligned_next)
4751                }
4752            }
4753        }
4754    }
4755}
4756
4757fn compute_misfire_plan(
4758    now_ms: u64,
4759    next_fire_at_ms: u64,
4760    interval_ms: u64,
4761    policy: &RoutineMisfirePolicy,
4762) -> (u32, u64) {
4763    if now_ms < next_fire_at_ms || interval_ms == 0 {
4764        return (0, next_fire_at_ms);
4765    }
4766    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4767    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4768    match policy {
4769        RoutineMisfirePolicy::Skip => (0, aligned_next),
4770        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4771        RoutineMisfirePolicy::CatchUp { max_runs } => {
4772            let count = missed.min(u64::from(*max_runs)) as u32;
4773            (count, aligned_next)
4774        }
4775    }
4776}
4777
4778fn auto_generated_agent_name(agent_id: &str) -> String {
4779    let names = [
4780        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4781    ];
4782    let digest = Sha256::digest(agent_id.as_bytes());
4783    let idx = usize::from(digest[0]) % names.len();
4784    format!("{}-{:02x}", names[idx], digest[1])
4785}
4786
4787fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4788    match schedule.schedule_type {
4789        AutomationV2ScheduleType::Manual => None,
4790        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4791            seconds: schedule.interval_seconds.unwrap_or(60),
4792        }),
4793        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4794            expression: schedule.cron_expression.clone().unwrap_or_default(),
4795        }),
4796    }
4797}
4798
4799fn automation_schedule_next_fire_at_ms(
4800    schedule: &AutomationV2Schedule,
4801    from_ms: u64,
4802) -> Option<u64> {
4803    let routine_schedule = schedule_from_automation_v2(schedule)?;
4804    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4805}
4806
4807fn automation_schedule_due_count(
4808    schedule: &AutomationV2Schedule,
4809    now_ms: u64,
4810    next_fire_at_ms: u64,
4811) -> u32 {
4812    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
4813        return 0;
4814    };
4815    let (count, _) = compute_misfire_plan_for_schedule(
4816        now_ms,
4817        next_fire_at_ms,
4818        &routine_schedule,
4819        &schedule.timezone,
4820        &schedule.misfire_policy,
4821    );
4822    count.max(1)
4823}
4824
/// Outcome of checking a routine against its external-integration execution policy.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run only after manual approval; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains which policy blocked it.
    Blocked { reason: String },
}
4831
4832pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4833    let entrypoint = routine.entrypoint.to_ascii_lowercase();
4834    if entrypoint.starts_with("connector.")
4835        || entrypoint.starts_with("integration.")
4836        || entrypoint.contains("external")
4837    {
4838        return true;
4839    }
4840    routine
4841        .args
4842        .get("uses_external_integrations")
4843        .and_then(|v| v.as_bool())
4844        .unwrap_or(false)
4845        || routine
4846            .args
4847            .get("connector_id")
4848            .and_then(|v| v.as_str())
4849            .is_some()
4850}
4851
4852pub fn evaluate_routine_execution_policy(
4853    routine: &RoutineSpec,
4854    trigger_type: &str,
4855) -> RoutineExecutionDecision {
4856    if !routine_uses_external_integrations(routine) {
4857        return RoutineExecutionDecision::Allowed;
4858    }
4859    if !routine.external_integrations_allowed {
4860        return RoutineExecutionDecision::Blocked {
4861            reason: "external integrations are disabled by policy".to_string(),
4862        };
4863    }
4864    if routine.requires_approval {
4865        return RoutineExecutionDecision::RequiresApproval {
4866            reason: format!(
4867                "manual approval required before external side effects ({})",
4868                trigger_type
4869            ),
4870        };
4871    }
4872    RoutineExecutionDecision::Allowed
4873}
4874
/// Validates a shared-resource key (surrounding whitespace ignored).
///
/// Accepts the special key `swarm.active_tasks`, or any key under `run/`, `mission/`,
/// `project/`, or `team/` that contains no empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        trimmed => {
            let has_allowed_prefix = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| trimmed.starts_with(prefix));
            has_allowed_prefix && !trimmed.contains("//")
        }
    }
}
4892
4893impl Deref for AppState {
4894    type Target = RuntimeState;
4895
4896    fn deref(&self) -> &Self::Target {
4897        self.runtime
4898            .get()
4899            .expect("runtime accessed before startup completion")
4900    }
4901}
4902
4903#[derive(Clone)]
4904struct ServerPromptContextHook {
4905    state: AppState,
4906}
4907
// Helpers used by the `PromptContextHook` implementation to build extra system-context text.
impl ServerPromptContextHook {
    /// Wraps `state` in a new hook.
    fn new(state: AppState) -> Self {
        Self { state }
    }

    /// Opens the shared memory database at the resolved `memory_db_path`.
    ///
    /// Returns `None` if path resolution or opening the database fails.
    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
        let paths = resolve_shared_paths().ok()?;
        MemoryDatabase::new(&paths.memory_db_path).await.ok()
    }

    /// Opens a `MemoryManager` on the resolved `memory_db_path`.
    ///
    /// Returns `None` if path resolution or manager construction fails.
    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
        let paths = resolve_shared_paths().ok()?;
        tandem_memory::MemoryManager::new(&paths.memory_db_path)
            .await
            .ok()
    }

    /// Returns the lowercase hex-encoded SHA-256 digest of `input`.
    fn hash_query(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Renders global memory search hits as a `<memory_context>` block, one line per hit.
    ///
    /// Each hit's content is cut to its first 60 whitespace-separated words. Lines are added
    /// until their cumulative byte length exceeds 2200; the line that crosses the budget is
    /// dropped and no further hits are considered.
    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
        let mut out = vec!["<memory_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let text = hit
                .record
                .content
                .split_whitespace()
                .take(60)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (source={}, run={})",
                hit.score, text, hit.record.source_type, hit.record.run_id
            );
            // The budget is checked after counting, so an over-budget line is never emitted.
            used = used.saturating_add(line.len());
            if used > 2200 {
                break;
            }
            out.push(line);
        }
        out.push("</memory_context>".to_string());
        out.join("\n")
    }

    /// Returns the chunk's trimmed, non-empty `source_url` metadata string, if present.
    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
        chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("source_url"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
    }

    /// Returns the chunk's document path.
    ///
    /// Prefers trimmed, non-empty `relative_path` metadata; otherwise falls back to the chunk
    /// `source` with any `guide_docs:` prefix stripped.
    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
        if let Some(path) = chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("relative_path"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
        {
            return path.to_string();
        }
        chunk
            .source
            .strip_prefix("guide_docs:")
            .unwrap_or(chunk.source.as_str())
            .to_string()
    }

    /// Renders embedded-docs search hits as a `<docs_context>` block, one line per hit.
    ///
    /// Each hit's content is cut to its first 70 whitespace-separated words and annotated
    /// with its doc path and source URL (empty when absent). Lines are added until their
    /// cumulative byte length exceeds 2800; the line that crosses the budget is dropped.
    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
        let mut out = vec!["<docs_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
            let path = Self::extract_docs_relative_path(&hit.chunk);
            let text = hit
                .chunk
                .content
                .split_whitespace()
                .take(70)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (doc_path={}, source_url={})",
                hit.similarity, text, path, url
            );
            used = used.saturating_add(line.len());
            if used > 2800 {
                break;
            }
            out.push(line);
        }
        out.push("</docs_context>".to_string());
        out.join("\n")
    }

    /// Searches global-tier memory for embedded documentation chunks matching `query`.
    ///
    /// Over-fetches `3 * limit` results (clamped to 6..=36) because non-docs hits are filtered
    /// out afterwards; only chunks whose `source` starts with `guide_docs:` are kept, at most
    /// `limit` of them. Any failure (manager unavailable, search error) yields an empty list.
    async fn search_embedded_docs(
        &self,
        query: &str,
        limit: usize,
    ) -> Vec<tandem_memory::types::MemorySearchResult> {
        let Some(manager) = self.open_memory_manager().await else {
            return Vec::new();
        };
        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
        manager
            .search(
                query,
                Some(MemoryTier::Global),
                None,
                None,
                Some(search_limit),
            )
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
            .take(limit)
            .collect()
    }

    /// Returns `true` when `query` is blank or is (ASCII case-insensitively) exactly one of a
    /// fixed set of greetings/acknowledgements, for which memory injection is skipped.
    fn should_skip_memory_injection(query: &str) -> bool {
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return true;
        }
        let lower = trimmed.to_ascii_lowercase();
        let social = [
            "hi",
            "hello",
            "hey",
            "thanks",
            "thank you",
            "ok",
            "okay",
            "cool",
            "nice",
            "yo",
            "good morning",
            "good afternoon",
            "good evening",
        ];
        // NOTE(review): every entry above is shorter than 32 bytes, so the length check only
        // short-circuits the lookup for longer input.
        lower.len() <= 32 && social.contains(&lower.as_str())
    }

    /// Maps a personality preset name to its style instruction; unknown names get "balanced".
    fn personality_preset_text(preset: &str) -> &'static str {
        match preset {
            "concise" => {
                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
            }
            "friendly" => {
                "Default style: friendly and supportive while staying technically rigorous and concrete."
            }
            "mentor" => {
                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
            }
            "critical" => {
                "Default style: critical and risk-first. Surface failure modes and assumptions early."
            }
            _ => {
                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
            }
        }
    }

    /// Builds the identity/personality system block from the effective config.
    ///
    /// - Bot name: `identity.bot.canonical_name`, else legacy top-level `bot_name`, else
    ///   `"Tandem"`.
    /// - Preset: `identity.personality.per_agent.<agent>.preset` (only for a named agent other
    ///   than `compaction`, `title`, or `summary`), else `identity.personality.default.preset`,
    ///   else `"balanced"`.
    /// - Custom instructions: per-agent `custom_instructions`, else the default profile's,
    ///   else legacy top-level `persona`.
    ///
    /// String values are trimmed and blanks are treated as absent. Currently always `Some`.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        // Internal utility agents never receive per-agent personality overrides.
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }

    /// Builds the `<memory_scope>` system block describing the current session scope.
    ///
    /// Always includes the session id; includes the project id and workspace root only when
    /// they are non-blank (after trimming). It ends with fixed guidance lines on memory-search
    /// behavior.
    fn build_memory_scope_block(
        session_id: &str,
        project_id: Option<&str>,
        workspace_root: Option<&str>,
    ) -> String {
        let mut lines = vec![
            "<memory_scope>".to_string(),
            format!("- current_session_id: {}", session_id),
        ];
        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
            lines.push(format!("- current_project_id: {}", project_id));
        }
        if let Some(workspace_root) = workspace_root
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            lines.push(format!("- workspace_root: {}", workspace_root));
        }
        lines.push(
            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
                .to_string(),
        );
        lines.push(
            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
                .to_string(),
        );
        lines.push(
            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
                .to_string(),
        );
        lines.push("</memory_scope>".to_string());
        lines.join("\n")
    }
}
5193
/// Server-side prompt augmentation: appends `system` messages carrying agent
/// identity, memory-scope guidance, and retrieved context (embedded docs or
/// global memory) to the provider messages handed to this hook.
impl PromptContextHook for ServerPromptContextHook {
    /// Returns `messages` with zero or more `system` messages appended.
    ///
    /// Appended in this order, each only when applicable:
    /// 1. the identity block resolved for the run's agent profile;
    /// 2. the memory-scope guidance block for the stored session;
    /// 3. either an embedded-docs block (when the docs search has hits) or a
    ///    global-memory block (when the memory search has hits) — never both.
    ///
    /// The hook fails open and always resolves to `Ok`: when state is not ready,
    /// no run is registered for the session, there is no usable user query, or
    /// the memory database cannot be opened, it returns the messages accumulated
    /// so far. Memory search errors are treated as "no hits".
    ///
    /// Publishes `memory.docs.context.injected`, `memory.search.performed`
    /// (whenever the memory database is searched, even with zero hits), and
    /// `memory.context.injected` events on the event bus.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // The returned future is 'static, so it must own its own handle to the hook.
        let this = self.clone();
        Box::pin(async move {
            // Startup can invoke prompt plumbing before RuntimeState is installed.
            // Never panic from context hooks; fail-open and continue without augmentation.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // Augmentation is keyed on an active run for this session; without one,
            // pass the messages through untouched.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            // Memory-scope guidance is only added when the session is found in storage.
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            // Runs without a client id search the "default" user's global memory.
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The retrieval query is the most recent user message.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded docs take precedence: when any docs match, global memory is
            // not searched at all.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        // Whitespace-separated word count, used as a rough size estimate.
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            // Search errors degrade to an empty hit list rather than failing the hook.
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Search telemetry is emitted even when there are no hits. The raw query
            // is not included; only its hash.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    // Whitespace-separated word count, used as a rough size estimate.
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
5324
5325fn extract_event_session_id(properties: &Value) -> Option<String> {
5326    properties
5327        .get("sessionID")
5328        .or_else(|| properties.get("sessionId"))
5329        .or_else(|| properties.get("id"))
5330        .or_else(|| {
5331            properties
5332                .get("part")
5333                .and_then(|part| part.get("sessionID"))
5334        })
5335        .or_else(|| {
5336            properties
5337                .get("part")
5338                .and_then(|part| part.get("sessionId"))
5339        })
5340        .and_then(|v| v.as_str())
5341        .map(|s| s.to_string())
5342}
5343
5344fn extract_event_run_id(properties: &Value) -> Option<String> {
5345    properties
5346        .get("runID")
5347        .or_else(|| properties.get("run_id"))
5348        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
5349        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
5350        .and_then(|v| v.as_str())
5351        .map(|s| s.to_string())
5352}
5353
/// Converts a `part` payload from event properties into a persistable
/// `(message_id, MessagePart::ToolInvocation)` pair.
///
/// Returns `None` when any of the following holds:
/// - there is no `part`;
/// - the part type (compared case-insensitively) is not a tool variant
///   (`tool`, `tool-invocation`, `tool-result`, `tool_invocation`, `tool_result`);
/// - the part is a transient `running` update with neither a non-null result nor
///   a non-blank error;
/// - `tool`, or both `messageID` and `message_id`, are missing or not strings.
///
/// Empty args (null or `{}`) are backfilled from
/// `toolCallDelta.parsedArgsPreview`, then from a trimmed, non-empty
/// `toolCallDelta.rawArgsPreview` string.
pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
    let part = properties.get("part")?;
    let part_type = part
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    // Both hyphenated and underscored spellings of the tool part types are accepted.
    if part_type != "tool"
        && part_type != "tool-invocation"
        && part_type != "tool-result"
        && part_type != "tool_invocation"
        && part_type != "tool_result"
    {
        return None;
    }
    let part_state = part
        .get("state")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    let has_result = part.get("result").is_some_and(|value| !value.is_null());
    let has_error = part
        .get("error")
        .and_then(|v| v.as_str())
        .is_some_and(|value| !value.trim().is_empty());
    // Skip transient "running" deltas to avoid persistence storms from streamed
    // tool-argument chunks; keep actionable/final updates.
    if part_state == "running" && !has_result && !has_error {
        return None;
    }
    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
    let message_id = part
        .get("messageID")
        .or_else(|| part.get("message_id"))
        .and_then(|v| v.as_str())?
        .to_string();
    // A missing `args` key is treated the same as an empty object.
    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
        // First fallback: the parsed args preview from the streaming tool-call delta,
        // used only if it is not null, `{}`, or a blank string.
        if let Some(preview) = properties
            .get("toolCallDelta")
            .and_then(|delta| delta.get("parsedArgsPreview"))
            .cloned()
        {
            let preview_nonempty = !preview.is_null()
                && !preview.as_object().is_some_and(|value| value.is_empty())
                && !preview
                    .as_str()
                    .map(|value| value.trim().is_empty())
                    .unwrap_or(false);
            if preview_nonempty {
                args = preview;
            }
        }
        // Second fallback: the raw (unparsed) args text, stored as a JSON string.
        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
            if let Some(raw_preview) = properties
                .get("toolCallDelta")
                .and_then(|delta| delta.get("rawArgsPreview"))
                .and_then(|value| value.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                args = Value::String(raw_preview.to_string());
            }
        }
    }
    // Diagnostic only: a `write` part that still has no args after both fallbacks.
    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
    {
        tracing::info!(
            message_id = %message_id,
            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
            has_result = part.get("result").is_some(),
            has_error = part.get("error").is_some(),
            "persistable write tool part still has empty args"
        );
    }
    let result = part.get("result").cloned().filter(|value| !value.is_null());
    // NOTE(review): unlike `has_error` above, a blank error string is kept as `Some`.
    let error = part
        .get("error")
        .and_then(|v| v.as_str())
        .map(|value| value.to_string());
    Some((
        message_id,
        MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        },
    ))
}
5445
5446pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
5447    let session_id = extract_event_session_id(&event.properties)?;
5448    let run_id = extract_event_run_id(&event.properties);
5449    let key = format!("run/{session_id}/status");
5450
5451    let mut base = serde_json::Map::new();
5452    base.insert("sessionID".to_string(), Value::String(session_id));
5453    if let Some(run_id) = run_id {
5454        base.insert("runID".to_string(), Value::String(run_id));
5455    }
5456
5457    match event.event_type.as_str() {
5458        "session.run.started" => {
5459            base.insert("state".to_string(), Value::String("running".to_string()));
5460            base.insert("phase".to_string(), Value::String("run".to_string()));
5461            base.insert(
5462                "eventType".to_string(),
5463                Value::String("session.run.started".to_string()),
5464            );
5465            Some(StatusIndexUpdate {
5466                key,
5467                value: Value::Object(base),
5468            })
5469        }
5470        "session.run.finished" => {
5471            base.insert("state".to_string(), Value::String("finished".to_string()));
5472            base.insert("phase".to_string(), Value::String("run".to_string()));
5473            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
5474                base.insert("result".to_string(), Value::String(status.to_string()));
5475            }
5476            base.insert(
5477                "eventType".to_string(),
5478                Value::String("session.run.finished".to_string()),
5479            );
5480            Some(StatusIndexUpdate {
5481                key,
5482                value: Value::Object(base),
5483            })
5484        }
5485        "message.part.updated" => {
5486            let part = event.properties.get("part")?;
5487            let part_type = part.get("type").and_then(|v| v.as_str())?;
5488            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
5489            let (phase, tool_active) = match (part_type, part_state) {
5490                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
5491                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
5492                _ => return None,
5493            };
5494            base.insert("state".to_string(), Value::String("running".to_string()));
5495            base.insert("phase".to_string(), Value::String(phase.to_string()));
5496            base.insert("toolActive".to_string(), Value::Bool(tool_active));
5497            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
5498                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
5499            }
5500            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
5501                base.insert(
5502                    "toolState".to_string(),
5503                    Value::String(tool_state.to_string()),
5504                );
5505            }
5506            if let Some(tool_error) = part
5507                .get("error")
5508                .and_then(|v| v.as_str())
5509                .map(str::trim)
5510                .filter(|value| !value.is_empty())
5511            {
5512                base.insert(
5513                    "toolError".to_string(),
5514                    Value::String(tool_error.to_string()),
5515                );
5516            }
5517            if let Some(tool_call_id) = part
5518                .get("id")
5519                .and_then(|v| v.as_str())
5520                .map(str::trim)
5521                .filter(|value| !value.is_empty())
5522            {
5523                base.insert(
5524                    "toolCallID".to_string(),
5525                    Value::String(tool_call_id.to_string()),
5526                );
5527            }
5528            if let Some(args_preview) = part
5529                .get("args")
5530                .filter(|value| {
5531                    !value.is_null()
5532                        && !value.as_object().is_some_and(|map| map.is_empty())
5533                        && !value
5534                            .as_str()
5535                            .map(|text| text.trim().is_empty())
5536                            .unwrap_or(false)
5537                })
5538                .map(|value| truncate_text(&value.to_string(), 500))
5539            {
5540                base.insert(
5541                    "toolArgsPreview".to_string(),
5542                    Value::String(args_preview.to_string()),
5543                );
5544            }
5545            base.insert(
5546                "eventType".to_string(),
5547                Value::String("message.part.updated".to_string()),
5548            );
5549            Some(StatusIndexUpdate {
5550                key,
5551                value: Value::Object(base),
5552            })
5553        }
5554        _ => None,
5555    }
5556}
5557
// Thin public entry points for background tasks. Each one delegates to the
// function of the same name in `crate::app::tasks` and awaits it.

/// Runs the session part persister; delegates to
/// `crate::app::tasks::run_session_part_persister`.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}

/// Runs the status indexer; delegates to `crate::app::tasks::run_status_indexer`.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}

/// Runs the agent team supervisor; delegates to
/// `crate::app::tasks::run_agent_team_supervisor`.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}

/// Runs the bug monitor; delegates to `crate::app::tasks::run_bug_monitor`.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}

/// Runs the usage aggregator; delegates to `crate::app::tasks::run_usage_aggregator`.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}

/// Runs the optimization scheduler; delegates to
/// `crate::app::tasks::run_optimization_scheduler`.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
5581
/// Records a bug-monitor incident for a failure `event`. The incident is then
/// taken through duplicate suppression, draft creation, triage, and GitHub
/// publishing.
///
/// Flow:
/// 1. Build a submission from the event and look up matching failure patterns.
/// 2. Upsert the incident keyed by fingerprint. An existing incident has its
///    `occurrence_count` bumped; otherwise a new `"queued"` incident is created.
/// 3. If any failure-pattern matches were found, mark the incident
///    `"duplicate_suppressed"`, publish
///    `bug_monitor.incident.duplicate_suppressed`, and return.
/// 4. Submit a draft. On failure, mark the incident `"draft_failed"`, publish
///    `bug_monitor.incident.detected`, and return.
/// 5. Ensure a triage run (status becomes `"triage_queued"` when a triage run id
///    is set), then try to auto-publish the draft to GitHub.
/// 6. Persist the final incident and publish `bug_monitor.incident.detected`.
///
/// # Errors
///
/// Returns an error in three cases:
/// - building the submission fails;
/// - the submission has no fingerprint;
/// - persisting the incident fails.
///
/// Draft submission, triage, and publish failures are not returned. They are
/// recorded on the incident (`status` / `last_error`) and the function returns
/// `Ok`.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // NOTE(review): the trailing `3` presumably caps the number of matches
    // returned — confirm against `bug_monitor_failure_pattern_matches`.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // Only used when a new incident is created below; the configured root wins
    // over the workspace index root.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // NOTE(review): the read lock is released here, before the incident is written
    // back below. Two events with the same fingerprint processed concurrently could
    // therefore both miss (or both clone) the existing incident — confirm whether
    // callers serialize this.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump counters/timestamps and keep the first excerpt
        // unless it was empty.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    // Persist the upserted incident before any further processing.
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: record the matches and stop without drafting.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // A draft submission failure is recorded on the incident and reported as Ok.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Triage changes are kept in memory here and persisted at the end of the function.
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    if let Some(draft_id) = incident.draft_id.clone() {
        // Snapshot of the stored draft (falling back to the freshly submitted one);
        // only used to record a publish failure below.
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                // The incident status is left as set by the triage step; only
                // `last_error` and the draft record reflect the post failure.
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                // Best-effort bookkeeping: errors from these writes are ignored.
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
5783
5784pub fn sha256_hex(parts: &[&str]) -> String {
5785    let mut hasher = Sha256::new();
5786    for part in parts {
5787        hasher.update(part.as_bytes());
5788        hasher.update([0u8]);
5789    }
5790    format!("{:x}", hasher.finalize())
5791}
5792
/// Whether a run in `status` counts against scheduler capacity.
///
/// Only actively `Running` runs count.
fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
    matches!(status, AutomationRunStatus::Running)
}

/// Whether a run in `status` holds its workspace lock.
///
/// This is broader than scheduler capacity: `Pausing`, `Paused`, and
/// `AwaitingApproval` runs keep the lock even though they are not counted as
/// running.
fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
    matches!(
        status,
        AutomationRunStatus::Running
            | AutomationRunStatus::Pausing
            | AutomationRunStatus::Paused
            | AutomationRunStatus::AwaitingApproval
    )
}
5806
/// Runs the routine scheduler; delegates to `crate::app::tasks::run_routine_scheduler`.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}

/// Runs the routine executor; delegates to `crate::app::tasks::run_routine_executor`.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}

/// Builds the prompt for a routine run; delegates to
/// `crate::app::routines::build_routine_prompt`.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
5818
/// Truncates `input` to at most `max_len` bytes and appends `"...<truncated>"`
/// when anything was cut. Inputs no longer than `max_len` bytes are returned
/// unchanged.
///
/// The cut point is moved back to the nearest UTF-8 character boundary, so a
/// multi-byte character is never split. Slicing in the middle of a character
/// would panic, and this function is applied to arbitrary error text and tool
/// arguments.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";
    if input.len() <= max_len {
        return input.to_string();
    }
    // `str::floor_char_boundary` is not stable yet, so walk back by hand.
    // Index 0 is always a char boundary, so this loop terminates.
    let mut end = max_len;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    let mut out = String::with_capacity(end + MARKER.len());
    out.push_str(&input[..end]);
    out.push_str(MARKER);
    out
}
5827
/// Appends the configured output artifacts for a routine run; delegates to
/// `crate::app::routines::append_configured_output_artifacts`.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
5831
5832pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
5833    let provider_id = config
5834        .get("default_provider")
5835        .and_then(|v| v.as_str())
5836        .map(str::trim)
5837        .filter(|v| !v.is_empty())?;
5838    let model_id = config
5839        .get("providers")
5840        .and_then(|v| v.get(provider_id))
5841        .and_then(|v| v.get("default_model"))
5842        .and_then(|v| v.as_str())
5843        .map(str::trim)
5844        .filter(|v| !v.is_empty())?;
5845    Some(ModelSpec {
5846        provider_id: provider_id.to_string(),
5847        model_id: model_id.to_string(),
5848    })
5849}
5850
/// Resolves the model spec for a routine run; delegates to
/// `crate::app::routines::resolve_routine_model_spec_for_run`.
///
/// NOTE(review): the meaning of the returned `String` is defined by the
/// delegate — confirm there.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
5857
/// Trims every entry, drops entries that are empty after trimming, and removes
/// duplicates. The first occurrence of each value is kept, in original order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        .filter(|entry| already_seen.insert(entry.clone()))
        .collect()
}
5872
5873#[cfg(not(feature = "browser"))]
5874impl AppState {
5875    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
5876        0
5877    }
5878
5879    pub async fn close_all_browser_sessions(&self) -> usize {
5880        0
5881    }
5882
5883    pub async fn browser_status(&self) -> serde_json::Value {
5884        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
5885    }
5886
5887    pub async fn browser_smoke_test(
5888        &self,
5889        _url: Option<String>,
5890    ) -> anyhow::Result<serde_json::Value> {
5891        anyhow::bail!("browser feature disabled")
5892    }
5893
5894    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
5895        anyhow::bail!("browser feature disabled")
5896    }
5897
5898    pub async fn browser_health_summary(&self) -> serde_json::Value {
5899        serde_json::json!({ "enabled": false })
5900    }
5901}
5902
5903pub mod automation;
5904pub use automation::*;
5905
5906#[cfg(test)]
5907mod tests;