Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
22use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
23use tandem_memory::db::MemoryDatabase;
24use tandem_providers::ChatMessage;
25use tandem_workflows::{
26    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
27    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
28    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
29};
30
31use crate::agent_teams::AgentTeamRuntime;
32use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
33use crate::automation_v2::types::*;
34use crate::bug_monitor::types::*;
35use crate::capability_resolver::CapabilityResolver;
36use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
37use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
38use crate::pack_manager::PackManager;
39use crate::preset_registry::PresetRegistry;
40use crate::routines::{errors::RoutineStoreError, types::*};
41use crate::runtime::{
42    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
43};
44use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
45use crate::util::{host::detect_host_runtime_context, time::now_ms};
46use crate::{
47    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
48    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
49    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
50    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
51    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
52    OptimizationPromotionDecisionKind,
53};
54
/// Top-level server state handed around the application.
///
/// Deriving `Clone` yields another handle: the mutable collections and flags
/// are wrapped in `Arc`, so clones observe the same underlying data. The
/// remaining fields are cloned through their own `Clone` impls.
#[derive(Clone)]
pub struct AppState {
    /// Runtime state; set once by `mark_ready`. `is_ready` reports whether it is populated.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup status, phase, attempt id and last error; read by `startup_snapshot`.
    pub startup: Arc<RwLock<StartupState>>,
    /// Selects the label returned by `mode_label` ("in-process" when true, "sidecar" otherwise).
    pub in_process_mode: Arc<AtomicBool>,
    /// Optional API token, read and replaced by `api_token` / `set_api_token`.
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    /// Initialized from `config::env::resolve_run_stale_ms()`.
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// Shared resource records keyed by resource key; persisted to `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    /// Routine specs keyed by routine id; persisted to `routines_path`.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    /// Starts from `config::env::resolve_bug_monitor_env_config()`.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// Initialized from `config::env::resolve_token_cost_per_1k_usd()`.
    pub token_cost_per_1k_usd: f64,
    // Store locations, each resolved through `config::paths` in `new_starting`.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    /// Web UI switch; written by `configure_web_ui`, read by `web_ui_enabled`.
    pub web_ui_enabled: Arc<AtomicBool>,
    /// Web UI path prefix; starts as "/admin". Guarded by a std (blocking) `RwLock`.
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    /// Server base URL; starts as "http://127.0.0.1:39731". Guarded by a std (blocking) `RwLock`.
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    /// Channel listener tasks and per-channel status, rebuilt by `restart_channel_listeners`.
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host context detected at construction; `host_runtime_context()` prefers the runtime's copy once set.
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Status of a single channel as reported by `AppState::channel_statuses`
/// and published in the "channel.status.changed" event.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// True when the channel has a configuration section present.
    pub enabled: bool,
    /// Set to true for enabled channels once listeners have been started.
    pub connected: bool,
    /// Most recent error message, if any.
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Free-form JSON metadata; an empty object when built by `restart_channel_listeners`.
    pub meta: Value,
}
/// Typed view over the effective configuration value, deserialized in
/// `restart_channel_listeners`. Every section is `#[serde(default)]`, so a
/// missing section falls back to that type's `Default`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    /// Per-channel configuration (telegram / discord / slack sections).
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    /// Web UI settings (`enabled`, `path_prefix`).
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
140
/// Mutable channel state kept behind `AppState::channels_runtime`.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Listener tasks returned by `tandem_channels::start_channel_listeners`;
    /// `None` when no listeners have been started.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Latest status per channel, keyed by channel name ("telegram", "discord", "slack").
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
146
/// A single key/value update for a status index.
// NOTE(review): producers and consumers of this type are outside this view.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    /// Index key being updated.
    pub key: String,
    /// New JSON value for `key`.
    pub value: Value,
}
152
153impl AppState {
154    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
155        Self {
156            runtime: Arc::new(OnceLock::new()),
157            startup: Arc::new(RwLock::new(StartupState {
158                status: StartupStatus::Starting,
159                phase: "boot".to_string(),
160                started_at_ms: now_ms(),
161                attempt_id,
162                last_error: None,
163            })),
164            in_process_mode: Arc::new(AtomicBool::new(in_process)),
165            api_token: Arc::new(RwLock::new(None)),
166            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
167            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
168            run_registry: RunRegistry::new(),
169            run_stale_ms: config::env::resolve_run_stale_ms(),
170            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
171            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
172            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
173            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
174            shared_resources_path: config::paths::resolve_shared_resources_path(),
175            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
176            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
177            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
178            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
179            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
180            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
181            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
182            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
183            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
184            bug_monitor_config: Arc::new(
185                RwLock::new(config::env::resolve_bug_monitor_env_config()),
186            ),
187            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
188            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
189            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
190            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
191            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
192            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
193            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
194            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
195            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
196            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
197            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
198            routines_path: config::paths::resolve_routines_path(),
199            routine_history_path: config::paths::resolve_routine_history_path(),
200            routine_runs_path: config::paths::resolve_routine_runs_path(),
201            automations_v2_path: config::paths::resolve_automations_v2_path(),
202            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
203            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
204            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
205            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
206            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
207            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
208            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
209            external_actions_path: config::paths::resolve_external_actions_path(),
210            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
211            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
212            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
213            web_ui_enabled: Arc::new(AtomicBool::new(false)),
214            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
215            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
216            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
217            host_runtime_context: detect_host_runtime_context(),
218            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
219            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
220            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
221            preset_registry: Arc::new(PresetRegistry::new(
222                PackManager::default_root(),
223                resolve_shared_paths()
224                    .map(|paths| paths.canonical_root)
225                    .unwrap_or_else(|_| {
226                        dirs::home_dir()
227                            .unwrap_or_else(|| PathBuf::from("."))
228                            .join(".tandem")
229                    }),
230            )),
231        }
232    }
233
234    pub fn is_ready(&self) -> bool {
235        self.runtime.get().is_some()
236    }
237
238    pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
239        for _ in 0..attempts {
240            if self.is_ready() {
241                return true;
242            }
243            let startup = self.startup_snapshot().await;
244            if matches!(startup.status, StartupStatus::Failed) {
245                return false;
246            }
247            tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
248        }
249        self.is_ready()
250    }
251
252    pub fn mode_label(&self) -> &'static str {
253        if self.in_process_mode.load(Ordering::Relaxed) {
254            "in-process"
255        } else {
256            "sidecar"
257        }
258    }
259
260    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
261        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
262        if let Ok(mut guard) = self.web_ui_prefix.write() {
263            *guard = config::webui::normalize_web_ui_prefix(&prefix);
264        }
265    }
266
267    pub fn web_ui_enabled(&self) -> bool {
268        self.web_ui_enabled.load(Ordering::Relaxed)
269    }
270
271    pub fn web_ui_prefix(&self) -> String {
272        self.web_ui_prefix
273            .read()
274            .map(|v| v.clone())
275            .unwrap_or_else(|_| "/admin".to_string())
276    }
277
278    pub fn set_server_base_url(&self, base_url: String) {
279        if let Ok(mut guard) = self.server_base_url.write() {
280            *guard = base_url;
281        }
282    }
283
284    pub fn server_base_url(&self) -> String {
285        self.server_base_url
286            .read()
287            .map(|v| v.clone())
288            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
289    }
290
291    pub async fn api_token(&self) -> Option<String> {
292        self.api_token.read().await.clone()
293    }
294
295    pub async fn set_api_token(&self, token: Option<String>) {
296        *self.api_token.write().await = token;
297    }
298
299    pub async fn startup_snapshot(&self) -> StartupSnapshot {
300        let state = self.startup.read().await.clone();
301        StartupSnapshot {
302            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
303            status: state.status,
304            phase: state.phase,
305            started_at_ms: state.started_at_ms,
306            attempt_id: state.attempt_id,
307            last_error: state.last_error,
308        }
309    }
310
311    pub fn host_runtime_context(&self) -> HostRuntimeContext {
312        self.runtime
313            .get()
314            .map(|runtime| runtime.host_runtime_context.clone())
315            .unwrap_or_else(|| self.host_runtime_context.clone())
316    }
317
318    pub async fn set_phase(&self, phase: impl Into<String>) {
319        let mut startup = self.startup.write().await;
320        startup.phase = phase.into();
321    }
322
323    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
324        self.runtime
325            .set(runtime)
326            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
327        #[cfg(feature = "browser")]
328        self.register_browser_tools().await?;
329        self.tools
330            .register_tool(
331                "pack_builder".to_string(),
332                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
333            )
334            .await;
335        self.engine_loop
336            .set_spawn_agent_hook(std::sync::Arc::new(
337                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
338            ))
339            .await;
340        self.engine_loop
341            .set_tool_policy_hook(std::sync::Arc::new(
342                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
343            ))
344            .await;
345        self.engine_loop
346            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
347                self.clone(),
348            )))
349            .await;
350        let _ = self.load_shared_resources().await;
351        self.load_routines().await?;
352        let _ = self.load_routine_history().await;
353        let _ = self.load_routine_runs().await;
354        self.load_automations_v2().await?;
355        let _ = self.load_automation_v2_runs().await;
356        let _ = self.load_optimization_campaigns().await;
357        let _ = self.load_optimization_experiments().await;
358        let _ = self.load_bug_monitor_config().await;
359        let _ = self.load_bug_monitor_drafts().await;
360        let _ = self.load_bug_monitor_incidents().await;
361        let _ = self.load_bug_monitor_posts().await;
362        let _ = self.load_external_actions().await;
363        let _ = self.load_workflow_runs().await;
364        let _ = self.load_workflow_hook_overrides().await;
365        let _ = self.reload_workflows().await;
366        let workspace_root = self.workspace_index.snapshot().await.root;
367        let _ = self
368            .agent_teams
369            .ensure_loaded_for_workspace(&workspace_root)
370            .await;
371        let mut startup = self.startup.write().await;
372        startup.status = StartupStatus::Ready;
373        startup.phase = "ready".to_string();
374        startup.last_error = None;
375        Ok(())
376    }
377
378    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
379        let mut startup = self.startup.write().await;
380        startup.status = StartupStatus::Failed;
381        startup.phase = phase.into();
382        startup.last_error = Some(error.into());
383    }
384
385    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
386        let runtime = self.channels_runtime.lock().await;
387        runtime.statuses.clone()
388    }
389
390    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
391        let effective = self.config.get_effective_value().await;
392        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
393        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
394
395        let mut runtime = self.channels_runtime.lock().await;
396        if let Some(listeners) = runtime.listeners.as_mut() {
397            listeners.abort_all();
398        }
399        runtime.listeners = None;
400        runtime.statuses.clear();
401
402        let mut status_map = std::collections::HashMap::new();
403        status_map.insert(
404            "telegram".to_string(),
405            ChannelStatus {
406                enabled: parsed.channels.telegram.is_some(),
407                connected: false,
408                last_error: None,
409                active_sessions: 0,
410                meta: serde_json::json!({}),
411            },
412        );
413        status_map.insert(
414            "discord".to_string(),
415            ChannelStatus {
416                enabled: parsed.channels.discord.is_some(),
417                connected: false,
418                last_error: None,
419                active_sessions: 0,
420                meta: serde_json::json!({}),
421            },
422        );
423        status_map.insert(
424            "slack".to_string(),
425            ChannelStatus {
426                enabled: parsed.channels.slack.is_some(),
427                connected: false,
428                last_error: None,
429                active_sessions: 0,
430                meta: serde_json::json!({}),
431            },
432        );
433
434        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
435            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
436            runtime.listeners = Some(listeners);
437            for status in status_map.values_mut() {
438                if status.enabled {
439                    status.connected = true;
440                }
441            }
442        }
443
444        runtime.statuses = status_map.clone();
445        drop(runtime);
446
447        self.event_bus.publish(EngineEvent::new(
448            "channel.status.changed",
449            serde_json::json!({ "channels": status_map }),
450        ));
451        Ok(())
452    }
453
454    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
455        if !self.shared_resources_path.exists() {
456            return Ok(());
457        }
458        let raw = fs::read_to_string(&self.shared_resources_path).await?;
459        let parsed =
460            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
461                .unwrap_or_default();
462        let mut guard = self.shared_resources.write().await;
463        *guard = parsed;
464        Ok(())
465    }
466
467    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
468        if let Some(parent) = self.shared_resources_path.parent() {
469            fs::create_dir_all(parent).await?;
470        }
471        let payload = {
472            let guard = self.shared_resources.read().await;
473            serde_json::to_string_pretty(&*guard)?
474        };
475        fs::write(&self.shared_resources_path, payload).await?;
476        Ok(())
477    }
478
479    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
480        self.shared_resources.read().await.get(key).cloned()
481    }
482
483    pub async fn list_shared_resources(
484        &self,
485        prefix: Option<&str>,
486        limit: usize,
487    ) -> Vec<SharedResourceRecord> {
488        let limit = limit.clamp(1, 500);
489        let mut rows = self
490            .shared_resources
491            .read()
492            .await
493            .values()
494            .filter(|record| {
495                if let Some(prefix) = prefix {
496                    record.key.starts_with(prefix)
497                } else {
498                    true
499                }
500            })
501            .cloned()
502            .collect::<Vec<_>>();
503        rows.sort_by(|a, b| a.key.cmp(&b.key));
504        rows.truncate(limit);
505        rows
506    }
507
508    pub async fn put_shared_resource(
509        &self,
510        key: String,
511        value: Value,
512        if_match_rev: Option<u64>,
513        updated_by: String,
514        ttl_ms: Option<u64>,
515    ) -> Result<SharedResourceRecord, ResourceStoreError> {
516        if !is_valid_resource_key(&key) {
517            return Err(ResourceStoreError::InvalidKey { key });
518        }
519
520        let now = now_ms();
521        let mut guard = self.shared_resources.write().await;
522        let existing = guard.get(&key).cloned();
523
524        if let Some(expected) = if_match_rev {
525            let current = existing.as_ref().map(|row| row.rev);
526            if current != Some(expected) {
527                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
528                    key,
529                    expected_rev: Some(expected),
530                    current_rev: current,
531                }));
532            }
533        }
534
535        let next_rev = existing
536            .as_ref()
537            .map(|row| row.rev.saturating_add(1))
538            .unwrap_or(1);
539
540        let record = SharedResourceRecord {
541            key: key.clone(),
542            value,
543            rev: next_rev,
544            updated_at_ms: now,
545            updated_by,
546            ttl_ms,
547        };
548
549        let previous = guard.insert(key.clone(), record.clone());
550        drop(guard);
551
552        if let Err(error) = self.persist_shared_resources().await {
553            let mut rollback = self.shared_resources.write().await;
554            if let Some(previous) = previous {
555                rollback.insert(key, previous);
556            } else {
557                rollback.remove(&key);
558            }
559            return Err(ResourceStoreError::PersistFailed {
560                message: error.to_string(),
561            });
562        }
563
564        Ok(record)
565    }
566
567    pub async fn delete_shared_resource(
568        &self,
569        key: &str,
570        if_match_rev: Option<u64>,
571    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
572        if !is_valid_resource_key(key) {
573            return Err(ResourceStoreError::InvalidKey {
574                key: key.to_string(),
575            });
576        }
577
578        let mut guard = self.shared_resources.write().await;
579        let current = guard.get(key).cloned();
580        if let Some(expected) = if_match_rev {
581            let current_rev = current.as_ref().map(|row| row.rev);
582            if current_rev != Some(expected) {
583                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
584                    key: key.to_string(),
585                    expected_rev: Some(expected),
586                    current_rev,
587                }));
588            }
589        }
590
591        let removed = guard.remove(key);
592        drop(guard);
593
594        if let Err(error) = self.persist_shared_resources().await {
595            if let Some(record) = removed.clone() {
596                self.shared_resources
597                    .write()
598                    .await
599                    .insert(record.key.clone(), record);
600            }
601            return Err(ResourceStoreError::PersistFailed {
602                message: error.to_string(),
603            });
604        }
605
606        Ok(removed)
607    }
608
609    pub async fn load_routines(&self) -> anyhow::Result<()> {
610        if !self.routines_path.exists() {
611            return Ok(());
612        }
613        let raw = fs::read_to_string(&self.routines_path).await?;
614        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
615            Ok(parsed) => {
616                let mut guard = self.routines.write().await;
617                *guard = parsed;
618                Ok(())
619            }
620            Err(primary_err) => {
621                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
622                if backup_path.exists() {
623                    let backup_raw = fs::read_to_string(&backup_path).await?;
624                    if let Ok(parsed_backup) = serde_json::from_str::<
625                        std::collections::HashMap<String, RoutineSpec>,
626                    >(&backup_raw)
627                    {
628                        let mut guard = self.routines.write().await;
629                        *guard = parsed_backup;
630                        return Ok(());
631                    }
632                }
633                Err(anyhow::anyhow!(
634                    "failed to parse routines store {}: {primary_err}",
635                    self.routines_path.display()
636                ))
637            }
638        }
639    }
640
641    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
642        if !self.routine_history_path.exists() {
643            return Ok(());
644        }
645        let raw = fs::read_to_string(&self.routine_history_path).await?;
646        let parsed = serde_json::from_str::<
647            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
648        >(&raw)
649        .unwrap_or_default();
650        let mut guard = self.routine_history.write().await;
651        *guard = parsed;
652        Ok(())
653    }
654
655    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
656        if !self.routine_runs_path.exists() {
657            return Ok(());
658        }
659        let raw = fs::read_to_string(&self.routine_runs_path).await?;
660        let parsed =
661            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
662                .unwrap_or_default();
663        let mut guard = self.routine_runs.write().await;
664        *guard = parsed;
665        Ok(())
666    }
667
668    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
669        if let Some(parent) = self.routines_path.parent() {
670            fs::create_dir_all(parent).await?;
671        }
672        let (payload, is_empty) = {
673            let guard = self.routines.read().await;
674            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
675        };
676        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
677            let existing_raw = fs::read_to_string(&self.routines_path)
678                .await
679                .unwrap_or_default();
680            let existing_has_rows = serde_json::from_str::<
681                std::collections::HashMap<String, RoutineSpec>,
682            >(&existing_raw)
683            .map(|rows| !rows.is_empty())
684            .unwrap_or(true);
685            if existing_has_rows {
686                return Err(anyhow::anyhow!(
687                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
688                    self.routines_path.display()
689                ));
690            }
691        }
692        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
693        if self.routines_path.exists() {
694            let _ = fs::copy(&self.routines_path, &backup_path).await;
695        }
696        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
697        fs::write(&tmp_path, payload).await?;
698        fs::rename(&tmp_path, &self.routines_path).await?;
699        Ok(())
700    }
701
702    pub async fn persist_routines(&self) -> anyhow::Result<()> {
703        self.persist_routines_inner(false).await
704    }
705
706    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
707        if let Some(parent) = self.routine_history_path.parent() {
708            fs::create_dir_all(parent).await?;
709        }
710        let payload = {
711            let guard = self.routine_history.read().await;
712            serde_json::to_string_pretty(&*guard)?
713        };
714        fs::write(&self.routine_history_path, payload).await?;
715        Ok(())
716    }
717
718    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
719        if let Some(parent) = self.routine_runs_path.parent() {
720            fs::create_dir_all(parent).await?;
721        }
722        let payload = {
723            let guard = self.routine_runs.read().await;
724            serde_json::to_string_pretty(&*guard)?
725        };
726        fs::write(&self.routine_runs_path, payload).await?;
727        Ok(())
728    }
729
730    pub async fn put_routine(
731        &self,
732        mut routine: RoutineSpec,
733    ) -> Result<RoutineSpec, RoutineStoreError> {
734        if routine.routine_id.trim().is_empty() {
735            return Err(RoutineStoreError::InvalidRoutineId {
736                routine_id: routine.routine_id,
737            });
738        }
739
740        routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
741        routine.output_targets = normalize_non_empty_list(routine.output_targets);
742
743        let now = now_ms();
744        let next_schedule_fire =
745            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
746                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
747                    detail: "invalid schedule or timezone".to_string(),
748                })?;
749        match routine.schedule {
750            RoutineSchedule::IntervalSeconds { seconds } => {
751                if seconds == 0 {
752                    return Err(RoutineStoreError::InvalidSchedule {
753                        detail: "interval_seconds must be > 0".to_string(),
754                    });
755                }
756                let _ = seconds;
757            }
758            RoutineSchedule::Cron { .. } => {}
759        }
760        if routine.next_fire_at_ms.is_none() {
761            routine.next_fire_at_ms = Some(next_schedule_fire);
762        }
763
764        let mut guard = self.routines.write().await;
765        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
766        drop(guard);
767
768        if let Err(error) = self.persist_routines().await {
769            let mut rollback = self.routines.write().await;
770            if let Some(previous) = previous {
771                rollback.insert(previous.routine_id.clone(), previous);
772            } else {
773                rollback.remove(&routine.routine_id);
774            }
775            return Err(RoutineStoreError::PersistFailed {
776                message: error.to_string(),
777            });
778        }
779
780        Ok(routine)
781    }
782
783    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
784        let mut rows = self
785            .routines
786            .read()
787            .await
788            .values()
789            .cloned()
790            .collect::<Vec<_>>();
791        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
792        rows
793    }
794
795    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
796        self.routines.read().await.get(routine_id).cloned()
797    }
798
799    pub async fn delete_routine(
800        &self,
801        routine_id: &str,
802    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
803        let mut guard = self.routines.write().await;
804        let removed = guard.remove(routine_id);
805        drop(guard);
806
807        let allow_empty_overwrite = self.routines.read().await.is_empty();
808        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
809            if let Some(removed) = removed.clone() {
810                self.routines
811                    .write()
812                    .await
813                    .insert(removed.routine_id.clone(), removed);
814            }
815            return Err(RoutineStoreError::PersistFailed {
816                message: error.to_string(),
817            });
818        }
819        Ok(removed)
820    }
821
822    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
823        let mut plans = Vec::new();
824        let mut guard = self.routines.write().await;
825        for routine in guard.values_mut() {
826            if routine.status != RoutineStatus::Active {
827                continue;
828            }
829            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
830                continue;
831            };
832            if now_ms < next_fire_at_ms {
833                continue;
834            }
835            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
836                now_ms,
837                next_fire_at_ms,
838                &routine.schedule,
839                &routine.timezone,
840                &routine.misfire_policy,
841            );
842            routine.next_fire_at_ms = Some(next_fire_at_ms);
843            if run_count == 0 {
844                continue;
845            }
846            plans.push(RoutineTriggerPlan {
847                routine_id: routine.routine_id.clone(),
848                run_count,
849                scheduled_at_ms: now_ms,
850                next_fire_at_ms,
851            });
852        }
853        drop(guard);
854        let _ = self.persist_routines().await;
855        plans
856    }
857
858    pub async fn mark_routine_fired(
859        &self,
860        routine_id: &str,
861        fired_at_ms: u64,
862    ) -> Option<RoutineSpec> {
863        let mut guard = self.routines.write().await;
864        let routine = guard.get_mut(routine_id)?;
865        routine.last_fired_at_ms = Some(fired_at_ms);
866        let updated = routine.clone();
867        drop(guard);
868        let _ = self.persist_routines().await;
869        Some(updated)
870    }
871
872    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
873        let mut history = self.routine_history.write().await;
874        history
875            .entry(event.routine_id.clone())
876            .or_default()
877            .push(event);
878        drop(history);
879        let _ = self.persist_routine_history().await;
880    }
881
882    pub async fn list_routine_history(
883        &self,
884        routine_id: &str,
885        limit: usize,
886    ) -> Vec<RoutineHistoryEvent> {
887        let limit = limit.clamp(1, 500);
888        let mut rows = self
889            .routine_history
890            .read()
891            .await
892            .get(routine_id)
893            .cloned()
894            .unwrap_or_default();
895        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
896        rows.truncate(limit);
897        rows
898    }
899
900    pub async fn create_routine_run(
901        &self,
902        routine: &RoutineSpec,
903        trigger_type: &str,
904        run_count: u32,
905        status: RoutineRunStatus,
906        detail: Option<String>,
907    ) -> RoutineRunRecord {
908        let now = now_ms();
909        let record = RoutineRunRecord {
910            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
911            routine_id: routine.routine_id.clone(),
912            trigger_type: trigger_type.to_string(),
913            run_count,
914            status,
915            created_at_ms: now,
916            updated_at_ms: now,
917            fired_at_ms: Some(now),
918            started_at_ms: None,
919            finished_at_ms: None,
920            requires_approval: routine.requires_approval,
921            approval_reason: None,
922            denial_reason: None,
923            paused_reason: None,
924            detail,
925            entrypoint: routine.entrypoint.clone(),
926            args: routine.args.clone(),
927            allowed_tools: routine.allowed_tools.clone(),
928            output_targets: routine.output_targets.clone(),
929            artifacts: Vec::new(),
930            active_session_ids: Vec::new(),
931            latest_session_id: None,
932            prompt_tokens: 0,
933            completion_tokens: 0,
934            total_tokens: 0,
935            estimated_cost_usd: 0.0,
936        };
937        self.routine_runs
938            .write()
939            .await
940            .insert(record.run_id.clone(), record.clone());
941        let _ = self.persist_routine_runs().await;
942        record
943    }
944
945    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
946        self.routine_runs.read().await.get(run_id).cloned()
947    }
948
949    pub async fn list_routine_runs(
950        &self,
951        routine_id: Option<&str>,
952        limit: usize,
953    ) -> Vec<RoutineRunRecord> {
954        let mut rows = self
955            .routine_runs
956            .read()
957            .await
958            .values()
959            .filter(|row| {
960                if let Some(id) = routine_id {
961                    row.routine_id == id
962                } else {
963                    true
964                }
965            })
966            .cloned()
967            .collect::<Vec<_>>();
968        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
969        rows.truncate(limit.clamp(1, 500));
970        rows
971    }
972
973    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
974        let mut guard = self.routine_runs.write().await;
975        let next_run_id = guard
976            .values()
977            .filter(|row| row.status == RoutineRunStatus::Queued)
978            .min_by(|a, b| {
979                a.created_at_ms
980                    .cmp(&b.created_at_ms)
981                    .then_with(|| a.run_id.cmp(&b.run_id))
982            })
983            .map(|row| row.run_id.clone())?;
984        let now = now_ms();
985        let row = guard.get_mut(&next_run_id)?;
986        row.status = RoutineRunStatus::Running;
987        row.updated_at_ms = now;
988        row.started_at_ms = Some(now);
989        let claimed = row.clone();
990        drop(guard);
991        let _ = self.persist_routine_runs().await;
992        Some(claimed)
993    }
994
995    pub async fn set_routine_session_policy(
996        &self,
997        session_id: String,
998        run_id: String,
999        routine_id: String,
1000        allowed_tools: Vec<String>,
1001    ) {
1002        let policy = RoutineSessionPolicy {
1003            session_id: session_id.clone(),
1004            run_id,
1005            routine_id,
1006            allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1007        };
1008        self.routine_session_policies
1009            .write()
1010            .await
1011            .insert(session_id, policy);
1012    }
1013
1014    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1015        self.routine_session_policies
1016            .read()
1017            .await
1018            .get(session_id)
1019            .cloned()
1020    }
1021
1022    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1023        self.routine_session_policies
1024            .write()
1025            .await
1026            .remove(session_id);
1027    }
1028
1029    pub async fn update_routine_run_status(
1030        &self,
1031        run_id: &str,
1032        status: RoutineRunStatus,
1033        reason: Option<String>,
1034    ) -> Option<RoutineRunRecord> {
1035        let mut guard = self.routine_runs.write().await;
1036        let row = guard.get_mut(run_id)?;
1037        row.status = status.clone();
1038        row.updated_at_ms = now_ms();
1039        match status {
1040            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1041            RoutineRunStatus::Running => {
1042                row.started_at_ms.get_or_insert_with(now_ms);
1043                if let Some(detail) = reason {
1044                    row.detail = Some(detail);
1045                }
1046            }
1047            RoutineRunStatus::Denied => row.denial_reason = reason,
1048            RoutineRunStatus::Paused => row.paused_reason = reason,
1049            RoutineRunStatus::Completed
1050            | RoutineRunStatus::Failed
1051            | RoutineRunStatus::Cancelled => {
1052                row.finished_at_ms = Some(now_ms());
1053                if let Some(detail) = reason {
1054                    row.detail = Some(detail);
1055                }
1056            }
1057            _ => {
1058                if let Some(detail) = reason {
1059                    row.detail = Some(detail);
1060                }
1061            }
1062        }
1063        let updated = row.clone();
1064        drop(guard);
1065        let _ = self.persist_routine_runs().await;
1066        Some(updated)
1067    }
1068
1069    pub async fn append_routine_run_artifact(
1070        &self,
1071        run_id: &str,
1072        artifact: RoutineRunArtifact,
1073    ) -> Option<RoutineRunRecord> {
1074        let mut guard = self.routine_runs.write().await;
1075        let row = guard.get_mut(run_id)?;
1076        row.updated_at_ms = now_ms();
1077        row.artifacts.push(artifact);
1078        let updated = row.clone();
1079        drop(guard);
1080        let _ = self.persist_routine_runs().await;
1081        Some(updated)
1082    }
1083
1084    pub async fn add_active_session_id(
1085        &self,
1086        run_id: &str,
1087        session_id: String,
1088    ) -> Option<RoutineRunRecord> {
1089        let mut guard = self.routine_runs.write().await;
1090        let row = guard.get_mut(run_id)?;
1091        if !row.active_session_ids.iter().any(|id| id == &session_id) {
1092            row.active_session_ids.push(session_id);
1093        }
1094        row.latest_session_id = row.active_session_ids.last().cloned();
1095        row.updated_at_ms = now_ms();
1096        let updated = row.clone();
1097        drop(guard);
1098        let _ = self.persist_routine_runs().await;
1099        Some(updated)
1100    }
1101
1102    pub async fn clear_active_session_id(
1103        &self,
1104        run_id: &str,
1105        session_id: &str,
1106    ) -> Option<RoutineRunRecord> {
1107        let mut guard = self.routine_runs.write().await;
1108        let row = guard.get_mut(run_id)?;
1109        row.active_session_ids.retain(|id| id != session_id);
1110        row.updated_at_ms = now_ms();
1111        let updated = row.clone();
1112        drop(guard);
1113        let _ = self.persist_routine_runs().await;
1114        Some(updated)
1115    }
1116
1117    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
1118        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
1119        let mut loaded_from_alternate = false;
1120        let mut migrated = false;
1121        let mut path_counts = Vec::new();
1122        let mut canonical_loaded = false;
1123        if self.automations_v2_path.exists() {
1124            let raw = fs::read_to_string(&self.automations_v2_path).await?;
1125            if raw.trim().is_empty() || raw.trim() == "{}" {
1126                path_counts.push((self.automations_v2_path.clone(), 0usize));
1127            } else {
1128                let parsed = parse_automation_v2_file(&raw);
1129                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
1130                canonical_loaded = !parsed.is_empty();
1131                merged = parsed;
1132            }
1133        } else {
1134            path_counts.push((self.automations_v2_path.clone(), 0usize));
1135        }
1136        if !canonical_loaded {
1137            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1138                if path == self.automations_v2_path {
1139                    continue;
1140                }
1141                if !path.exists() {
1142                    path_counts.push((path, 0usize));
1143                    continue;
1144                }
1145                let raw = fs::read_to_string(&path).await?;
1146                if raw.trim().is_empty() || raw.trim() == "{}" {
1147                    path_counts.push((path, 0usize));
1148                    continue;
1149                }
1150                let parsed = parse_automation_v2_file(&raw);
1151                path_counts.push((path.clone(), parsed.len()));
1152                if !parsed.is_empty() {
1153                    loaded_from_alternate = true;
1154                }
1155                for (automation_id, automation) in parsed {
1156                    match merged.get(&automation_id) {
1157                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
1158                        _ => {
1159                            merged.insert(automation_id, automation);
1160                        }
1161                    }
1162                }
1163            }
1164        } else {
1165            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1166                if path == self.automations_v2_path {
1167                    continue;
1168                }
1169                if !path.exists() {
1170                    path_counts.push((path, 0usize));
1171                    continue;
1172                }
1173                let raw = fs::read_to_string(&path).await?;
1174                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
1175                    0usize
1176                } else {
1177                    parse_automation_v2_file(&raw).len()
1178                };
1179                path_counts.push((path, count));
1180            }
1181        }
1182        let active_path = self.automations_v2_path.display().to_string();
1183        let path_count_summary = path_counts
1184            .iter()
1185            .map(|(path, count)| format!("{}={count}", path.display()))
1186            .collect::<Vec<_>>();
1187        tracing::info!(
1188            active_path,
1189            canonical_loaded,
1190            path_counts = ?path_count_summary,
1191            merged_count = merged.len(),
1192            "loaded automation v2 definitions"
1193        );
1194        for automation in merged.values_mut() {
1195            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
1196        }
1197        *self.automations_v2.write().await = merged;
1198        if loaded_from_alternate || migrated {
1199            let _ = self.persist_automations_v2().await;
1200        } else if canonical_loaded {
1201            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1202        }
1203        Ok(())
1204    }
1205
1206    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1207        let payload = {
1208            let guard = self.automations_v2.read().await;
1209            serde_json::to_string_pretty(&*guard)?
1210        };
1211        if let Some(parent) = self.automations_v2_path.parent() {
1212            fs::create_dir_all(parent).await?;
1213        }
1214        fs::write(&self.automations_v2_path, &payload).await?;
1215        let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1216        Ok(())
1217    }
1218
1219    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
1220        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
1221        let mut loaded_from_alternate = false;
1222        let mut path_counts = Vec::new();
1223        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
1224            if !path.exists() {
1225                path_counts.push((path, 0usize));
1226                continue;
1227            }
1228            let raw = fs::read_to_string(&path).await?;
1229            if raw.trim().is_empty() || raw.trim() == "{}" {
1230                path_counts.push((path, 0usize));
1231                continue;
1232            }
1233            let parsed = parse_automation_v2_runs_file(&raw);
1234            path_counts.push((path.clone(), parsed.len()));
1235            if path != self.automation_v2_runs_path {
1236                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
1237            }
1238            for (run_id, run) in parsed {
1239                match merged.get(&run_id) {
1240                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
1241                    _ => {
1242                        merged.insert(run_id, run);
1243                    }
1244                }
1245            }
1246        }
1247        let active_runs_path = self.automation_v2_runs_path.display().to_string();
1248        let run_path_count_summary = path_counts
1249            .iter()
1250            .map(|(path, count)| format!("{}={count}", path.display()))
1251            .collect::<Vec<_>>();
1252        tracing::info!(
1253            active_path = active_runs_path,
1254            path_counts = ?run_path_count_summary,
1255            merged_count = merged.len(),
1256            "loaded automation v2 runs"
1257        );
1258        *self.automation_v2_runs.write().await = merged;
1259        let recovered = self
1260            .recover_automation_definitions_from_run_snapshots()
1261            .await?;
1262        let automation_count = self.automations_v2.read().await.len();
1263        let run_count = self.automation_v2_runs.read().await.len();
1264        if automation_count == 0 && run_count > 0 {
1265            let active_automations_path = self.automations_v2_path.display().to_string();
1266            let active_runs_path = self.automation_v2_runs_path.display().to_string();
1267            tracing::warn!(
1268                active_automations_path,
1269                active_runs_path,
1270                run_count,
1271                "automation v2 definitions are empty while run history exists"
1272            );
1273        }
1274        if loaded_from_alternate || recovered > 0 {
1275            let _ = self.persist_automation_v2_runs().await;
1276        }
1277        Ok(())
1278    }
1279
1280    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1281        let payload = {
1282            let guard = self.automation_v2_runs.read().await;
1283            serde_json::to_string_pretty(&*guard)?
1284        };
1285        if let Some(parent) = self.automation_v2_runs_path.parent() {
1286            fs::create_dir_all(parent).await?;
1287        }
1288        fs::write(&self.automation_v2_runs_path, &payload).await?;
1289        Ok(())
1290    }
1291
1292    pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1293        if !self.optimization_campaigns_path.exists() {
1294            return Ok(());
1295        }
1296        let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1297        let parsed = parse_optimization_campaigns_file(&raw);
1298        *self.optimization_campaigns.write().await = parsed;
1299        Ok(())
1300    }
1301
1302    pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1303        let payload = {
1304            let guard = self.optimization_campaigns.read().await;
1305            serde_json::to_string_pretty(&*guard)?
1306        };
1307        if let Some(parent) = self.optimization_campaigns_path.parent() {
1308            fs::create_dir_all(parent).await?;
1309        }
1310        fs::write(&self.optimization_campaigns_path, payload).await?;
1311        Ok(())
1312    }
1313
1314    pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1315        if !self.optimization_experiments_path.exists() {
1316            return Ok(());
1317        }
1318        let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1319        let parsed = parse_optimization_experiments_file(&raw);
1320        *self.optimization_experiments.write().await = parsed;
1321        Ok(())
1322    }
1323
1324    pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1325        let payload = {
1326            let guard = self.optimization_experiments.read().await;
1327            serde_json::to_string_pretty(&*guard)?
1328        };
1329        if let Some(parent) = self.optimization_experiments_path.parent() {
1330            fs::create_dir_all(parent).await?;
1331        }
1332        fs::write(&self.optimization_experiments_path, payload).await?;
1333        Ok(())
1334    }
1335
1336    async fn verify_automation_v2_persisted(
1337        &self,
1338        automation_id: &str,
1339        expected_present: bool,
1340    ) -> anyhow::Result<()> {
1341        let active_raw = if self.automations_v2_path.exists() {
1342            fs::read_to_string(&self.automations_v2_path).await?
1343        } else {
1344            String::new()
1345        };
1346        let active_parsed = parse_automation_v2_file(&active_raw);
1347        let active_present = active_parsed.contains_key(automation_id);
1348        if active_present != expected_present {
1349            let active_path = self.automations_v2_path.display().to_string();
1350            tracing::error!(
1351                automation_id,
1352                expected_present,
1353                actual_present = active_present,
1354                count = active_parsed.len(),
1355                active_path,
1356                "automation v2 persistence verification failed"
1357            );
1358            anyhow::bail!(
1359                "automation v2 persistence verification failed for `{}`",
1360                automation_id
1361            );
1362        }
1363        let mut alternate_mismatches = Vec::new();
1364        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1365            if path == self.automations_v2_path {
1366                continue;
1367            }
1368            let raw = if path.exists() {
1369                fs::read_to_string(&path).await?
1370            } else {
1371                String::new()
1372            };
1373            let parsed = parse_automation_v2_file(&raw);
1374            let present = parsed.contains_key(automation_id);
1375            if present != expected_present {
1376                alternate_mismatches.push(format!(
1377                    "{} expected_present={} actual_present={} count={}",
1378                    path.display(),
1379                    expected_present,
1380                    present,
1381                    parsed.len()
1382                ));
1383            }
1384        }
1385        if !alternate_mismatches.is_empty() {
1386            let active_path = self.automations_v2_path.display().to_string();
1387            tracing::warn!(
1388                automation_id,
1389                expected_present,
1390                mismatches = ?alternate_mismatches,
1391                active_path,
1392                "automation v2 alternate persistence paths are stale"
1393            );
1394        }
1395        Ok(())
1396    }
1397
1398    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
1399        let runs = self
1400            .automation_v2_runs
1401            .read()
1402            .await
1403            .values()
1404            .cloned()
1405            .collect::<Vec<_>>();
1406        let mut guard = self.automations_v2.write().await;
1407        let mut recovered = 0usize;
1408        for run in runs {
1409            let Some(snapshot) = run.automation_snapshot.clone() else {
1410                continue;
1411            };
1412            let should_replace = match guard.get(&run.automation_id) {
1413                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
1414                None => true,
1415            };
1416            if should_replace {
1417                if !guard.contains_key(&run.automation_id) {
1418                    recovered += 1;
1419                }
1420                guard.insert(run.automation_id.clone(), snapshot);
1421            }
1422        }
1423        drop(guard);
1424        if recovered > 0 {
1425            let active_path = self.automations_v2_path.display().to_string();
1426            tracing::warn!(
1427                recovered,
1428                active_path,
1429                "recovered automation v2 definitions from run snapshots"
1430            );
1431            self.persist_automations_v2().await?;
1432        }
1433        Ok(recovered)
1434    }
1435
1436    pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1437        let path = if self.bug_monitor_config_path.exists() {
1438            self.bug_monitor_config_path.clone()
1439        } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1440            .exists()
1441        {
1442            config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1443        } else {
1444            return Ok(());
1445        };
1446        let raw = fs::read_to_string(path).await?;
1447        let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1448            .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1449        *self.bug_monitor_config.write().await = parsed;
1450        Ok(())
1451    }
1452
1453    pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1454        if let Some(parent) = self.bug_monitor_config_path.parent() {
1455            fs::create_dir_all(parent).await?;
1456        }
1457        let payload = {
1458            let guard = self.bug_monitor_config.read().await;
1459            serde_json::to_string_pretty(&*guard)?
1460        };
1461        fs::write(&self.bug_monitor_config_path, payload).await?;
1462        Ok(())
1463    }
1464
1465    pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1466        self.bug_monitor_config.read().await.clone()
1467    }
1468
1469    pub async fn put_bug_monitor_config(
1470        &self,
1471        mut config: BugMonitorConfig,
1472    ) -> anyhow::Result<BugMonitorConfig> {
1473        config.workspace_root = config
1474            .workspace_root
1475            .as_ref()
1476            .map(|v| v.trim().to_string())
1477            .filter(|v| !v.is_empty());
1478        if let Some(repo) = config.repo.as_ref() {
1479            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
1480                anyhow::bail!("repo must be in owner/repo format");
1481            }
1482        }
1483        if let Some(server) = config.mcp_server.as_ref() {
1484            let servers = self.mcp.list().await;
1485            if !servers.contains_key(server) {
1486                anyhow::bail!("unknown mcp server `{server}`");
1487            }
1488        }
1489        if let Some(model_policy) = config.model_policy.as_ref() {
1490            crate::http::routines_automations::validate_model_policy(model_policy)
1491                .map_err(anyhow::Error::msg)?;
1492        }
1493        config.updated_at_ms = now_ms();
1494        *self.bug_monitor_config.write().await = config.clone();
1495        self.persist_bug_monitor_config().await?;
1496        Ok(config)
1497    }
1498
1499    pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1500        let path = if self.bug_monitor_drafts_path.exists() {
1501            self.bug_monitor_drafts_path.clone()
1502        } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1503            .exists()
1504        {
1505            config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1506        } else {
1507            return Ok(());
1508        };
1509        let raw = fs::read_to_string(path).await?;
1510        let parsed =
1511            serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1512                .unwrap_or_default();
1513        *self.bug_monitor_drafts.write().await = parsed;
1514        Ok(())
1515    }
1516
1517    pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1518        if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1519            fs::create_dir_all(parent).await?;
1520        }
1521        let payload = {
1522            let guard = self.bug_monitor_drafts.read().await;
1523            serde_json::to_string_pretty(&*guard)?
1524        };
1525        fs::write(&self.bug_monitor_drafts_path, payload).await?;
1526        Ok(())
1527    }
1528
1529    pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1530        let path = if self.bug_monitor_incidents_path.exists() {
1531            self.bug_monitor_incidents_path.clone()
1532        } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1533            .exists()
1534        {
1535            config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1536        } else {
1537            return Ok(());
1538        };
1539        let raw = fs::read_to_string(path).await?;
1540        let parsed = serde_json::from_str::<
1541            std::collections::HashMap<String, BugMonitorIncidentRecord>,
1542        >(&raw)
1543        .unwrap_or_default();
1544        *self.bug_monitor_incidents.write().await = parsed;
1545        Ok(())
1546    }
1547
1548    pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1549        if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1550            fs::create_dir_all(parent).await?;
1551        }
1552        let payload = {
1553            let guard = self.bug_monitor_incidents.read().await;
1554            serde_json::to_string_pretty(&*guard)?
1555        };
1556        fs::write(&self.bug_monitor_incidents_path, payload).await?;
1557        Ok(())
1558    }
1559
1560    pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1561        let path = if self.bug_monitor_posts_path.exists() {
1562            self.bug_monitor_posts_path.clone()
1563        } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1564            .exists()
1565        {
1566            config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1567        } else {
1568            return Ok(());
1569        };
1570        let raw = fs::read_to_string(path).await?;
1571        let parsed =
1572            serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1573                .unwrap_or_default();
1574        *self.bug_monitor_posts.write().await = parsed;
1575        Ok(())
1576    }
1577
1578    pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1579        if let Some(parent) = self.bug_monitor_posts_path.parent() {
1580            fs::create_dir_all(parent).await?;
1581        }
1582        let payload = {
1583            let guard = self.bug_monitor_posts.read().await;
1584            serde_json::to_string_pretty(&*guard)?
1585        };
1586        fs::write(&self.bug_monitor_posts_path, payload).await?;
1587        Ok(())
1588    }
1589
1590    pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1591        if !self.external_actions_path.exists() {
1592            return Ok(());
1593        }
1594        let raw = fs::read_to_string(&self.external_actions_path).await?;
1595        let parsed =
1596            serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1597                .unwrap_or_default();
1598        *self.external_actions.write().await = parsed;
1599        Ok(())
1600    }
1601
1602    pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1603        if let Some(parent) = self.external_actions_path.parent() {
1604            fs::create_dir_all(parent).await?;
1605        }
1606        let payload = {
1607            let guard = self.external_actions.read().await;
1608            serde_json::to_string_pretty(&*guard)?
1609        };
1610        fs::write(&self.external_actions_path, payload).await?;
1611        Ok(())
1612    }
1613
1614    pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1615        let mut rows = self
1616            .bug_monitor_incidents
1617            .read()
1618            .await
1619            .values()
1620            .cloned()
1621            .collect::<Vec<_>>();
1622        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1623        rows.truncate(limit.clamp(1, 200));
1624        rows
1625    }
1626
1627    pub async fn get_bug_monitor_incident(
1628        &self,
1629        incident_id: &str,
1630    ) -> Option<BugMonitorIncidentRecord> {
1631        self.bug_monitor_incidents
1632            .read()
1633            .await
1634            .get(incident_id)
1635            .cloned()
1636    }
1637
1638    pub async fn put_bug_monitor_incident(
1639        &self,
1640        incident: BugMonitorIncidentRecord,
1641    ) -> anyhow::Result<BugMonitorIncidentRecord> {
1642        self.bug_monitor_incidents
1643            .write()
1644            .await
1645            .insert(incident.incident_id.clone(), incident.clone());
1646        self.persist_bug_monitor_incidents().await?;
1647        Ok(incident)
1648    }
1649
1650    pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1651        let mut rows = self
1652            .bug_monitor_posts
1653            .read()
1654            .await
1655            .values()
1656            .cloned()
1657            .collect::<Vec<_>>();
1658        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1659        rows.truncate(limit.clamp(1, 200));
1660        rows
1661    }
1662
1663    pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1664        self.bug_monitor_posts.read().await.get(post_id).cloned()
1665    }
1666
1667    pub async fn put_bug_monitor_post(
1668        &self,
1669        post: BugMonitorPostRecord,
1670    ) -> anyhow::Result<BugMonitorPostRecord> {
1671        self.bug_monitor_posts
1672            .write()
1673            .await
1674            .insert(post.post_id.clone(), post.clone());
1675        self.persist_bug_monitor_posts().await?;
1676        Ok(post)
1677    }
1678
1679    pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1680        let mut rows = self
1681            .external_actions
1682            .read()
1683            .await
1684            .values()
1685            .cloned()
1686            .collect::<Vec<_>>();
1687        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1688        rows.truncate(limit.clamp(1, 200));
1689        rows
1690    }
1691
1692    pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1693        self.external_actions.read().await.get(action_id).cloned()
1694    }
1695
1696    pub async fn put_external_action(
1697        &self,
1698        action: ExternalActionRecord,
1699    ) -> anyhow::Result<ExternalActionRecord> {
1700        self.external_actions
1701            .write()
1702            .await
1703            .insert(action.action_id.clone(), action.clone());
1704        self.persist_external_actions().await?;
1705        Ok(action)
1706    }
1707
1708    pub async fn record_external_action(
1709        &self,
1710        action: ExternalActionRecord,
1711    ) -> anyhow::Result<ExternalActionRecord> {
1712        let action = self.put_external_action(action).await?;
1713        if let Some(run_id) = action.routine_run_id.as_deref() {
1714            let artifact = RoutineRunArtifact {
1715                artifact_id: format!("external-action-{}", action.action_id),
1716                uri: format!("external-action://{}", action.action_id),
1717                kind: "external_action_receipt".to_string(),
1718                label: Some(format!("external action receipt: {}", action.operation)),
1719                created_at_ms: action.updated_at_ms,
1720                metadata: Some(json!({
1721                    "actionID": action.action_id,
1722                    "operation": action.operation,
1723                    "status": action.status,
1724                    "sourceKind": action.source_kind,
1725                    "sourceID": action.source_id,
1726                    "capabilityID": action.capability_id,
1727                    "target": action.target,
1728                })),
1729            };
1730            let _ = self
1731                .append_routine_run_artifact(run_id, artifact.clone())
1732                .await;
1733            if let Some(runtime) = self.runtime.get() {
1734                runtime.event_bus.publish(EngineEvent::new(
1735                    "routine.run.artifact_added",
1736                    json!({
1737                        "runID": run_id,
1738                        "artifact": artifact,
1739                    }),
1740                ));
1741            }
1742        }
1743        if let Some(context_run_id) = action.context_run_id.as_deref() {
1744            let payload = serde_json::to_value(&action)?;
1745            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
1746                self,
1747                context_run_id,
1748                &format!("external-action-{}", action.action_id),
1749                "external_action_receipt",
1750                &format!("external-actions/{}.json", action.action_id),
1751                &payload,
1752            )
1753            .await
1754            {
1755                tracing::warn!(
1756                    "failed to append external action artifact {} to context run {}: {}",
1757                    action.action_id,
1758                    context_run_id,
1759                    error
1760                );
1761            }
1762        }
1763        Ok(action)
1764    }
1765
1766    pub async fn update_bug_monitor_runtime_status(
1767        &self,
1768        update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1769    ) -> BugMonitorRuntimeStatus {
1770        let mut guard = self.bug_monitor_runtime_status.write().await;
1771        update(&mut guard);
1772        guard.clone()
1773    }
1774
1775    pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1776        let mut rows = self
1777            .bug_monitor_drafts
1778            .read()
1779            .await
1780            .values()
1781            .cloned()
1782            .collect::<Vec<_>>();
1783        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1784        rows.truncate(limit.clamp(1, 200));
1785        rows
1786    }
1787
1788    pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1789        self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1790    }
1791
1792    pub async fn put_bug_monitor_draft(
1793        &self,
1794        draft: BugMonitorDraftRecord,
1795    ) -> anyhow::Result<BugMonitorDraftRecord> {
1796        self.bug_monitor_drafts
1797            .write()
1798            .await
1799            .insert(draft.draft_id.clone(), draft.clone());
1800        self.persist_bug_monitor_drafts().await?;
1801        Ok(draft)
1802    }
1803
1804    pub async fn submit_bug_monitor_draft(
1805        &self,
1806        mut submission: BugMonitorSubmission,
1807    ) -> anyhow::Result<BugMonitorDraftRecord> {
1808        fn normalize_optional(value: Option<String>) -> Option<String> {
1809            value
1810                .map(|v| v.trim().to_string())
1811                .filter(|v| !v.is_empty())
1812        }
1813
1814        fn compute_fingerprint(parts: &[&str]) -> String {
1815            use std::hash::{Hash, Hasher};
1816
1817            let mut hasher = std::collections::hash_map::DefaultHasher::new();
1818            for part in parts {
1819                part.hash(&mut hasher);
1820            }
1821            format!("{:016x}", hasher.finish())
1822        }
1823
1824        submission.repo = normalize_optional(submission.repo);
1825        submission.title = normalize_optional(submission.title);
1826        submission.detail = normalize_optional(submission.detail);
1827        submission.source = normalize_optional(submission.source);
1828        submission.run_id = normalize_optional(submission.run_id);
1829        submission.session_id = normalize_optional(submission.session_id);
1830        submission.correlation_id = normalize_optional(submission.correlation_id);
1831        submission.file_name = normalize_optional(submission.file_name);
1832        submission.process = normalize_optional(submission.process);
1833        submission.component = normalize_optional(submission.component);
1834        submission.event = normalize_optional(submission.event);
1835        submission.level = normalize_optional(submission.level);
1836        submission.fingerprint = normalize_optional(submission.fingerprint);
1837        submission.excerpt = submission
1838            .excerpt
1839            .into_iter()
1840            .map(|line| line.trim_end().to_string())
1841            .filter(|line| !line.is_empty())
1842            .take(50)
1843            .collect();
1844
1845        let config = self.bug_monitor_config().await;
1846        let repo = submission
1847            .repo
1848            .clone()
1849            .or(config.repo.clone())
1850            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
1851        if !is_valid_owner_repo_slug(&repo) {
1852            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
1853        }
1854
1855        let title = submission.title.clone().unwrap_or_else(|| {
1856            if let Some(event) = submission.event.as_ref() {
1857                format!("Failure detected in {event}")
1858            } else if let Some(component) = submission.component.as_ref() {
1859                format!("Failure detected in {component}")
1860            } else if let Some(process) = submission.process.as_ref() {
1861                format!("Failure detected in {process}")
1862            } else if let Some(source) = submission.source.as_ref() {
1863                format!("Failure report from {source}")
1864            } else {
1865                "Failure report".to_string()
1866            }
1867        });
1868
1869        let mut detail_lines = Vec::new();
1870        if let Some(source) = submission.source.as_ref() {
1871            detail_lines.push(format!("source: {source}"));
1872        }
1873        if let Some(file_name) = submission.file_name.as_ref() {
1874            detail_lines.push(format!("file: {file_name}"));
1875        }
1876        if let Some(level) = submission.level.as_ref() {
1877            detail_lines.push(format!("level: {level}"));
1878        }
1879        if let Some(process) = submission.process.as_ref() {
1880            detail_lines.push(format!("process: {process}"));
1881        }
1882        if let Some(component) = submission.component.as_ref() {
1883            detail_lines.push(format!("component: {component}"));
1884        }
1885        if let Some(event) = submission.event.as_ref() {
1886            detail_lines.push(format!("event: {event}"));
1887        }
1888        if let Some(run_id) = submission.run_id.as_ref() {
1889            detail_lines.push(format!("run_id: {run_id}"));
1890        }
1891        if let Some(session_id) = submission.session_id.as_ref() {
1892            detail_lines.push(format!("session_id: {session_id}"));
1893        }
1894        if let Some(correlation_id) = submission.correlation_id.as_ref() {
1895            detail_lines.push(format!("correlation_id: {correlation_id}"));
1896        }
1897        if let Some(detail) = submission.detail.as_ref() {
1898            detail_lines.push(String::new());
1899            detail_lines.push(detail.clone());
1900        }
1901        if !submission.excerpt.is_empty() {
1902            if !detail_lines.is_empty() {
1903                detail_lines.push(String::new());
1904            }
1905            detail_lines.push("excerpt:".to_string());
1906            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
1907        }
1908        let detail = if detail_lines.is_empty() {
1909            None
1910        } else {
1911            Some(detail_lines.join("\n"))
1912        };
1913
1914        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
1915            compute_fingerprint(&[
1916                repo.as_str(),
1917                title.as_str(),
1918                detail.as_deref().unwrap_or(""),
1919                submission.source.as_deref().unwrap_or(""),
1920                submission.run_id.as_deref().unwrap_or(""),
1921                submission.session_id.as_deref().unwrap_or(""),
1922                submission.correlation_id.as_deref().unwrap_or(""),
1923            ])
1924        });
1925
1926        let mut drafts = self.bug_monitor_drafts.write().await;
1927        if let Some(existing) = drafts
1928            .values()
1929            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
1930            .cloned()
1931        {
1932            return Ok(existing);
1933        }
1934
1935        let draft = BugMonitorDraftRecord {
1936            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
1937            fingerprint,
1938            repo,
1939            status: if config.require_approval_for_new_issues {
1940                "approval_required".to_string()
1941            } else {
1942                "draft_ready".to_string()
1943            },
1944            created_at_ms: now_ms(),
1945            triage_run_id: None,
1946            issue_number: None,
1947            title: Some(title),
1948            detail,
1949            github_status: None,
1950            github_issue_url: None,
1951            github_comment_url: None,
1952            github_posted_at_ms: None,
1953            matched_issue_number: None,
1954            matched_issue_state: None,
1955            evidence_digest: None,
1956            last_post_error: None,
1957        };
1958        drafts.insert(draft.draft_id.clone(), draft.clone());
1959        drop(drafts);
1960        self.persist_bug_monitor_drafts().await?;
1961        Ok(draft)
1962    }
1963
1964    pub async fn update_bug_monitor_draft_status(
1965        &self,
1966        draft_id: &str,
1967        next_status: &str,
1968        reason: Option<&str>,
1969    ) -> anyhow::Result<BugMonitorDraftRecord> {
1970        let normalized_status = next_status.trim().to_ascii_lowercase();
1971        if normalized_status != "draft_ready" && normalized_status != "denied" {
1972            anyhow::bail!("unsupported Bug Monitor draft status");
1973        }
1974
1975        let mut drafts = self.bug_monitor_drafts.write().await;
1976        let Some(draft) = drafts.get_mut(draft_id) else {
1977            anyhow::bail!("Bug Monitor draft not found");
1978        };
1979        if !draft.status.eq_ignore_ascii_case("approval_required") {
1980            anyhow::bail!("Bug Monitor draft is not waiting for approval");
1981        }
1982        draft.status = normalized_status.clone();
1983        if let Some(reason) = reason
1984            .map(|value| value.trim())
1985            .filter(|value| !value.is_empty())
1986        {
1987            let next_detail = if let Some(detail) = draft.detail.as_ref() {
1988                format!("{detail}\n\noperator_note: {reason}")
1989            } else {
1990                format!("operator_note: {reason}")
1991            };
1992            draft.detail = Some(next_detail);
1993        }
1994        let updated = draft.clone();
1995        drop(drafts);
1996        self.persist_bug_monitor_drafts().await?;
1997
1998        let event_name = if normalized_status == "draft_ready" {
1999            "bug_monitor.draft.approved"
2000        } else {
2001            "bug_monitor.draft.denied"
2002        };
2003        self.event_bus.publish(EngineEvent::new(
2004            event_name,
2005            serde_json::json!({
2006                "draft_id": updated.draft_id,
2007                "repo": updated.repo,
2008                "status": updated.status,
2009                "reason": reason,
2010            }),
2011        ));
2012        Ok(updated)
2013    }
2014
2015    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
2016        let required_capabilities = vec![
2017            "github.list_issues".to_string(),
2018            "github.get_issue".to_string(),
2019            "github.create_issue".to_string(),
2020            "github.comment_on_issue".to_string(),
2021        ];
2022        let config = self.bug_monitor_config().await;
2023        let drafts = self.bug_monitor_drafts.read().await;
2024        let incidents = self.bug_monitor_incidents.read().await;
2025        let posts = self.bug_monitor_posts.read().await;
2026        let total_incidents = incidents.len();
2027        let pending_incidents = incidents
2028            .values()
2029            .filter(|row| {
2030                matches!(
2031                    row.status.as_str(),
2032                    "queued"
2033                        | "draft_created"
2034                        | "triage_queued"
2035                        | "analysis_queued"
2036                        | "triage_pending"
2037                        | "issue_draft_pending"
2038                )
2039            })
2040            .count();
2041        let pending_drafts = drafts
2042            .values()
2043            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
2044            .count();
2045        let pending_posts = posts
2046            .values()
2047            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
2048            .count();
2049        let last_activity_at_ms = drafts
2050            .values()
2051            .map(|row| row.created_at_ms)
2052            .chain(posts.values().map(|row| row.updated_at_ms))
2053            .max();
2054        drop(drafts);
2055        drop(incidents);
2056        drop(posts);
2057        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
2058        runtime.paused = config.paused;
2059        runtime.total_incidents = total_incidents;
2060        runtime.pending_incidents = pending_incidents;
2061        runtime.pending_posts = pending_posts;
2062
2063        let mut status = BugMonitorStatus {
2064            config: config.clone(),
2065            runtime,
2066            pending_drafts,
2067            pending_posts,
2068            last_activity_at_ms,
2069            ..BugMonitorStatus::default()
2070        };
2071        let repo_valid = config
2072            .repo
2073            .as_ref()
2074            .map(|repo| is_valid_owner_repo_slug(repo))
2075            .unwrap_or(false);
2076        let servers = self.mcp.list().await;
2077        let selected_server = config
2078            .mcp_server
2079            .as_ref()
2080            .and_then(|name| servers.get(name))
2081            .cloned();
2082        let provider_catalog = self.providers.list().await;
2083        let selected_model = config
2084            .model_policy
2085            .as_ref()
2086            .and_then(|policy| policy.get("default_model"))
2087            .and_then(crate::app::routines::parse_model_spec);
2088        let selected_model_ready = selected_model
2089            .as_ref()
2090            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
2091            .unwrap_or(false);
2092        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
2093            self.mcp.server_tools(server_name).await
2094        } else {
2095            Vec::new()
2096        };
2097        let discovered_tools = self
2098            .capability_resolver
2099            .discover_from_runtime(selected_server_tools, Vec::new())
2100            .await;
2101        status.discovered_mcp_tools = discovered_tools
2102            .iter()
2103            .map(|row| row.tool_name.clone())
2104            .collect();
2105        let discovered_providers = discovered_tools
2106            .iter()
2107            .map(|row| row.provider.to_ascii_lowercase())
2108            .collect::<std::collections::HashSet<_>>();
2109        let provider_preference = match config.provider_preference {
2110            BugMonitorProviderPreference::OfficialGithub => {
2111                vec![
2112                    "mcp".to_string(),
2113                    "composio".to_string(),
2114                    "arcade".to_string(),
2115                ]
2116            }
2117            BugMonitorProviderPreference::Composio => {
2118                vec![
2119                    "composio".to_string(),
2120                    "mcp".to_string(),
2121                    "arcade".to_string(),
2122                ]
2123            }
2124            BugMonitorProviderPreference::Arcade => {
2125                vec![
2126                    "arcade".to_string(),
2127                    "mcp".to_string(),
2128                    "composio".to_string(),
2129                ]
2130            }
2131            BugMonitorProviderPreference::Auto => {
2132                vec![
2133                    "mcp".to_string(),
2134                    "composio".to_string(),
2135                    "arcade".to_string(),
2136                ]
2137            }
2138        };
2139        let capability_resolution = self
2140            .capability_resolver
2141            .resolve(
2142                crate::capability_resolver::CapabilityResolveInput {
2143                    workflow_id: Some("bug_monitor".to_string()),
2144                    required_capabilities: required_capabilities.clone(),
2145                    optional_capabilities: Vec::new(),
2146                    provider_preference,
2147                    available_tools: discovered_tools,
2148                },
2149                Vec::new(),
2150            )
2151            .await
2152            .ok();
2153        let bindings_file = self.capability_resolver.list_bindings().await.ok();
2154        if let Some(bindings) = bindings_file.as_ref() {
2155            status.binding_source_version = bindings.builtin_version.clone();
2156            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
2157            status.selected_server_binding_candidates = bindings
2158                .bindings
2159                .iter()
2160                .filter(|binding| required_capabilities.contains(&binding.capability_id))
2161                .filter(|binding| {
2162                    discovered_providers.is_empty()
2163                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
2164                })
2165                .map(|binding| {
2166                    let binding_key = format!(
2167                        "{}::{}",
2168                        binding.capability_id,
2169                        binding.tool_name.to_ascii_lowercase()
2170                    );
2171                    let matched = capability_resolution
2172                        .as_ref()
2173                        .map(|resolution| {
2174                            resolution.resolved.iter().any(|row| {
2175                                row.capability_id == binding.capability_id
2176                                    && format!(
2177                                        "{}::{}",
2178                                        row.capability_id,
2179                                        row.tool_name.to_ascii_lowercase()
2180                                    ) == binding_key
2181                            })
2182                        })
2183                        .unwrap_or(false);
2184                    BugMonitorBindingCandidate {
2185                        capability_id: binding.capability_id.clone(),
2186                        binding_tool_name: binding.tool_name.clone(),
2187                        aliases: binding.tool_name_aliases.clone(),
2188                        matched,
2189                    }
2190                })
2191                .collect();
2192            status.selected_server_binding_candidates.sort_by(|a, b| {
2193                a.capability_id
2194                    .cmp(&b.capability_id)
2195                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
2196            });
2197        }
2198        let capability_ready = |capability_id: &str| -> bool {
2199            capability_resolution
2200                .as_ref()
2201                .map(|resolved| {
2202                    resolved
2203                        .resolved
2204                        .iter()
2205                        .any(|row| row.capability_id == capability_id)
2206                })
2207                .unwrap_or(false)
2208        };
2209        if let Some(resolution) = capability_resolution.as_ref() {
2210            status.missing_required_capabilities = resolution.missing_required.clone();
2211            status.resolved_capabilities = resolution
2212                .resolved
2213                .iter()
2214                .map(|row| BugMonitorCapabilityMatch {
2215                    capability_id: row.capability_id.clone(),
2216                    provider: row.provider.clone(),
2217                    tool_name: row.tool_name.clone(),
2218                    binding_index: row.binding_index,
2219                })
2220                .collect();
2221        } else {
2222            status.missing_required_capabilities = required_capabilities.clone();
2223        }
2224        status.required_capabilities = BugMonitorCapabilityReadiness {
2225            github_list_issues: capability_ready("github.list_issues"),
2226            github_get_issue: capability_ready("github.get_issue"),
2227            github_create_issue: capability_ready("github.create_issue"),
2228            github_comment_on_issue: capability_ready("github.comment_on_issue"),
2229        };
2230        status.selected_model = selected_model;
2231        status.readiness = BugMonitorReadiness {
2232            config_valid: repo_valid
2233                && selected_server.is_some()
2234                && status.required_capabilities.github_list_issues
2235                && status.required_capabilities.github_get_issue
2236                && status.required_capabilities.github_create_issue
2237                && status.required_capabilities.github_comment_on_issue
2238                && selected_model_ready,
2239            repo_valid,
2240            mcp_server_present: selected_server.is_some(),
2241            mcp_connected: selected_server
2242                .as_ref()
2243                .map(|row| row.connected)
2244                .unwrap_or(false),
2245            github_read_ready: status.required_capabilities.github_list_issues
2246                && status.required_capabilities.github_get_issue,
2247            github_write_ready: status.required_capabilities.github_create_issue
2248                && status.required_capabilities.github_comment_on_issue,
2249            selected_model_ready,
2250            ingest_ready: config.enabled && !config.paused && repo_valid,
2251            publish_ready: config.enabled
2252                && !config.paused
2253                && repo_valid
2254                && selected_server
2255                    .as_ref()
2256                    .map(|row| row.connected)
2257                    .unwrap_or(false)
2258                && status.required_capabilities.github_list_issues
2259                && status.required_capabilities.github_get_issue
2260                && status.required_capabilities.github_create_issue
2261                && status.required_capabilities.github_comment_on_issue
2262                && selected_model_ready,
2263            runtime_ready: config.enabled
2264                && !config.paused
2265                && repo_valid
2266                && selected_server
2267                    .as_ref()
2268                    .map(|row| row.connected)
2269                    .unwrap_or(false)
2270                && status.required_capabilities.github_list_issues
2271                && status.required_capabilities.github_get_issue
2272                && status.required_capabilities.github_create_issue
2273                && status.required_capabilities.github_comment_on_issue
2274                && selected_model_ready,
2275        };
2276        if config.enabled {
2277            if config.paused {
2278                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
2279            } else if !repo_valid {
2280                status.last_error = Some("Target repo is missing or invalid.".to_string());
2281            } else if selected_server.is_none() {
2282                status.last_error = Some("Selected MCP server is missing.".to_string());
2283            } else if !status.readiness.mcp_connected {
2284                status.last_error = Some("Selected MCP server is disconnected.".to_string());
2285            } else if !selected_model_ready {
2286                status.last_error = Some(
2287                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
2288                        .to_string(),
2289                );
2290            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
2291                let missing = if status.missing_required_capabilities.is_empty() {
2292                    "unknown".to_string()
2293                } else {
2294                    status.missing_required_capabilities.join(", ")
2295                };
2296                status.last_error = Some(format!(
2297                    "Selected MCP server is missing required GitHub capabilities: {missing}"
2298                ));
2299            }
2300        }
2301        status.runtime.monitoring_active = status.readiness.ingest_ready;
2302        status
2303    }
2304
2305    pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2306        if !self.workflow_runs_path.exists() {
2307            return Ok(());
2308        }
2309        let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2310        let parsed =
2311            serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2312                .unwrap_or_default();
2313        *self.workflow_runs.write().await = parsed;
2314        Ok(())
2315    }
2316
2317    pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2318        if let Some(parent) = self.workflow_runs_path.parent() {
2319            fs::create_dir_all(parent).await?;
2320        }
2321        let payload = {
2322            let guard = self.workflow_runs.read().await;
2323            serde_json::to_string_pretty(&*guard)?
2324        };
2325        fs::write(&self.workflow_runs_path, payload).await?;
2326        Ok(())
2327    }
2328
2329    pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2330        if !self.workflow_hook_overrides_path.exists() {
2331            return Ok(());
2332        }
2333        let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2334        let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2335            .unwrap_or_default();
2336        *self.workflow_hook_overrides.write().await = parsed;
2337        Ok(())
2338    }
2339
2340    pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2341        if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2342            fs::create_dir_all(parent).await?;
2343        }
2344        let payload = {
2345            let guard = self.workflow_hook_overrides.read().await;
2346            serde_json::to_string_pretty(&*guard)?
2347        };
2348        fs::write(&self.workflow_hook_overrides_path, payload).await?;
2349        Ok(())
2350    }
2351
2352    pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
2353        let mut sources = Vec::new();
2354        sources.push(WorkflowLoadSource {
2355            root: config::paths::resolve_builtin_workflows_dir(),
2356            kind: WorkflowSourceKind::BuiltIn,
2357            pack_id: None,
2358        });
2359
2360        let workspace_root = self.workspace_index.snapshot().await.root;
2361        sources.push(WorkflowLoadSource {
2362            root: PathBuf::from(workspace_root).join(".tandem"),
2363            kind: WorkflowSourceKind::Workspace,
2364            pack_id: None,
2365        });
2366
2367        if let Ok(packs) = self.pack_manager.list().await {
2368            for pack in packs {
2369                sources.push(WorkflowLoadSource {
2370                    root: PathBuf::from(pack.install_path),
2371                    kind: WorkflowSourceKind::Pack,
2372                    pack_id: Some(pack.pack_id),
2373                });
2374            }
2375        }
2376
2377        let mut registry = load_workflow_registry(&sources)?;
2378        let overrides = self.workflow_hook_overrides.read().await.clone();
2379        for hook in &mut registry.hooks {
2380            if let Some(enabled) = overrides.get(&hook.binding_id) {
2381                hook.enabled = *enabled;
2382            }
2383        }
2384        for workflow in registry.workflows.values_mut() {
2385            workflow.hooks = registry
2386                .hooks
2387                .iter()
2388                .filter(|hook| hook.workflow_id == workflow.workflow_id)
2389                .cloned()
2390                .collect();
2391        }
2392        let messages = validate_workflow_registry(&registry);
2393        *self.workflows.write().await = registry;
2394        Ok(messages)
2395    }
2396
2397    pub async fn workflow_registry(&self) -> WorkflowRegistry {
2398        self.workflows.read().await.clone()
2399    }
2400
2401    pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2402        let mut rows = self
2403            .workflows
2404            .read()
2405            .await
2406            .workflows
2407            .values()
2408            .cloned()
2409            .collect::<Vec<_>>();
2410        rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2411        rows
2412    }
2413
2414    pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2415        self.workflows
2416            .read()
2417            .await
2418            .workflows
2419            .get(workflow_id)
2420            .cloned()
2421    }
2422
2423    pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2424        let mut rows = self
2425            .workflows
2426            .read()
2427            .await
2428            .hooks
2429            .iter()
2430            .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2431            .cloned()
2432            .collect::<Vec<_>>();
2433        rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2434        rows
2435    }
2436
2437    pub async fn set_workflow_hook_enabled(
2438        &self,
2439        binding_id: &str,
2440        enabled: bool,
2441    ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2442        self.workflow_hook_overrides
2443            .write()
2444            .await
2445            .insert(binding_id.to_string(), enabled);
2446        self.persist_workflow_hook_overrides().await?;
2447        let _ = self.reload_workflows().await?;
2448        Ok(self
2449            .workflows
2450            .read()
2451            .await
2452            .hooks
2453            .iter()
2454            .find(|hook| hook.binding_id == binding_id)
2455            .cloned())
2456    }
2457
2458    pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2459        self.workflow_runs
2460            .write()
2461            .await
2462            .insert(run.run_id.clone(), run);
2463        self.persist_workflow_runs().await
2464    }
2465
2466    pub async fn update_workflow_run(
2467        &self,
2468        run_id: &str,
2469        update: impl FnOnce(&mut WorkflowRunRecord),
2470    ) -> Option<WorkflowRunRecord> {
2471        let mut guard = self.workflow_runs.write().await;
2472        let row = guard.get_mut(run_id)?;
2473        update(row);
2474        row.updated_at_ms = now_ms();
2475        if matches!(
2476            row.status,
2477            WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
2478        ) {
2479            row.finished_at_ms.get_or_insert_with(now_ms);
2480        }
2481        let out = row.clone();
2482        drop(guard);
2483        let _ = self.persist_workflow_runs().await;
2484        Some(out)
2485    }
2486
2487    pub async fn list_workflow_runs(
2488        &self,
2489        workflow_id: Option<&str>,
2490        limit: usize,
2491    ) -> Vec<WorkflowRunRecord> {
2492        let mut rows = self
2493            .workflow_runs
2494            .read()
2495            .await
2496            .values()
2497            .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2498            .cloned()
2499            .collect::<Vec<_>>();
2500        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2501        rows.truncate(limit.clamp(1, 500));
2502        rows
2503    }
2504
2505    pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2506        self.workflow_runs.read().await.get(run_id).cloned()
2507    }
2508
2509    pub async fn put_automation_v2(
2510        &self,
2511        mut automation: AutomationV2Spec,
2512    ) -> anyhow::Result<AutomationV2Spec> {
2513        if automation.automation_id.trim().is_empty() {
2514            anyhow::bail!("automation_id is required");
2515        }
2516        for agent in &mut automation.agents {
2517            if agent.display_name.trim().is_empty() {
2518                agent.display_name = auto_generated_agent_name(&agent.agent_id);
2519            }
2520            agent.tool_policy.allowlist =
2521                config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
2522            agent.tool_policy.denylist =
2523                config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
2524            agent.mcp_policy.allowed_servers =
2525                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
2526            agent.mcp_policy.allowed_tools = agent
2527                .mcp_policy
2528                .allowed_tools
2529                .take()
2530                .map(normalize_allowed_tools);
2531        }
2532        let now = now_ms();
2533        if automation.created_at_ms == 0 {
2534            automation.created_at_ms = now;
2535        }
2536        automation.updated_at_ms = now;
2537        if automation.next_fire_at_ms.is_none() {
2538            automation.next_fire_at_ms =
2539                automation_schedule_next_fire_at_ms(&automation.schedule, now);
2540        }
2541        migrate_bundled_studio_research_split_automation(&mut automation);
2542        self.automations_v2
2543            .write()
2544            .await
2545            .insert(automation.automation_id.clone(), automation.clone());
2546        self.persist_automations_v2().await?;
2547        self.verify_automation_v2_persisted(&automation.automation_id, true)
2548            .await?;
2549        Ok(automation)
2550    }
2551
2552    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2553        self.automations_v2.read().await.get(automation_id).cloned()
2554    }
2555
2556    pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2557        self.workflow_plans
2558            .write()
2559            .await
2560            .insert(plan.plan_id.clone(), plan);
2561    }
2562
2563    pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2564        self.workflow_plans.read().await.get(plan_id).cloned()
2565    }
2566
2567    pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2568        self.workflow_plan_drafts
2569            .write()
2570            .await
2571            .insert(draft.current_plan.plan_id.clone(), draft.clone());
2572        self.put_workflow_plan(draft.current_plan).await;
2573    }
2574
2575    pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2576        self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2577    }
2578
2579    pub async fn put_optimization_campaign(
2580        &self,
2581        mut campaign: OptimizationCampaignRecord,
2582    ) -> anyhow::Result<OptimizationCampaignRecord> {
2583        if campaign.optimization_id.trim().is_empty() {
2584            anyhow::bail!("optimization_id is required");
2585        }
2586        if campaign.source_workflow_id.trim().is_empty() {
2587            anyhow::bail!("source_workflow_id is required");
2588        }
2589        if campaign.name.trim().is_empty() {
2590            anyhow::bail!("name is required");
2591        }
2592        let now = now_ms();
2593        if campaign.created_at_ms == 0 {
2594            campaign.created_at_ms = now;
2595        }
2596        campaign.updated_at_ms = now;
2597        campaign.source_workflow_snapshot_hash =
2598            optimization_snapshot_hash(&campaign.source_workflow_snapshot);
2599        campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
2600        self.optimization_campaigns
2601            .write()
2602            .await
2603            .insert(campaign.optimization_id.clone(), campaign.clone());
2604        self.persist_optimization_campaigns().await?;
2605        Ok(campaign)
2606    }
2607
2608    pub async fn get_optimization_campaign(
2609        &self,
2610        optimization_id: &str,
2611    ) -> Option<OptimizationCampaignRecord> {
2612        self.optimization_campaigns
2613            .read()
2614            .await
2615            .get(optimization_id)
2616            .cloned()
2617    }
2618
2619    pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
2620        let mut rows = self
2621            .optimization_campaigns
2622            .read()
2623            .await
2624            .values()
2625            .cloned()
2626            .collect::<Vec<_>>();
2627        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2628        rows
2629    }
2630
2631    pub async fn put_optimization_experiment(
2632        &self,
2633        mut experiment: OptimizationExperimentRecord,
2634    ) -> anyhow::Result<OptimizationExperimentRecord> {
2635        if experiment.experiment_id.trim().is_empty() {
2636            anyhow::bail!("experiment_id is required");
2637        }
2638        if experiment.optimization_id.trim().is_empty() {
2639            anyhow::bail!("optimization_id is required");
2640        }
2641        let now = now_ms();
2642        if experiment.created_at_ms == 0 {
2643            experiment.created_at_ms = now;
2644        }
2645        experiment.updated_at_ms = now;
2646        experiment.candidate_snapshot_hash =
2647            optimization_snapshot_hash(&experiment.candidate_snapshot);
2648        self.optimization_experiments
2649            .write()
2650            .await
2651            .insert(experiment.experiment_id.clone(), experiment.clone());
2652        self.persist_optimization_experiments().await?;
2653        Ok(experiment)
2654    }
2655
2656    pub async fn get_optimization_experiment(
2657        &self,
2658        optimization_id: &str,
2659        experiment_id: &str,
2660    ) -> Option<OptimizationExperimentRecord> {
2661        self.optimization_experiments
2662            .read()
2663            .await
2664            .get(experiment_id)
2665            .filter(|row| row.optimization_id == optimization_id)
2666            .cloned()
2667    }
2668
2669    pub async fn list_optimization_experiments(
2670        &self,
2671        optimization_id: &str,
2672    ) -> Vec<OptimizationExperimentRecord> {
2673        let mut rows = self
2674            .optimization_experiments
2675            .read()
2676            .await
2677            .values()
2678            .filter(|row| row.optimization_id == optimization_id)
2679            .cloned()
2680            .collect::<Vec<_>>();
2681        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2682        rows
2683    }
2684
2685    pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
2686        self.optimization_experiments
2687            .read()
2688            .await
2689            .values()
2690            .filter(|row| row.optimization_id == optimization_id)
2691            .count()
2692    }
2693
2694    fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
2695        matches!(
2696            status,
2697            crate::AutomationRunStatus::Completed
2698                | crate::AutomationRunStatus::Blocked
2699                | crate::AutomationRunStatus::Failed
2700                | crate::AutomationRunStatus::Cancelled
2701        )
2702    }
2703
2704    fn optimization_consecutive_failure_count(
2705        experiments: &[OptimizationExperimentRecord],
2706    ) -> usize {
2707        let mut ordered = experiments.to_vec();
2708        ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
2709        ordered
2710            .iter()
2711            .rev()
2712            .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
2713            .count()
2714    }
2715
2716    fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
2717        match field {
2718            OptimizationMutableField::Objective => "objective",
2719            OptimizationMutableField::OutputContractSummaryGuidance => {
2720                "output_contract.summary_guidance"
2721            }
2722            OptimizationMutableField::TimeoutMs => "timeout_ms",
2723            OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
2724            OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
2725        }
2726    }
2727
2728    fn optimization_node_field_value(
2729        node: &crate::AutomationFlowNode,
2730        field: OptimizationMutableField,
2731    ) -> Result<Value, String> {
2732        match field {
2733            OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
2734            OptimizationMutableField::OutputContractSummaryGuidance => node
2735                .output_contract
2736                .as_ref()
2737                .and_then(|contract| contract.summary_guidance.clone())
2738                .map(Value::String)
2739                .ok_or_else(|| {
2740                    format!(
2741                        "node `{}` is missing output_contract.summary_guidance",
2742                        node.node_id
2743                    )
2744                }),
2745            OptimizationMutableField::TimeoutMs => node
2746                .timeout_ms
2747                .map(|value| json!(value))
2748                .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
2749            OptimizationMutableField::RetryPolicyMaxAttempts => node
2750                .retry_policy
2751                .as_ref()
2752                .and_then(Value::as_object)
2753                .and_then(|policy| policy.get("max_attempts"))
2754                .cloned()
2755                .ok_or_else(|| {
2756                    format!(
2757                        "node `{}` is missing retry_policy.max_attempts",
2758                        node.node_id
2759                    )
2760                }),
2761            OptimizationMutableField::RetryPolicyRetries => node
2762                .retry_policy
2763                .as_ref()
2764                .and_then(Value::as_object)
2765                .and_then(|policy| policy.get("retries"))
2766                .cloned()
2767                .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
2768        }
2769    }
2770
2771    fn set_optimization_node_field_value(
2772        node: &mut crate::AutomationFlowNode,
2773        field: OptimizationMutableField,
2774        value: &Value,
2775    ) -> Result<(), String> {
2776        match field {
2777            OptimizationMutableField::Objective => {
2778                node.objective = value
2779                    .as_str()
2780                    .ok_or_else(|| "objective apply value must be a string".to_string())?
2781                    .to_string();
2782            }
2783            OptimizationMutableField::OutputContractSummaryGuidance => {
2784                let guidance = value
2785                    .as_str()
2786                    .ok_or_else(|| {
2787                        "output_contract.summary_guidance apply value must be a string".to_string()
2788                    })?
2789                    .to_string();
2790                let contract = node.output_contract.as_mut().ok_or_else(|| {
2791                    format!(
2792                        "node `{}` is missing output_contract for apply",
2793                        node.node_id
2794                    )
2795                })?;
2796                contract.summary_guidance = Some(guidance);
2797            }
2798            OptimizationMutableField::TimeoutMs => {
2799                node.timeout_ms = Some(
2800                    value
2801                        .as_u64()
2802                        .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
2803                );
2804            }
2805            OptimizationMutableField::RetryPolicyMaxAttempts => {
2806                let next = value.as_i64().ok_or_else(|| {
2807                    "retry_policy.max_attempts apply value must be an integer".to_string()
2808                })?;
2809                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
2810                let object = policy.as_object_mut().ok_or_else(|| {
2811                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
2812                })?;
2813                object.insert("max_attempts".to_string(), json!(next));
2814            }
2815            OptimizationMutableField::RetryPolicyRetries => {
2816                let next = value.as_i64().ok_or_else(|| {
2817                    "retry_policy.retries apply value must be an integer".to_string()
2818                })?;
2819                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
2820                let object = policy.as_object_mut().ok_or_else(|| {
2821                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
2822                })?;
2823                object.insert("retries".to_string(), json!(next));
2824            }
2825        }
2826        Ok(())
2827    }
2828
2829    fn append_optimization_apply_metadata(
2830        metadata: Option<Value>,
2831        record: Value,
2832    ) -> Result<Option<Value>, String> {
2833        let mut root = match metadata {
2834            Some(Value::Object(map)) => map,
2835            Some(_) => return Err("automation metadata must be a JSON object".to_string()),
2836            None => serde_json::Map::new(),
2837        };
2838        let history = root
2839            .entry("optimization_apply_history".to_string())
2840            .or_insert_with(|| Value::Array(Vec::new()));
2841        let Some(entries) = history.as_array_mut() else {
2842            return Err("optimization_apply_history metadata must be an array".to_string());
2843        };
2844        entries.push(record.clone());
2845        root.insert("last_optimization_apply".to_string(), record);
2846        Ok(Some(Value::Object(root)))
2847    }
2848
2849    fn build_optimization_apply_patch(
2850        baseline: &crate::AutomationV2Spec,
2851        candidate: &crate::AutomationV2Spec,
2852        mutation: &crate::OptimizationValidatedMutation,
2853        approved_at_ms: u64,
2854    ) -> Result<Value, String> {
2855        let baseline_node = baseline
2856            .flow
2857            .nodes
2858            .iter()
2859            .find(|node| node.node_id == mutation.node_id)
2860            .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
2861        let candidate_node = candidate
2862            .flow
2863            .nodes
2864            .iter()
2865            .find(|node| node.node_id == mutation.node_id)
2866            .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
2867        let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
2868        let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
2869        Ok(json!({
2870            "node_id": mutation.node_id,
2871            "field": mutation.field,
2872            "field_path": Self::optimization_mutation_field_path(mutation.field),
2873            "expected_before": before,
2874            "apply_value": after,
2875            "approved_at_ms": approved_at_ms,
2876        }))
2877    }
2878
2879    pub async fn apply_optimization_winner(
2880        &self,
2881        optimization_id: &str,
2882        experiment_id: &str,
2883    ) -> Result<
2884        (
2885            OptimizationCampaignRecord,
2886            OptimizationExperimentRecord,
2887            crate::AutomationV2Spec,
2888        ),
2889        String,
2890    > {
2891        let campaign = self
2892            .get_optimization_campaign(optimization_id)
2893            .await
2894            .ok_or_else(|| "optimization not found".to_string())?;
2895        let mut experiment = self
2896            .get_optimization_experiment(optimization_id, experiment_id)
2897            .await
2898            .ok_or_else(|| "experiment not found".to_string())?;
2899        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
2900            return Err("only approved winner experiments may be applied".to_string());
2901        }
2902        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
2903            return Err(
2904                "only the latest approved winner may be applied to the live workflow".to_string(),
2905            );
2906        }
2907        let patch = experiment
2908            .metadata
2909            .as_ref()
2910            .and_then(|metadata| metadata.get("apply_patch"))
2911            .cloned()
2912            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
2913        let node_id = patch
2914            .get("node_id")
2915            .and_then(Value::as_str)
2916            .map(str::to_string)
2917            .filter(|value| !value.is_empty())
2918            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
2919        let field: OptimizationMutableField = serde_json::from_value(
2920            patch
2921                .get("field")
2922                .cloned()
2923                .ok_or_else(|| "apply_patch.field is required".to_string())?,
2924        )
2925        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
2926        let expected_before = patch
2927            .get("expected_before")
2928            .cloned()
2929            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
2930        let apply_value = patch
2931            .get("apply_value")
2932            .cloned()
2933            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
2934        let mut live = self
2935            .get_automation_v2(&campaign.source_workflow_id)
2936            .await
2937            .ok_or_else(|| "source workflow not found".to_string())?;
2938        let current_value = {
2939            let live_node = live
2940                .flow
2941                .nodes
2942                .iter()
2943                .find(|node| node.node_id == node_id)
2944                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
2945            Self::optimization_node_field_value(live_node, field)?
2946        };
2947        if current_value != expected_before {
2948            return Err(format!(
2949                "live workflow drift detected for node `{node_id}` {}",
2950                Self::optimization_mutation_field_path(field)
2951            ));
2952        }
2953        let live_node = live
2954            .flow
2955            .nodes
2956            .iter_mut()
2957            .find(|node| node.node_id == node_id)
2958            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
2959        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
2960        let applied_at_ms = now_ms();
2961        let apply_record = json!({
2962            "optimization_id": campaign.optimization_id,
2963            "experiment_id": experiment.experiment_id,
2964            "node_id": node_id,
2965            "field": field,
2966            "field_path": Self::optimization_mutation_field_path(field),
2967            "previous_value": expected_before,
2968            "new_value": apply_value,
2969            "applied_at_ms": applied_at_ms,
2970        });
2971        live.metadata =
2972            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
2973        let stored_live = self
2974            .put_automation_v2(live)
2975            .await
2976            .map_err(|error| error.to_string())?;
2977        let mut metadata = match experiment.metadata.take() {
2978            Some(Value::Object(map)) => map,
2979            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
2980            None => serde_json::Map::new(),
2981        };
2982        metadata.insert(
2983            "applied_to_live".to_string(),
2984            json!({
2985                "automation_id": stored_live.automation_id,
2986                "applied_at_ms": applied_at_ms,
2987                "field": field,
2988                "node_id": node_id,
2989            }),
2990        );
2991        experiment.metadata = Some(Value::Object(metadata));
2992        let stored_experiment = self
2993            .put_optimization_experiment(experiment)
2994            .await
2995            .map_err(|error| error.to_string())?;
2996        Ok((campaign, stored_experiment, stored_live))
2997    }
2998
2999    fn optimization_objective_hint(text: &str) -> String {
3000        let cleaned = text
3001            .lines()
3002            .map(str::trim)
3003            .filter(|line| !line.is_empty() && !line.starts_with('#'))
3004            .collect::<Vec<_>>()
3005            .join(" ");
3006        let hint = if cleaned.is_empty() {
3007            "Prioritize validator-complete output with explicit evidence."
3008        } else {
3009            cleaned.as_str()
3010        };
3011        let trimmed = hint.trim();
3012        let clipped = if trimmed.len() > 140 {
3013            trimmed[..140].trim_end()
3014        } else {
3015            trimmed
3016        };
3017        let mut sentence = clipped.trim_end_matches('.').to_string();
3018        if sentence.is_empty() {
3019            sentence = "Prioritize validator-complete output with explicit evidence".to_string();
3020        }
3021        sentence.push('.');
3022        sentence
3023    }
3024
3025    fn build_phase1_candidate_options(
3026        baseline: &crate::AutomationV2Spec,
3027        phase1: &crate::OptimizationPhase1Config,
3028    ) -> Vec<(
3029        crate::AutomationV2Spec,
3030        crate::OptimizationValidatedMutation,
3031    )> {
3032        let mut options = Vec::new();
3033        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
3034        for (index, node) in baseline.flow.nodes.iter().enumerate() {
3035            if phase1
3036                .mutation_policy
3037                .allowed_text_fields
3038                .contains(&OptimizationMutableField::Objective)
3039            {
3040                let addition = if node.objective.contains(&hint) {
3041                    "Prioritize validator-complete output with concrete evidence."
3042                } else {
3043                    &hint
3044                };
3045                let mut candidate = baseline.clone();
3046                candidate.flow.nodes[index].objective =
3047                    format!("{} {}", node.objective.trim(), addition.trim())
3048                        .trim()
3049                        .to_string();
3050                if let Ok(validated) =
3051                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3052                {
3053                    options.push((candidate, validated));
3054                }
3055            }
3056            if phase1
3057                .mutation_policy
3058                .allowed_text_fields
3059                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
3060            {
3061                if let Some(summary_guidance) = node
3062                    .output_contract
3063                    .as_ref()
3064                    .and_then(|contract| contract.summary_guidance.as_ref())
3065                {
3066                    let addition = if summary_guidance.contains("Cite concrete evidence") {
3067                        "Keep evidence explicit."
3068                    } else {
3069                        "Cite concrete evidence in the summary."
3070                    };
3071                    let mut candidate = baseline.clone();
3072                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
3073                        contract.summary_guidance = Some(
3074                            format!("{} {}", summary_guidance.trim(), addition)
3075                                .trim()
3076                                .to_string(),
3077                        );
3078                    }
3079                    if let Ok(validated) =
3080                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3081                    {
3082                        options.push((candidate, validated));
3083                    }
3084                }
3085            }
3086            if phase1
3087                .mutation_policy
3088                .allowed_knob_fields
3089                .contains(&OptimizationMutableField::TimeoutMs)
3090            {
3091                if let Some(timeout_ms) = node.timeout_ms {
3092                    let delta_by_percent = ((timeout_ms as f64)
3093                        * phase1.mutation_policy.timeout_delta_percent)
3094                        .round() as u64;
3095                    let delta = delta_by_percent
3096                        .min(phase1.mutation_policy.timeout_delta_ms)
3097                        .max(1);
3098                    let next = timeout_ms
3099                        .saturating_add(delta)
3100                        .min(phase1.mutation_policy.timeout_max_ms);
3101                    if next != timeout_ms {
3102                        let mut candidate = baseline.clone();
3103                        candidate.flow.nodes[index].timeout_ms = Some(next);
3104                        if let Ok(validated) =
3105                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3106                        {
3107                            options.push((candidate, validated));
3108                        }
3109                    }
3110                }
3111            }
3112            if phase1
3113                .mutation_policy
3114                .allowed_knob_fields
3115                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
3116            {
3117                let current = node
3118                    .retry_policy
3119                    .as_ref()
3120                    .and_then(Value::as_object)
3121                    .and_then(|row| row.get("max_attempts"))
3122                    .and_then(Value::as_i64);
3123                if let Some(before) = current {
3124                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
3125                    if next != before {
3126                        let mut candidate = baseline.clone();
3127                        let policy = candidate.flow.nodes[index]
3128                            .retry_policy
3129                            .get_or_insert_with(|| json!({}));
3130                        if let Some(object) = policy.as_object_mut() {
3131                            object.insert("max_attempts".to_string(), json!(next));
3132                        }
3133                        if let Ok(validated) =
3134                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3135                        {
3136                            options.push((candidate, validated));
3137                        }
3138                    }
3139                }
3140            }
3141            if phase1
3142                .mutation_policy
3143                .allowed_knob_fields
3144                .contains(&OptimizationMutableField::RetryPolicyRetries)
3145            {
3146                let current = node
3147                    .retry_policy
3148                    .as_ref()
3149                    .and_then(Value::as_object)
3150                    .and_then(|row| row.get("retries"))
3151                    .and_then(Value::as_i64);
3152                if let Some(before) = current {
3153                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
3154                    if next != before {
3155                        let mut candidate = baseline.clone();
3156                        let policy = candidate.flow.nodes[index]
3157                            .retry_policy
3158                            .get_or_insert_with(|| json!({}));
3159                        if let Some(object) = policy.as_object_mut() {
3160                            object.insert("retries".to_string(), json!(next));
3161                        }
3162                        if let Ok(validated) =
3163                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3164                        {
3165                            options.push((candidate, validated));
3166                        }
3167                    }
3168                }
3169            }
3170        }
3171        options
3172    }
3173
3174    async fn maybe_queue_phase1_candidate_experiment(
3175        &self,
3176        campaign: &mut OptimizationCampaignRecord,
3177    ) -> Result<bool, String> {
3178        let Some(phase1) = campaign.phase1.as_ref() else {
3179            return Ok(false);
3180        };
3181        let experiment_count = self
3182            .count_optimization_experiments(&campaign.optimization_id)
3183            .await;
3184        if experiment_count >= phase1.budget.max_experiments as usize {
3185            campaign.status = OptimizationCampaignStatus::Completed;
3186            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
3187            campaign.updated_at_ms = now_ms();
3188            return Ok(true);
3189        }
3190        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
3191        {
3192            return Ok(false);
3193        }
3194        let existing = self
3195            .list_optimization_experiments(&campaign.optimization_id)
3196            .await;
3197        let active_eval_exists = existing.iter().any(|experiment| {
3198            matches!(experiment.status, OptimizationExperimentStatus::Draft)
3199                && experiment
3200                    .metadata
3201                    .as_ref()
3202                    .and_then(|metadata| metadata.get("eval_run_id"))
3203                    .and_then(Value::as_str)
3204                    .is_some()
3205        });
3206        if active_eval_exists {
3207            return Ok(false);
3208        }
3209        let existing_hashes = existing
3210            .iter()
3211            .map(|experiment| experiment.candidate_snapshot_hash.clone())
3212            .collect::<std::collections::HashSet<_>>();
3213        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
3214        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
3215            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
3216        }) else {
3217            campaign.status = OptimizationCampaignStatus::Completed;
3218            campaign.last_pause_reason = Some(
3219                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
3220            );
3221            campaign.updated_at_ms = now_ms();
3222            return Ok(true);
3223        };
3224        let eval_run = self
3225            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
3226            .await
3227            .map_err(|error| error.to_string())?;
3228        let now = now_ms();
3229        let experiment = OptimizationExperimentRecord {
3230            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
3231            optimization_id: campaign.optimization_id.clone(),
3232            status: OptimizationExperimentStatus::Draft,
3233            candidate_snapshot: candidate_snapshot.clone(),
3234            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
3235            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
3236            mutation_summary: Some(mutation.summary.clone()),
3237            metrics: None,
3238            phase1_metrics: None,
3239            promotion_recommendation: None,
3240            promotion_decision: None,
3241            created_at_ms: now,
3242            updated_at_ms: now,
3243            metadata: Some(json!({
3244                "generator": "phase1_deterministic_v1",
3245                "eval_run_id": eval_run.run_id,
3246                "mutation": mutation,
3247            })),
3248        };
3249        self.put_optimization_experiment(experiment)
3250            .await
3251            .map_err(|error| error.to_string())?;
3252        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
3253        campaign.updated_at_ms = now_ms();
3254        Ok(true)
3255    }
3256
3257    async fn reconcile_phase1_candidate_experiments(
3258        &self,
3259        campaign: &mut OptimizationCampaignRecord,
3260    ) -> Result<bool, String> {
3261        let Some(phase1) = campaign.phase1.as_ref() else {
3262            return Ok(false);
3263        };
3264        let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
3265            return Ok(false);
3266        };
3267        let experiments = self
3268            .list_optimization_experiments(&campaign.optimization_id)
3269            .await;
3270        let mut changed = false;
3271        for mut experiment in experiments {
3272            if experiment.status != OptimizationExperimentStatus::Draft {
3273                continue;
3274            }
3275            let Some(eval_run_id) = experiment
3276                .metadata
3277                .as_ref()
3278                .and_then(|metadata| metadata.get("eval_run_id"))
3279                .and_then(Value::as_str)
3280                .map(str::to_string)
3281            else {
3282                continue;
3283            };
3284            let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
3285                continue;
3286            };
3287            if !Self::automation_run_is_terminal(&run.status) {
3288                continue;
3289            }
3290            if run.status != crate::AutomationRunStatus::Completed {
3291                experiment.status = OptimizationExperimentStatus::Failed;
3292                let mut metadata = match experiment.metadata.take() {
3293                    Some(Value::Object(map)) => map,
3294                    Some(_) => serde_json::Map::new(),
3295                    None => serde_json::Map::new(),
3296                };
3297                metadata.insert(
3298                    "eval_failure".to_string(),
3299                    json!({
3300                        "run_id": run.run_id,
3301                        "status": run.status,
3302                    }),
3303                );
3304                experiment.metadata = Some(Value::Object(metadata));
3305                self.put_optimization_experiment(experiment)
3306                    .await
3307                    .map_err(|error| error.to_string())?;
3308                changed = true;
3309                continue;
3310            }
3311            if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
3312                experiment.status = OptimizationExperimentStatus::Failed;
3313                let mut metadata = match experiment.metadata.take() {
3314                    Some(Value::Object(map)) => map,
3315                    Some(_) => serde_json::Map::new(),
3316                    None => serde_json::Map::new(),
3317                };
3318                metadata.insert(
3319                    "eval_failure".to_string(),
3320                    json!({
3321                        "run_id": run.run_id,
3322                        "status": run.status,
3323                        "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
3324                    }),
3325                );
3326                experiment.metadata = Some(Value::Object(metadata));
3327                self.put_optimization_experiment(experiment)
3328                    .await
3329                    .map_err(|error| error.to_string())?;
3330                changed = true;
3331                continue;
3332            }
3333            let metrics =
3334                match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
3335                    Ok(metrics) => metrics,
3336                    Err(error) => {
3337                        experiment.status = OptimizationExperimentStatus::Failed;
3338                        let mut metadata = match experiment.metadata.take() {
3339                            Some(Value::Object(map)) => map,
3340                            Some(_) => serde_json::Map::new(),
3341                            None => serde_json::Map::new(),
3342                        };
3343                        metadata.insert(
3344                            "eval_failure".to_string(),
3345                            json!({
3346                                "run_id": run.run_id,
3347                                "status": run.status,
3348                                "reason": error,
3349                            }),
3350                        );
3351                        experiment.metadata = Some(Value::Object(metadata));
3352                        self.put_optimization_experiment(experiment)
3353                            .await
3354                            .map_err(|error| error.to_string())?;
3355                        changed = true;
3356                        continue;
3357                    }
3358                };
3359            let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
3360            experiment.phase1_metrics = Some(metrics.clone());
3361            experiment.metrics = Some(json!({
3362                "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
3363                "unmet_requirement_count": metrics.unmet_requirement_count,
3364                "blocked_node_rate": metrics.blocked_node_rate,
3365                "budget_within_limits": metrics.budget_within_limits,
3366            }));
3367            experiment.promotion_recommendation = Some(
3368                match decision.decision {
3369                    OptimizationPromotionDecisionKind::Promote => "promote",
3370                    OptimizationPromotionDecisionKind::Discard => "discard",
3371                    OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3372                        "needs_operator_review"
3373                    }
3374                }
3375                .to_string(),
3376            );
3377            experiment.promotion_decision = Some(decision.clone());
3378            match decision.decision {
3379                OptimizationPromotionDecisionKind::Promote
3380                | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3381                    experiment.status = OptimizationExperimentStatus::PromotionRecommended;
3382                    campaign.pending_promotion_experiment_id =
3383                        Some(experiment.experiment_id.clone());
3384                    campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
3385                    campaign.last_pause_reason = Some(decision.reason.clone());
3386                }
3387                OptimizationPromotionDecisionKind::Discard => {
3388                    experiment.status = OptimizationExperimentStatus::Discarded;
3389                    if campaign.status == OptimizationCampaignStatus::Running {
3390                        campaign.last_pause_reason = Some(decision.reason.clone());
3391                    }
3392                }
3393            }
3394            self.put_optimization_experiment(experiment)
3395                .await
3396                .map_err(|error| error.to_string())?;
3397            changed = true;
3398        }
3399        let refreshed = self
3400            .list_optimization_experiments(&campaign.optimization_id)
3401            .await;
3402        let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
3403        if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
3404            && phase1.budget.max_consecutive_failures > 0
3405        {
3406            campaign.status = OptimizationCampaignStatus::Failed;
3407            campaign.last_pause_reason = Some(format!(
3408                "phase 1 candidate evaluations reached {} consecutive failures",
3409                consecutive_failures
3410            ));
3411            changed = true;
3412        }
3413        Ok(changed)
3414    }
3415
3416    async fn reconcile_pending_baseline_replays(
3417        &self,
3418        campaign: &mut OptimizationCampaignRecord,
3419    ) -> Result<bool, String> {
3420        let Some(phase1) = campaign.phase1.as_ref() else {
3421            return Ok(false);
3422        };
3423        let mut changed = false;
3424        let mut remaining = Vec::new();
3425        for run_id in campaign.pending_baseline_run_ids.clone() {
3426            let Some(run) = self.get_automation_v2_run(&run_id).await else {
3427                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3428                campaign.last_pause_reason = Some(format!(
3429                    "baseline replay run `{run_id}` was not found during optimization reconciliation"
3430                ));
3431                changed = true;
3432                continue;
3433            };
3434            if !Self::automation_run_is_terminal(&run.status) {
3435                remaining.push(run_id);
3436                continue;
3437            }
3438            if run.status != crate::AutomationRunStatus::Completed {
3439                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3440                campaign.last_pause_reason = Some(format!(
3441                    "baseline replay run `{}` finished with status `{:?}`",
3442                    run.run_id, run.status
3443                ));
3444                changed = true;
3445                continue;
3446            }
3447            if run.automation_id != campaign.source_workflow_id {
3448                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3449                campaign.last_pause_reason = Some(
3450                    "baseline replay run must belong to the optimization source workflow"
3451                        .to_string(),
3452                );
3453                changed = true;
3454                continue;
3455            }
3456            let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
3457                "baseline replay run must include an automation snapshot".to_string()
3458            })?;
3459            if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
3460                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3461                campaign.last_pause_reason = Some(
3462                    "baseline replay run does not match the current campaign baseline snapshot"
3463                        .to_string(),
3464                );
3465                changed = true;
3466                continue;
3467            }
3468            let metrics =
3469                derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
3470            let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
3471            campaign
3472                .baseline_replays
3473                .push(OptimizationBaselineReplayRecord {
3474                    replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
3475                    automation_run_id: Some(run.run_id.clone()),
3476                    phase1_metrics: metrics,
3477                    validator_case_outcomes,
3478                    experiment_count_at_recording: self
3479                        .count_optimization_experiments(&campaign.optimization_id)
3480                        .await as u64,
3481                    recorded_at_ms: now_ms(),
3482                });
3483            changed = true;
3484        }
3485        if remaining != campaign.pending_baseline_run_ids {
3486            campaign.pending_baseline_run_ids = remaining;
3487            changed = true;
3488        }
3489        Ok(changed)
3490    }
3491
3492    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
3493        let campaigns = self.list_optimization_campaigns().await;
3494        let mut updated = 0usize;
3495        for campaign in campaigns {
3496            let Some(mut latest) = self
3497                .get_optimization_campaign(&campaign.optimization_id)
3498                .await
3499            else {
3500                continue;
3501            };
3502            let Some(phase1) = latest.phase1.clone() else {
3503                continue;
3504            };
3505            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
3506            changed |= self
3507                .reconcile_phase1_candidate_experiments(&mut latest)
3508                .await?;
3509            let experiment_count = self
3510                .count_optimization_experiments(&latest.optimization_id)
3511                .await;
3512            if latest.pending_baseline_run_ids.is_empty() {
3513                if phase1_baseline_replay_due(
3514                    &latest.baseline_replays,
3515                    latest.pending_baseline_run_ids.len(),
3516                    &phase1,
3517                    experiment_count,
3518                    now_ms(),
3519                ) {
3520                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
3521                        latest.status = OptimizationCampaignStatus::Draft;
3522                        changed = true;
3523                    }
3524                } else if latest.baseline_replays.len()
3525                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
3526                {
3527                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
3528                        Ok(metrics) => {
3529                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
3530                                latest.baseline_metrics = Some(metrics);
3531                                changed = true;
3532                            }
3533                            if matches!(
3534                                latest.status,
3535                                OptimizationCampaignStatus::Draft
3536                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
3537                            ) || (latest.status == OptimizationCampaignStatus::Running
3538                                && latest.last_pause_reason.is_some())
3539                            {
3540                                latest.status = OptimizationCampaignStatus::Running;
3541                                latest.last_pause_reason = None;
3542                                changed = true;
3543                            }
3544                        }
3545                        Err(error) => {
3546                            if matches!(
3547                                latest.status,
3548                                OptimizationCampaignStatus::Draft
3549                                    | OptimizationCampaignStatus::Running
3550                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
3551                            ) && (latest.status
3552                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
3553                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
3554                            {
3555                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3556                                latest.last_pause_reason = Some(error);
3557                                changed = true;
3558                            }
3559                        }
3560                    }
3561                }
3562            } else if latest.last_pause_reason.as_deref()
3563                != Some("waiting for phase 1 baseline replay completion")
3564            {
3565                latest.last_pause_reason =
3566                    Some("waiting for phase 1 baseline replay completion".to_string());
3567                changed = true;
3568            }
3569            if latest.status == OptimizationCampaignStatus::Running
3570                && latest.pending_baseline_run_ids.is_empty()
3571            {
3572                changed |= self
3573                    .maybe_queue_phase1_candidate_experiment(&mut latest)
3574                    .await?;
3575            }
3576            if changed {
3577                self.put_optimization_campaign(latest)
3578                    .await
3579                    .map_err(|error| error.to_string())?;
3580                updated = updated.saturating_add(1);
3581            }
3582        }
3583        Ok(updated)
3584    }
3585
3586    async fn maybe_queue_phase1_baseline_replay(
3587        &self,
3588        campaign: &mut OptimizationCampaignRecord,
3589    ) -> Result<bool, String> {
3590        let Some(phase1) = campaign.phase1.as_ref() else {
3591            return Ok(false);
3592        };
3593        if !campaign.pending_baseline_run_ids.is_empty() {
3594            campaign.last_pause_reason =
3595                Some("waiting for phase 1 baseline replay completion".into());
3596            campaign.updated_at_ms = now_ms();
3597            return Ok(true);
3598        }
3599        let experiment_count = self
3600            .count_optimization_experiments(&campaign.optimization_id)
3601            .await;
3602        if !phase1_baseline_replay_due(
3603            &campaign.baseline_replays,
3604            campaign.pending_baseline_run_ids.len(),
3605            phase1,
3606            experiment_count,
3607            now_ms(),
3608        ) {
3609            return Ok(false);
3610        }
3611        let replay_run = self
3612            .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
3613            .await
3614            .map_err(|error| error.to_string())?;
3615        if !campaign
3616            .pending_baseline_run_ids
3617            .iter()
3618            .any(|value| value == &replay_run.run_id)
3619        {
3620            campaign
3621                .pending_baseline_run_ids
3622                .push(replay_run.run_id.clone());
3623        }
3624        campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
3625        campaign.updated_at_ms = now_ms();
3626        Ok(true)
3627    }
3628
3629    async fn maybe_queue_initial_phase1_baseline_replay(
3630        &self,
3631        campaign: &mut OptimizationCampaignRecord,
3632    ) -> Result<bool, String> {
3633        let Some(phase1) = campaign.phase1.as_ref() else {
3634            return Ok(false);
3635        };
3636        let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
3637        if campaign.baseline_replays.len() >= required_runs {
3638            return Ok(false);
3639        }
3640        self.maybe_queue_phase1_baseline_replay(campaign).await
3641    }
3642
3643    pub async fn apply_optimization_action(
3644        &self,
3645        optimization_id: &str,
3646        action: &str,
3647        experiment_id: Option<&str>,
3648        run_id: Option<&str>,
3649        reason: Option<&str>,
3650    ) -> Result<OptimizationCampaignRecord, String> {
3651        let normalized = action.trim().to_ascii_lowercase();
3652        let mut campaign = self
3653            .get_optimization_campaign(optimization_id)
3654            .await
3655            .ok_or_else(|| "optimization not found".to_string())?;
3656        match normalized.as_str() {
3657            "start" => {
3658                if campaign.phase1.is_some() {
3659                    if self
3660                        .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
3661                        .await?
3662                    {
3663                        campaign.status = OptimizationCampaignStatus::Draft;
3664                    } else {
3665                        let phase1 = campaign
3666                            .phase1
3667                            .as_ref()
3668                            .ok_or_else(|| "phase 1 config is required".to_string())?;
3669                        match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
3670                            Ok(metrics) => {
3671                                campaign.baseline_metrics = Some(metrics);
3672                                campaign.status = OptimizationCampaignStatus::Running;
3673                                campaign.last_pause_reason = None;
3674                            }
3675                            Err(error) => {
3676                                campaign.status =
3677                                    OptimizationCampaignStatus::PausedEvaluatorUnstable;
3678                                campaign.last_pause_reason = Some(error);
3679                            }
3680                        }
3681                    }
3682                } else {
3683                    campaign.status = OptimizationCampaignStatus::Running;
3684                    campaign.last_pause_reason = None;
3685                }
3686            }
3687            "pause" => {
3688                campaign.status = OptimizationCampaignStatus::PausedManual;
3689                campaign.last_pause_reason = reason
3690                    .map(str::trim)
3691                    .filter(|value| !value.is_empty())
3692                    .map(str::to_string);
3693            }
3694            "resume" => {
3695                if self
3696                    .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
3697                    .await?
3698                {
3699                    campaign.status = OptimizationCampaignStatus::Draft;
3700                } else {
3701                    campaign.status = OptimizationCampaignStatus::Running;
3702                    campaign.last_pause_reason = None;
3703                }
3704            }
3705            "queue_baseline_replay" => {
3706                let replay_run = self
3707                    .create_automation_v2_run(
3708                        &campaign.baseline_snapshot,
3709                        "optimization_baseline_replay",
3710                    )
3711                    .await
3712                    .map_err(|error| error.to_string())?;
3713                if !campaign
3714                    .pending_baseline_run_ids
3715                    .iter()
3716                    .any(|value| value == &replay_run.run_id)
3717                {
3718                    campaign
3719                        .pending_baseline_run_ids
3720                        .push(replay_run.run_id.clone());
3721                }
3722                campaign.updated_at_ms = now_ms();
3723            }
3724            "record_baseline_replay" => {
3725                let run_id = run_id
3726                    .map(str::trim)
3727                    .filter(|value| !value.is_empty())
3728                    .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
3729                let phase1 = campaign
3730                    .phase1
3731                    .as_ref()
3732                    .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
3733                let run = self
3734                    .get_automation_v2_run(run_id)
3735                    .await
3736                    .ok_or_else(|| "automation run not found".to_string())?;
3737                if run.automation_id != campaign.source_workflow_id {
3738                    return Err(
3739                        "baseline replay run must belong to the optimization source workflow"
3740                            .to_string(),
3741                    );
3742                }
3743                let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
3744                    "baseline replay run must include an automation snapshot".to_string()
3745                })?;
3746                if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
3747                    return Err(
3748                        "baseline replay run does not match the current campaign baseline snapshot"
3749                            .to_string(),
3750                    );
3751                }
3752                let metrics =
3753                    derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
3754                let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
3755                campaign
3756                    .baseline_replays
3757                    .push(OptimizationBaselineReplayRecord {
3758                        replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
3759                        automation_run_id: Some(run.run_id.clone()),
3760                        phase1_metrics: metrics,
3761                        validator_case_outcomes,
3762                        experiment_count_at_recording: self
3763                            .count_optimization_experiments(&campaign.optimization_id)
3764                            .await as u64,
3765                        recorded_at_ms: now_ms(),
3766                    });
3767                campaign
3768                    .pending_baseline_run_ids
3769                    .retain(|value| value != run_id);
3770                campaign.updated_at_ms = now_ms();
3771            }
3772            "approve_winner" => {
3773                let experiment_id = experiment_id
3774                    .map(str::trim)
3775                    .filter(|value| !value.is_empty())
3776                    .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
3777                let mut experiment = self
3778                    .get_optimization_experiment(optimization_id, experiment_id)
3779                    .await
3780                    .ok_or_else(|| "experiment not found".to_string())?;
3781                if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
3782                    return Err(
3783                        "experiment baseline_snapshot_hash does not match current campaign baseline"
3784                            .to_string(),
3785                    );
3786                }
3787                if let Some(phase1) = campaign.phase1.as_ref() {
3788                    let validated = validate_phase1_candidate_mutation(
3789                        &campaign.baseline_snapshot,
3790                        &experiment.candidate_snapshot,
3791                        phase1,
3792                    )?;
3793                    if experiment.mutation_summary.is_none() {
3794                        experiment.mutation_summary = Some(validated.summary.clone());
3795                    }
3796                    let approved_at_ms = now_ms();
3797                    let apply_patch = Self::build_optimization_apply_patch(
3798                        &campaign.baseline_snapshot,
3799                        &experiment.candidate_snapshot,
3800                        &validated,
3801                        approved_at_ms,
3802                    )?;
3803                    let mut metadata = match experiment.metadata.take() {
3804                        Some(Value::Object(map)) => map,
3805                        Some(_) => {
3806                            return Err("experiment metadata must be a JSON object".to_string());
3807                        }
3808                        None => serde_json::Map::new(),
3809                    };
3810                    metadata.insert("apply_patch".to_string(), apply_patch);
3811                    experiment.metadata = Some(Value::Object(metadata));
3812                    if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
3813                        let candidate_metrics = experiment
3814                            .phase1_metrics
3815                            .clone()
3816                            .or_else(|| {
3817                                experiment
3818                                    .metrics
3819                                    .as_ref()
3820                                    .and_then(|metrics| parse_phase1_metrics(metrics).ok())
3821                            })
3822                            .ok_or_else(|| {
3823                                "phase 1 candidate is missing promotion metrics".to_string()
3824                            })?;
3825                        let decision =
3826                            evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
3827                        experiment.promotion_recommendation = Some(
3828                            match decision.decision {
3829                                OptimizationPromotionDecisionKind::Promote => "promote",
3830                                OptimizationPromotionDecisionKind::Discard => "discard",
3831                                OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3832                                    "needs_operator_review"
3833                                }
3834                            }
3835                            .to_string(),
3836                        );
3837                        experiment.promotion_decision = Some(decision.clone());
3838                        if decision.decision != OptimizationPromotionDecisionKind::Promote {
3839                            let _ = self
3840                                .put_optimization_experiment(experiment)
3841                                .await
3842                                .map_err(|e| e.to_string())?;
3843                            return Err(decision.reason);
3844                        }
3845                        campaign.baseline_metrics = Some(candidate_metrics);
3846                    }
3847                }
3848                campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
3849                campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
3850                campaign.baseline_replays.clear();
3851                campaign.pending_baseline_run_ids.clear();
3852                campaign.pending_promotion_experiment_id = None;
3853                campaign.status = OptimizationCampaignStatus::Draft;
3854                campaign.last_pause_reason = None;
3855                experiment.status = OptimizationExperimentStatus::PromotionApproved;
3856                let _ = self
3857                    .put_optimization_experiment(experiment)
3858                    .await
3859                    .map_err(|e| e.to_string())?;
3860            }
3861            "reject_winner" => {
3862                let experiment_id = experiment_id
3863                    .map(str::trim)
3864                    .filter(|value| !value.is_empty())
3865                    .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
3866                let mut experiment = self
3867                    .get_optimization_experiment(optimization_id, experiment_id)
3868                    .await
3869                    .ok_or_else(|| "experiment not found".to_string())?;
3870                campaign.pending_promotion_experiment_id = None;
3871                campaign.status = OptimizationCampaignStatus::Draft;
3872                campaign.last_pause_reason = reason
3873                    .map(str::trim)
3874                    .filter(|value| !value.is_empty())
3875                    .map(str::to_string);
3876                experiment.status = OptimizationExperimentStatus::PromotionRejected;
3877                let _ = self
3878                    .put_optimization_experiment(experiment)
3879                    .await
3880                    .map_err(|e| e.to_string())?;
3881            }
3882            _ => return Err("unsupported optimization action".to_string()),
3883        }
3884        self.put_optimization_campaign(campaign)
3885            .await
3886            .map_err(|e| e.to_string())
3887    }
3888
3889    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3890        let mut rows = self
3891            .automations_v2
3892            .read()
3893            .await
3894            .values()
3895            .cloned()
3896            .collect::<Vec<_>>();
3897        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3898        rows
3899    }
3900
3901    pub async fn delete_automation_v2(
3902        &self,
3903        automation_id: &str,
3904    ) -> anyhow::Result<Option<AutomationV2Spec>> {
3905        let removed = self.automations_v2.write().await.remove(automation_id);
3906        let removed_run_count = {
3907            let mut runs = self.automation_v2_runs.write().await;
3908            let before = runs.len();
3909            runs.retain(|_, run| run.automation_id != automation_id);
3910            before.saturating_sub(runs.len())
3911        };
3912        self.persist_automations_v2().await?;
3913        if removed_run_count > 0 {
3914            self.persist_automation_v2_runs().await?;
3915        }
3916        self.verify_automation_v2_persisted(automation_id, false)
3917            .await?;
3918        Ok(removed)
3919    }
3920
3921    pub async fn create_automation_v2_run(
3922        &self,
3923        automation: &AutomationV2Spec,
3924        trigger_type: &str,
3925    ) -> anyhow::Result<AutomationV2RunRecord> {
3926        let now = now_ms();
3927        let pending_nodes = automation
3928            .flow
3929            .nodes
3930            .iter()
3931            .map(|n| n.node_id.clone())
3932            .collect::<Vec<_>>();
3933        let run = AutomationV2RunRecord {
3934            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
3935            automation_id: automation.automation_id.clone(),
3936            trigger_type: trigger_type.to_string(),
3937            status: AutomationRunStatus::Queued,
3938            created_at_ms: now,
3939            updated_at_ms: now,
3940            started_at_ms: None,
3941            finished_at_ms: None,
3942            active_session_ids: Vec::new(),
3943            latest_session_id: None,
3944            active_instance_ids: Vec::new(),
3945            checkpoint: AutomationRunCheckpoint {
3946                completed_nodes: Vec::new(),
3947                pending_nodes,
3948                node_outputs: std::collections::HashMap::new(),
3949                node_attempts: std::collections::HashMap::new(),
3950                blocked_nodes: Vec::new(),
3951                awaiting_gate: None,
3952                gate_history: Vec::new(),
3953                lifecycle_history: Vec::new(),
3954                last_failure: None,
3955            },
3956            automation_snapshot: Some(automation.clone()),
3957            pause_reason: None,
3958            resume_reason: None,
3959            detail: None,
3960            stop_kind: None,
3961            stop_reason: None,
3962            prompt_tokens: 0,
3963            completion_tokens: 0,
3964            total_tokens: 0,
3965            estimated_cost_usd: 0.0,
3966        };
3967        self.automation_v2_runs
3968            .write()
3969            .await
3970            .insert(run.run_id.clone(), run.clone());
3971        self.persist_automation_v2_runs().await?;
3972        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
3973            .await
3974            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
3975        Ok(run)
3976    }
3977
3978    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
3979        self.automation_v2_runs.read().await.get(run_id).cloned()
3980    }
3981
3982    pub async fn list_automation_v2_runs(
3983        &self,
3984        automation_id: Option<&str>,
3985        limit: usize,
3986    ) -> Vec<AutomationV2RunRecord> {
3987        let mut rows = self
3988            .automation_v2_runs
3989            .read()
3990            .await
3991            .values()
3992            .filter(|row| {
3993                if let Some(id) = automation_id {
3994                    row.automation_id == id
3995                } else {
3996                    true
3997                }
3998            })
3999            .cloned()
4000            .collect::<Vec<_>>();
4001        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
4002        rows.truncate(limit.clamp(1, 500));
4003        rows
4004    }
4005
4006    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
4007        let mut guard = self.automation_v2_runs.write().await;
4008        let run_id = guard
4009            .values()
4010            .filter(|row| row.status == AutomationRunStatus::Queued)
4011            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
4012            .map(|row| row.run_id.clone())?;
4013        let now = now_ms();
4014        let run = guard.get_mut(&run_id)?;
4015        run.status = AutomationRunStatus::Running;
4016        run.updated_at_ms = now;
4017        run.started_at_ms.get_or_insert(now);
4018        let claimed = run.clone();
4019        drop(guard);
4020        let _ = self.persist_automation_v2_runs().await;
4021        Some(claimed)
4022    }
4023
4024    pub async fn update_automation_v2_run(
4025        &self,
4026        run_id: &str,
4027        update: impl FnOnce(&mut AutomationV2RunRecord),
4028    ) -> Option<AutomationV2RunRecord> {
4029        let mut guard = self.automation_v2_runs.write().await;
4030        let run = guard.get_mut(run_id)?;
4031        update(run);
4032        run.updated_at_ms = now_ms();
4033        if matches!(
4034            run.status,
4035            AutomationRunStatus::Completed
4036                | AutomationRunStatus::Blocked
4037                | AutomationRunStatus::Failed
4038                | AutomationRunStatus::Cancelled
4039        ) {
4040            run.finished_at_ms.get_or_insert_with(now_ms);
4041        }
4042        let out = run.clone();
4043        drop(guard);
4044        let _ = self.persist_automation_v2_runs().await;
4045        Some(out)
4046    }
4047
4048    pub async fn add_automation_v2_session(
4049        &self,
4050        run_id: &str,
4051        session_id: &str,
4052    ) -> Option<AutomationV2RunRecord> {
4053        let updated = self
4054            .update_automation_v2_run(run_id, |row| {
4055                if !row.active_session_ids.iter().any(|id| id == session_id) {
4056                    row.active_session_ids.push(session_id.to_string());
4057                }
4058                row.latest_session_id = Some(session_id.to_string());
4059            })
4060            .await;
4061        self.automation_v2_session_runs
4062            .write()
4063            .await
4064            .insert(session_id.to_string(), run_id.to_string());
4065        updated
4066    }
4067
4068    pub async fn clear_automation_v2_session(
4069        &self,
4070        run_id: &str,
4071        session_id: &str,
4072    ) -> Option<AutomationV2RunRecord> {
4073        self.automation_v2_session_runs
4074            .write()
4075            .await
4076            .remove(session_id);
4077        self.update_automation_v2_run(run_id, |row| {
4078            row.active_session_ids.retain(|id| id != session_id);
4079        })
4080        .await
4081    }
4082
4083    pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
4084        let mut guard = self.automation_v2_session_runs.write().await;
4085        for session_id in session_ids {
4086            guard.remove(session_id);
4087        }
4088    }
4089
4090    pub async fn add_automation_v2_instance(
4091        &self,
4092        run_id: &str,
4093        instance_id: &str,
4094    ) -> Option<AutomationV2RunRecord> {
4095        self.update_automation_v2_run(run_id, |row| {
4096            if !row.active_instance_ids.iter().any(|id| id == instance_id) {
4097                row.active_instance_ids.push(instance_id.to_string());
4098            }
4099        })
4100        .await
4101    }
4102
4103    pub async fn clear_automation_v2_instance(
4104        &self,
4105        run_id: &str,
4106        instance_id: &str,
4107    ) -> Option<AutomationV2RunRecord> {
4108        self.update_automation_v2_run(run_id, |row| {
4109            row.active_instance_ids.retain(|id| id != instance_id);
4110        })
4111        .await
4112    }
4113
4114    pub async fn apply_provider_usage_to_runs(
4115        &self,
4116        session_id: &str,
4117        prompt_tokens: u64,
4118        completion_tokens: u64,
4119        total_tokens: u64,
4120    ) {
4121        if let Some(policy) = self.routine_session_policy(session_id).await {
4122            let rate = self.token_cost_per_1k_usd.max(0.0);
4123            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
4124            let mut guard = self.routine_runs.write().await;
4125            if let Some(run) = guard.get_mut(&policy.run_id) {
4126                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
4127                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
4128                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
4129                run.estimated_cost_usd += delta_cost;
4130                run.updated_at_ms = now_ms();
4131            }
4132            drop(guard);
4133            let _ = self.persist_routine_runs().await;
4134        }
4135
4136        let maybe_v2_run_id = self
4137            .automation_v2_session_runs
4138            .read()
4139            .await
4140            .get(session_id)
4141            .cloned();
4142        if let Some(run_id) = maybe_v2_run_id {
4143            let rate = self.token_cost_per_1k_usd.max(0.0);
4144            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
4145            let mut guard = self.automation_v2_runs.write().await;
4146            if let Some(run) = guard.get_mut(&run_id) {
4147                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
4148                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
4149                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
4150                run.estimated_cost_usd += delta_cost;
4151                run.updated_at_ms = now_ms();
4152            }
4153            drop(guard);
4154            let _ = self.persist_automation_v2_runs().await;
4155        }
4156    }
4157
4158    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
4159        let mut fired = Vec::new();
4160        let mut guard = self.automations_v2.write().await;
4161        for automation in guard.values_mut() {
4162            if automation.status != AutomationV2Status::Active {
4163                continue;
4164            }
4165            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
4166                automation.next_fire_at_ms =
4167                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
4168                continue;
4169            };
4170            if now_ms < next_fire_at_ms {
4171                continue;
4172            }
4173            let run_count =
4174                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
4175            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
4176            automation.next_fire_at_ms = next;
4177            automation.last_fired_at_ms = Some(now_ms);
4178            for _ in 0..run_count {
4179                fired.push(automation.automation_id.clone());
4180            }
4181        }
4182        drop(guard);
4183        let _ = self.persist_automations_v2().await;
4184        fired
4185    }
4186}
4187
4188async fn build_channels_config(
4189    state: &AppState,
4190    channels: &ChannelsConfigFile,
4191) -> Option<ChannelsConfig> {
4192    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
4193        return None;
4194    }
4195    Some(ChannelsConfig {
4196        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
4197            bot_token: cfg.bot_token,
4198            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4199            mention_only: cfg.mention_only,
4200            style_profile: cfg.style_profile,
4201            security_profile: cfg.security_profile,
4202        }),
4203        discord: channels.discord.clone().map(|cfg| DiscordConfig {
4204            bot_token: cfg.bot_token,
4205            guild_id: cfg.guild_id,
4206            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4207            mention_only: cfg.mention_only,
4208            security_profile: cfg.security_profile,
4209        }),
4210        slack: channels.slack.clone().map(|cfg| SlackConfig {
4211            bot_token: cfg.bot_token,
4212            channel_id: cfg.channel_id,
4213            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4214            mention_only: cfg.mention_only,
4215            security_profile: cfg.security_profile,
4216        }),
4217        server_base_url: state.server_base_url(),
4218        api_token: state.api_token().await.unwrap_or_default(),
4219        tool_policy: channels.tool_policy.clone(),
4220    })
4221}
4222
4223// channel config normalization moved to crate::config::channels
4224
/// Reports whether `value` has the shape `owner/repo`.
///
/// Surrounding whitespace is ignored. The slug must contain exactly one `/`, and both
/// the part before it and the part after it must be non-blank.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    match value.trim().split_once('/') {
        // A second separator would leave a `/` in the remainder.
        Some((owner, repo)) => {
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
4239
4240fn legacy_automations_v2_path() -> Option<PathBuf> {
4241    config::paths::resolve_legacy_root_file_path("automations_v2.json")
4242        .filter(|path| path != &config::paths::resolve_automations_v2_path())
4243}
4244
4245fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4246    let mut candidates = vec![active_path.clone()];
4247    if let Some(legacy_path) = legacy_automations_v2_path() {
4248        if !candidates.contains(&legacy_path) {
4249            candidates.push(legacy_path);
4250        }
4251    }
4252    let default_path = config::paths::default_state_dir().join("automations_v2.json");
4253    if !candidates.contains(&default_path) {
4254        candidates.push(default_path);
4255    }
4256    candidates
4257}
4258
4259async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
4260    let Some(legacy_path) = legacy_automations_v2_path() else {
4261        return Ok(());
4262    };
4263    if legacy_path == *active_path || !legacy_path.exists() {
4264        return Ok(());
4265    }
4266    fs::remove_file(&legacy_path).await?;
4267    tracing::info!(
4268        active_path = active_path.display().to_string(),
4269        removed_path = legacy_path.display().to_string(),
4270        "removed stale legacy automation v2 file after canonical persistence"
4271    );
4272    Ok(())
4273}
4274
4275fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
4276    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
4277        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
4278}
4279
4280fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4281    let mut candidates = vec![active_path.clone()];
4282    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
4283        if !candidates.contains(&legacy_path) {
4284            candidates.push(legacy_path);
4285        }
4286    }
4287    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
4288    if !candidates.contains(&default_path) {
4289        candidates.push(default_path);
4290    }
4291    candidates
4292}
4293
4294fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4295    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4296        .unwrap_or_default()
4297}
4298
4299fn parse_automation_v2_runs_file(
4300    raw: &str,
4301) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4302    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4303        .unwrap_or_default()
4304}
4305
4306fn parse_optimization_campaigns_file(
4307    raw: &str,
4308) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
4309    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
4310        .unwrap_or_default()
4311}
4312
4313fn parse_optimization_experiments_file(
4314    raw: &str,
4315) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
4316    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
4317        .unwrap_or_default()
4318}
4319
4320fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4321    match schedule {
4322        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4323        RoutineSchedule::Cron { .. } => None,
4324    }
4325}
4326
4327fn parse_timezone(timezone: &str) -> Option<Tz> {
4328    timezone.trim().parse::<Tz>().ok()
4329}
4330
4331fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
4332    let tz = parse_timezone(timezone)?;
4333    let schedule = Schedule::from_str(expression).ok()?;
4334    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
4335    let local_from = from_dt.with_timezone(&tz);
4336    let next = schedule.after(&local_from).next()?;
4337    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
4338}
4339
4340fn compute_next_schedule_fire_at_ms(
4341    schedule: &RoutineSchedule,
4342    timezone: &str,
4343    from_ms: u64,
4344) -> Option<u64> {
4345    let _ = parse_timezone(timezone)?;
4346    match schedule {
4347        RoutineSchedule::IntervalSeconds { seconds } => {
4348            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4349        }
4350        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4351    }
4352}
4353
4354fn compute_misfire_plan_for_schedule(
4355    now_ms: u64,
4356    next_fire_at_ms: u64,
4357    schedule: &RoutineSchedule,
4358    timezone: &str,
4359    policy: &RoutineMisfirePolicy,
4360) -> (u32, u64) {
4361    match schedule {
4362        RoutineSchedule::IntervalSeconds { .. } => {
4363            let Some(interval_ms) = routine_interval_ms(schedule) else {
4364                return (0, next_fire_at_ms);
4365            };
4366            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
4367        }
4368        RoutineSchedule::Cron { expression } => {
4369            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
4370                .unwrap_or_else(|| now_ms.saturating_add(60_000));
4371            match policy {
4372                RoutineMisfirePolicy::Skip => (0, aligned_next),
4373                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4374                RoutineMisfirePolicy::CatchUp { max_runs } => {
4375                    let mut count = 0u32;
4376                    let mut cursor = next_fire_at_ms;
4377                    while cursor <= now_ms && count < *max_runs {
4378                        count = count.saturating_add(1);
4379                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
4380                            break;
4381                        };
4382                        if next <= cursor {
4383                            break;
4384                        }
4385                        cursor = next;
4386                    }
4387                    (count, aligned_next)
4388                }
4389            }
4390        }
4391    }
4392}
4393
4394fn compute_misfire_plan(
4395    now_ms: u64,
4396    next_fire_at_ms: u64,
4397    interval_ms: u64,
4398    policy: &RoutineMisfirePolicy,
4399) -> (u32, u64) {
4400    if now_ms < next_fire_at_ms || interval_ms == 0 {
4401        return (0, next_fire_at_ms);
4402    }
4403    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4404    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4405    match policy {
4406        RoutineMisfirePolicy::Skip => (0, aligned_next),
4407        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4408        RoutineMisfirePolicy::CatchUp { max_runs } => {
4409            let count = missed.min(u64::from(*max_runs)) as u32;
4410            (count, aligned_next)
4411        }
4412    }
4413}
4414
4415fn auto_generated_agent_name(agent_id: &str) -> String {
4416    let names = [
4417        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4418    ];
4419    let digest = Sha256::digest(agent_id.as_bytes());
4420    let idx = usize::from(digest[0]) % names.len();
4421    format!("{}-{:02x}", names[idx], digest[1])
4422}
4423
4424fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4425    match schedule.schedule_type {
4426        AutomationV2ScheduleType::Manual => None,
4427        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4428            seconds: schedule.interval_seconds.unwrap_or(60),
4429        }),
4430        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4431            expression: schedule.cron_expression.clone().unwrap_or_default(),
4432        }),
4433    }
4434}
4435
4436fn automation_schedule_next_fire_at_ms(
4437    schedule: &AutomationV2Schedule,
4438    from_ms: u64,
4439) -> Option<u64> {
4440    let routine_schedule = schedule_from_automation_v2(schedule)?;
4441    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4442}
4443
4444fn automation_schedule_due_count(
4445    schedule: &AutomationV2Schedule,
4446    now_ms: u64,
4447    next_fire_at_ms: u64,
4448) -> u32 {
4449    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
4450        return 0;
4451    };
4452    let (count, _) = compute_misfire_plan_for_schedule(
4453        now_ms,
4454        next_fire_at_ms,
4455        &routine_schedule,
4456        &schedule.timezone,
4457        &schedule.misfire_policy,
4458    );
4459    count.max(1)
4460}
4461
/// Outcome of evaluating a routine against the execution policy
/// (produced by `evaluate_routine_execution_policy`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// Execution is permitted.
    Allowed,
    /// Execution is gated on approval; `reason` is a human-readable explanation.
    RequiresApproval { reason: String },
    /// Execution is refused; `reason` is a human-readable explanation.
    Blocked { reason: String },
}
4468
4469pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4470    let entrypoint = routine.entrypoint.to_ascii_lowercase();
4471    if entrypoint.starts_with("connector.")
4472        || entrypoint.starts_with("integration.")
4473        || entrypoint.contains("external")
4474    {
4475        return true;
4476    }
4477    routine
4478        .args
4479        .get("uses_external_integrations")
4480        .and_then(|v| v.as_bool())
4481        .unwrap_or(false)
4482        || routine
4483            .args
4484            .get("connector_id")
4485            .and_then(|v| v.as_str())
4486            .is_some()
4487}
4488
4489pub fn evaluate_routine_execution_policy(
4490    routine: &RoutineSpec,
4491    trigger_type: &str,
4492) -> RoutineExecutionDecision {
4493    if !routine_uses_external_integrations(routine) {
4494        return RoutineExecutionDecision::Allowed;
4495    }
4496    if !routine.external_integrations_allowed {
4497        return RoutineExecutionDecision::Blocked {
4498            reason: "external integrations are disabled by policy".to_string(),
4499        };
4500    }
4501    if routine.requires_approval {
4502        return RoutineExecutionDecision::RequiresApproval {
4503            reason: format!(
4504                "manual approval required before external side effects ({})",
4505                trigger_type
4506            ),
4507        };
4508    }
4509    RoutineExecutionDecision::Allowed
4510}
4511
/// Validates a shared-resource key.
///
/// Surrounding whitespace is ignored. A key is valid when it is exactly
/// `swarm.active_tasks`, or when it starts with one of the namespaces `run/`, `mission/`,
/// `project/` or `team/` and contains no empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        candidate => {
            let namespaced = ALLOWED_PREFIXES
                .iter()
                .any(|prefix| candidate.starts_with(*prefix));
            namespaced && !candidate.contains("//")
        }
    }
}
4529
4530impl Deref for AppState {
4531    type Target = RuntimeState;
4532
4533    fn deref(&self) -> &Self::Target {
4534        self.runtime
4535            .get()
4536            .expect("runtime accessed before startup completion")
4537    }
4538}
4539
/// Prompt-context hook backed by the server's `AppState`.
///
/// Its `PromptContextHook` implementation appends identity, memory-scope and
/// memory/docs context messages to outgoing provider messages.
#[derive(Clone)]
struct ServerPromptContextHook {
    /// Application state used to look up runs, config, sessions and the event bus.
    state: AppState,
}
4544
4545impl ServerPromptContextHook {
    /// Creates a hook that reads from the given application state.
    fn new(state: AppState) -> Self {
        Self { state }
    }
4549
4550    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
4551        let paths = resolve_shared_paths().ok()?;
4552        MemoryDatabase::new(&paths.memory_db_path).await.ok()
4553    }
4554
4555    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
4556        let paths = resolve_shared_paths().ok()?;
4557        tandem_memory::MemoryManager::new(&paths.memory_db_path)
4558            .await
4559            .ok()
4560    }
4561
4562    fn hash_query(input: &str) -> String {
4563        let mut hasher = Sha256::new();
4564        hasher.update(input.as_bytes());
4565        format!("{:x}", hasher.finalize())
4566    }
4567
4568    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
4569        let mut out = vec!["<memory_context>".to_string()];
4570        let mut used = 0usize;
4571        for hit in hits {
4572            let text = hit
4573                .record
4574                .content
4575                .split_whitespace()
4576                .take(60)
4577                .collect::<Vec<_>>()
4578                .join(" ");
4579            let line = format!(
4580                "- [{:.3}] {} (source={}, run={})",
4581                hit.score, text, hit.record.source_type, hit.record.run_id
4582            );
4583            used = used.saturating_add(line.len());
4584            if used > 2200 {
4585                break;
4586            }
4587            out.push(line);
4588        }
4589        out.push("</memory_context>".to_string());
4590        out.join("\n")
4591    }
4592
4593    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
4594        chunk
4595            .metadata
4596            .as_ref()
4597            .and_then(|meta| meta.get("source_url"))
4598            .and_then(Value::as_str)
4599            .map(str::trim)
4600            .filter(|v| !v.is_empty())
4601            .map(ToString::to_string)
4602    }
4603
4604    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
4605        if let Some(path) = chunk
4606            .metadata
4607            .as_ref()
4608            .and_then(|meta| meta.get("relative_path"))
4609            .and_then(Value::as_str)
4610            .map(str::trim)
4611            .filter(|v| !v.is_empty())
4612        {
4613            return path.to_string();
4614        }
4615        chunk
4616            .source
4617            .strip_prefix("guide_docs:")
4618            .unwrap_or(chunk.source.as_str())
4619            .to_string()
4620    }
4621
4622    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
4623        let mut out = vec!["<docs_context>".to_string()];
4624        let mut used = 0usize;
4625        for hit in hits {
4626            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
4627            let path = Self::extract_docs_relative_path(&hit.chunk);
4628            let text = hit
4629                .chunk
4630                .content
4631                .split_whitespace()
4632                .take(70)
4633                .collect::<Vec<_>>()
4634                .join(" ");
4635            let line = format!(
4636                "- [{:.3}] {} (doc_path={}, source_url={})",
4637                hit.similarity, text, path, url
4638            );
4639            used = used.saturating_add(line.len());
4640            if used > 2800 {
4641                break;
4642            }
4643            out.push(line);
4644        }
4645        out.push("</docs_context>".to_string());
4646        out.join("\n")
4647    }
4648
4649    async fn search_embedded_docs(
4650        &self,
4651        query: &str,
4652        limit: usize,
4653    ) -> Vec<tandem_memory::types::MemorySearchResult> {
4654        let Some(manager) = self.open_memory_manager().await else {
4655            return Vec::new();
4656        };
4657        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
4658        manager
4659            .search(
4660                query,
4661                Some(MemoryTier::Global),
4662                None,
4663                None,
4664                Some(search_limit),
4665            )
4666            .await
4667            .unwrap_or_default()
4668            .into_iter()
4669            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
4670            .take(limit)
4671            .collect()
4672    }
4673
4674    fn should_skip_memory_injection(query: &str) -> bool {
4675        let trimmed = query.trim();
4676        if trimmed.is_empty() {
4677            return true;
4678        }
4679        let lower = trimmed.to_ascii_lowercase();
4680        let social = [
4681            "hi",
4682            "hello",
4683            "hey",
4684            "thanks",
4685            "thank you",
4686            "ok",
4687            "okay",
4688            "cool",
4689            "nice",
4690            "yo",
4691            "good morning",
4692            "good afternoon",
4693            "good evening",
4694        ];
4695        lower.len() <= 32 && social.contains(&lower.as_str())
4696    }
4697
4698    fn personality_preset_text(preset: &str) -> &'static str {
4699        match preset {
4700            "concise" => {
4701                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
4702            }
4703            "friendly" => {
4704                "Default style: friendly and supportive while staying technically rigorous and concrete."
4705            }
4706            "mentor" => {
4707                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
4708            }
4709            "critical" => {
4710                "Default style: critical and risk-first. Surface failure modes and assumptions early."
4711            }
4712            _ => {
4713                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
4714            }
4715        }
4716    }
4717
4718    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
4719        let allow_agent_override = agent_name
4720            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
4721            .unwrap_or(false);
4722        let legacy_bot_name = config
4723            .get("bot_name")
4724            .and_then(Value::as_str)
4725            .map(str::trim)
4726            .filter(|v| !v.is_empty());
4727        let bot_name = config
4728            .get("identity")
4729            .and_then(|identity| identity.get("bot"))
4730            .and_then(|bot| bot.get("canonical_name"))
4731            .and_then(Value::as_str)
4732            .map(str::trim)
4733            .filter(|v| !v.is_empty())
4734            .or(legacy_bot_name)
4735            .unwrap_or("Tandem");
4736
4737        let default_profile = config
4738            .get("identity")
4739            .and_then(|identity| identity.get("personality"))
4740            .and_then(|personality| personality.get("default"));
4741        let default_preset = default_profile
4742            .and_then(|profile| profile.get("preset"))
4743            .and_then(Value::as_str)
4744            .map(str::trim)
4745            .filter(|v| !v.is_empty())
4746            .unwrap_or("balanced");
4747        let default_custom = default_profile
4748            .and_then(|profile| profile.get("custom_instructions"))
4749            .and_then(Value::as_str)
4750            .map(str::trim)
4751            .filter(|v| !v.is_empty())
4752            .map(ToString::to_string);
4753        let legacy_persona = config
4754            .get("persona")
4755            .and_then(Value::as_str)
4756            .map(str::trim)
4757            .filter(|v| !v.is_empty())
4758            .map(ToString::to_string);
4759
4760        let per_agent_profile = if allow_agent_override {
4761            agent_name.and_then(|name| {
4762                config
4763                    .get("identity")
4764                    .and_then(|identity| identity.get("personality"))
4765                    .and_then(|personality| personality.get("per_agent"))
4766                    .and_then(|per_agent| per_agent.get(name))
4767            })
4768        } else {
4769            None
4770        };
4771        let preset = per_agent_profile
4772            .and_then(|profile| profile.get("preset"))
4773            .and_then(Value::as_str)
4774            .map(str::trim)
4775            .filter(|v| !v.is_empty())
4776            .unwrap_or(default_preset);
4777        let custom = per_agent_profile
4778            .and_then(|profile| profile.get("custom_instructions"))
4779            .and_then(Value::as_str)
4780            .map(str::trim)
4781            .filter(|v| !v.is_empty())
4782            .map(ToString::to_string)
4783            .or(default_custom)
4784            .or(legacy_persona);
4785
4786        let mut lines = vec![
4787            format!("You are {bot_name}, an AI assistant."),
4788            Self::personality_preset_text(preset).to_string(),
4789        ];
4790        if let Some(custom) = custom {
4791            lines.push(format!("Additional personality instructions: {custom}"));
4792        }
4793        Some(lines.join("\n"))
4794    }
4795
4796    fn build_memory_scope_block(
4797        session_id: &str,
4798        project_id: Option<&str>,
4799        workspace_root: Option<&str>,
4800    ) -> String {
4801        let mut lines = vec![
4802            "<memory_scope>".to_string(),
4803            format!("- current_session_id: {}", session_id),
4804        ];
4805        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
4806            lines.push(format!("- current_project_id: {}", project_id));
4807        }
4808        if let Some(workspace_root) = workspace_root
4809            .map(str::trim)
4810            .filter(|value| !value.is_empty())
4811        {
4812            lines.push(format!("- workspace_root: {}", workspace_root));
4813        }
4814        lines.push(
4815            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
4816                .to_string(),
4817        );
4818        lines.push(
4819            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
4820                .to_string(),
4821        );
4822        lines.push(
4823            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
4824                .to_string(),
4825        );
4826        lines.push("</memory_scope>".to_string());
4827        lines.join("\n")
4828    }
4829}
4830
4831impl PromptContextHook for ServerPromptContextHook {
4832    fn augment_provider_messages(
4833        &self,
4834        ctx: PromptContextHookContext,
4835        mut messages: Vec<ChatMessage>,
4836    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
4837        let this = self.clone();
4838        Box::pin(async move {
4839            // Startup can invoke prompt plumbing before RuntimeState is installed.
4840            // Never panic from context hooks; fail-open and continue without augmentation.
4841            if !this.state.is_ready() {
4842                return Ok(messages);
4843            }
4844            let run = this.state.run_registry.get(&ctx.session_id).await;
4845            let Some(run) = run else {
4846                return Ok(messages);
4847            };
4848            let config = this.state.config.get_effective_value().await;
4849            if let Some(identity_block) =
4850                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
4851            {
4852                messages.push(ChatMessage {
4853                    role: "system".to_string(),
4854                    content: identity_block,
4855                    attachments: Vec::new(),
4856                });
4857            }
4858            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
4859                messages.push(ChatMessage {
4860                    role: "system".to_string(),
4861                    content: Self::build_memory_scope_block(
4862                        &ctx.session_id,
4863                        session.project_id.as_deref(),
4864                        session.workspace_root.as_deref(),
4865                    ),
4866                    attachments: Vec::new(),
4867                });
4868            }
4869            let run_id = run.run_id;
4870            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
4871            let query = messages
4872                .iter()
4873                .rev()
4874                .find(|m| m.role == "user")
4875                .map(|m| m.content.clone())
4876                .unwrap_or_default();
4877            if query.trim().is_empty() {
4878                return Ok(messages);
4879            }
4880            if Self::should_skip_memory_injection(&query) {
4881                return Ok(messages);
4882            }
4883
4884            let docs_hits = this.search_embedded_docs(&query, 6).await;
4885            if !docs_hits.is_empty() {
4886                let docs_block = Self::build_docs_memory_block(&docs_hits);
4887                messages.push(ChatMessage {
4888                    role: "system".to_string(),
4889                    content: docs_block.clone(),
4890                    attachments: Vec::new(),
4891                });
4892                this.state.event_bus.publish(EngineEvent::new(
4893                    "memory.docs.context.injected",
4894                    json!({
4895                        "runID": run_id,
4896                        "sessionID": ctx.session_id,
4897                        "messageID": ctx.message_id,
4898                        "iteration": ctx.iteration,
4899                        "count": docs_hits.len(),
4900                        "tokenSizeApprox": docs_block.split_whitespace().count(),
4901                        "sourcePrefix": "guide_docs:"
4902                    }),
4903                ));
4904                return Ok(messages);
4905            }
4906
4907            let Some(db) = this.open_memory_db().await else {
4908                return Ok(messages);
4909            };
4910            let started = now_ms();
4911            let hits = db
4912                .search_global_memory(&user_id, &query, 8, None, None, None)
4913                .await
4914                .unwrap_or_default();
4915            let latency_ms = now_ms().saturating_sub(started);
4916            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
4917            this.state.event_bus.publish(EngineEvent::new(
4918                "memory.search.performed",
4919                json!({
4920                    "runID": run_id,
4921                    "sessionID": ctx.session_id,
4922                    "messageID": ctx.message_id,
4923                    "providerID": ctx.provider_id,
4924                    "modelID": ctx.model_id,
4925                    "iteration": ctx.iteration,
4926                    "queryHash": Self::hash_query(&query),
4927                    "resultCount": hits.len(),
4928                    "scoreMin": scores.iter().copied().reduce(f64::min),
4929                    "scoreMax": scores.iter().copied().reduce(f64::max),
4930                    "scores": scores,
4931                    "latencyMs": latency_ms,
4932                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
4933                }),
4934            ));
4935
4936            if hits.is_empty() {
4937                return Ok(messages);
4938            }
4939
4940            let memory_block = Self::build_memory_block(&hits);
4941            messages.push(ChatMessage {
4942                role: "system".to_string(),
4943                content: memory_block.clone(),
4944                attachments: Vec::new(),
4945            });
4946            this.state.event_bus.publish(EngineEvent::new(
4947                "memory.context.injected",
4948                json!({
4949                    "runID": run_id,
4950                    "sessionID": ctx.session_id,
4951                    "messageID": ctx.message_id,
4952                    "iteration": ctx.iteration,
4953                    "count": hits.len(),
4954                    "tokenSizeApprox": memory_block.split_whitespace().count(),
4955                }),
4956            ));
4957            Ok(messages)
4958        })
4959    }
4960}
4961
4962fn extract_event_session_id(properties: &Value) -> Option<String> {
4963    properties
4964        .get("sessionID")
4965        .or_else(|| properties.get("sessionId"))
4966        .or_else(|| properties.get("id"))
4967        .or_else(|| {
4968            properties
4969                .get("part")
4970                .and_then(|part| part.get("sessionID"))
4971        })
4972        .or_else(|| {
4973            properties
4974                .get("part")
4975                .and_then(|part| part.get("sessionId"))
4976        })
4977        .and_then(|v| v.as_str())
4978        .map(|s| s.to_string())
4979}
4980
4981fn extract_event_run_id(properties: &Value) -> Option<String> {
4982    properties
4983        .get("runID")
4984        .or_else(|| properties.get("run_id"))
4985        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
4986        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
4987        .and_then(|v| v.as_str())
4988        .map(|s| s.to_string())
4989}
4990
4991pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
4992    let part = properties.get("part")?;
4993    let part_type = part
4994        .get("type")
4995        .and_then(|v| v.as_str())
4996        .unwrap_or_default()
4997        .to_ascii_lowercase();
4998    if part_type != "tool" && part_type != "tool-invocation" && part_type != "tool-result" {
4999        return None;
5000    }
5001    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
5002    let message_id = part
5003        .get("messageID")
5004        .or_else(|| part.get("message_id"))
5005        .and_then(|v| v.as_str())?
5006        .to_string();
5007    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
5008    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
5009        if let Some(preview) = properties
5010            .get("toolCallDelta")
5011            .and_then(|delta| delta.get("parsedArgsPreview"))
5012            .cloned()
5013        {
5014            let preview_nonempty = !preview.is_null()
5015                && !preview.as_object().is_some_and(|value| value.is_empty())
5016                && !preview
5017                    .as_str()
5018                    .map(|value| value.trim().is_empty())
5019                    .unwrap_or(false);
5020            if preview_nonempty {
5021                args = preview;
5022            }
5023        }
5024        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
5025            if let Some(raw_preview) = properties
5026                .get("toolCallDelta")
5027                .and_then(|delta| delta.get("rawArgsPreview"))
5028                .and_then(|value| value.as_str())
5029                .map(str::trim)
5030                .filter(|value| !value.is_empty())
5031            {
5032                args = Value::String(raw_preview.to_string());
5033            }
5034        }
5035    }
5036    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
5037    {
5038        tracing::info!(
5039            message_id = %message_id,
5040            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
5041            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
5042            has_result = part.get("result").is_some(),
5043            has_error = part.get("error").is_some(),
5044            "persistable write tool part still has empty args"
5045        );
5046    }
5047    let result = part.get("result").cloned().filter(|value| !value.is_null());
5048    let error = part
5049        .get("error")
5050        .and_then(|v| v.as_str())
5051        .map(|value| value.to_string());
5052    Some((
5053        message_id,
5054        MessagePart::ToolInvocation {
5055            tool,
5056            args,
5057            result,
5058            error,
5059        },
5060    ))
5061}
5062
5063pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
5064    let session_id = extract_event_session_id(&event.properties)?;
5065    let run_id = extract_event_run_id(&event.properties);
5066    let key = format!("run/{session_id}/status");
5067
5068    let mut base = serde_json::Map::new();
5069    base.insert("sessionID".to_string(), Value::String(session_id));
5070    if let Some(run_id) = run_id {
5071        base.insert("runID".to_string(), Value::String(run_id));
5072    }
5073
5074    match event.event_type.as_str() {
5075        "session.run.started" => {
5076            base.insert("state".to_string(), Value::String("running".to_string()));
5077            base.insert("phase".to_string(), Value::String("run".to_string()));
5078            base.insert(
5079                "eventType".to_string(),
5080                Value::String("session.run.started".to_string()),
5081            );
5082            Some(StatusIndexUpdate {
5083                key,
5084                value: Value::Object(base),
5085            })
5086        }
5087        "session.run.finished" => {
5088            base.insert("state".to_string(), Value::String("finished".to_string()));
5089            base.insert("phase".to_string(), Value::String("run".to_string()));
5090            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
5091                base.insert("result".to_string(), Value::String(status.to_string()));
5092            }
5093            base.insert(
5094                "eventType".to_string(),
5095                Value::String("session.run.finished".to_string()),
5096            );
5097            Some(StatusIndexUpdate {
5098                key,
5099                value: Value::Object(base),
5100            })
5101        }
5102        "message.part.updated" => {
5103            let part = event.properties.get("part")?;
5104            let part_type = part.get("type").and_then(|v| v.as_str())?;
5105            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
5106            let (phase, tool_active) = match (part_type, part_state) {
5107                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
5108                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
5109                _ => return None,
5110            };
5111            base.insert("state".to_string(), Value::String("running".to_string()));
5112            base.insert("phase".to_string(), Value::String(phase.to_string()));
5113            base.insert("toolActive".to_string(), Value::Bool(tool_active));
5114            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
5115                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
5116            }
5117            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
5118                base.insert(
5119                    "toolState".to_string(),
5120                    Value::String(tool_state.to_string()),
5121                );
5122            }
5123            if let Some(tool_error) = part
5124                .get("error")
5125                .and_then(|v| v.as_str())
5126                .map(str::trim)
5127                .filter(|value| !value.is_empty())
5128            {
5129                base.insert(
5130                    "toolError".to_string(),
5131                    Value::String(tool_error.to_string()),
5132                );
5133            }
5134            if let Some(tool_call_id) = part
5135                .get("id")
5136                .and_then(|v| v.as_str())
5137                .map(str::trim)
5138                .filter(|value| !value.is_empty())
5139            {
5140                base.insert(
5141                    "toolCallID".to_string(),
5142                    Value::String(tool_call_id.to_string()),
5143                );
5144            }
5145            if let Some(args_preview) = part
5146                .get("args")
5147                .filter(|value| {
5148                    !value.is_null()
5149                        && !value.as_object().is_some_and(|map| map.is_empty())
5150                        && !value
5151                            .as_str()
5152                            .map(|text| text.trim().is_empty())
5153                            .unwrap_or(false)
5154                })
5155                .map(|value| truncate_text(&value.to_string(), 500))
5156            {
5157                base.insert(
5158                    "toolArgsPreview".to_string(),
5159                    Value::String(args_preview.to_string()),
5160                );
5161            }
5162            base.insert(
5163                "eventType".to_string(),
5164                Value::String("message.part.updated".to_string()),
5165            );
5166            Some(StatusIndexUpdate {
5167                key,
5168                value: Value::Object(base),
5169            })
5170        }
5171        _ => None,
5172    }
5173}
5174
/// Public entry point that forwards to `crate::app::tasks::run_session_part_persister`
/// and awaits it.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
5178
/// Public entry point that forwards to `crate::app::tasks::run_status_indexer` and
/// awaits it.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
5182
/// Public entry point that forwards to `crate::app::tasks::run_agent_team_supervisor`
/// and awaits it.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
5186
/// Public entry point that forwards to `crate::app::tasks::run_bug_monitor` and awaits
/// it.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
5190
/// Public entry point that forwards to `crate::app::tasks::run_usage_aggregator` and
/// awaits it.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
5194
/// Public entry point that forwards to `crate::app::tasks::run_optimization_scheduler`
/// and awaits it.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
5198
/// Records a bug-monitor incident for `event` and walks it through draft
/// creation, triage, and GitHub publishing.
///
/// The steps run strictly in this order:
/// 1. Build a submission from the event.
/// 2. Look up failure-pattern matches for the submission.
/// 3. Reuse the stored incident with the same fingerprint (bumping its
///    occurrence counter) or create a new `"queued"` incident, then persist it.
/// 4. If any pattern matches were found, mark the incident
///    `"duplicate_suppressed"`, publish
///    `bug_monitor.incident.duplicate_suppressed`, and return early.
/// 5. Submit a draft; on failure mark the incident `"draft_failed"`, publish
///    `bug_monitor.incident.detected`, and return the incident as `Ok`.
/// 6. Ensure a triage run exists for the draft.
/// 7. Attempt to publish the draft through `bug_monitor_github::publish_draft`.
/// 8. Persist the final incident and publish `bug_monitor.incident.detected`.
///
/// # Errors
///
/// Returns an error when building the submission fails, when the submission
/// has no fingerprint, or when any `put_bug_monitor_incident` call fails.
/// Failures from draft submission, triage, and GitHub publishing are *not*
/// propagated; they are recorded in `incident.last_error` instead.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // The match lookup runs before the fingerprint is validated below, so a
    // missing repo/fingerprint is passed as an empty string here.
    // NOTE(review): the trailing `3` is presumably a cap on returned matches —
    // confirm against `bug_monitor_failure_pattern_matches`.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    // A fingerprint is mandatory: it is the key used to find an existing incident.
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // Prefer the workspace root from the bug-monitor config; fall back to the
    // root reported by the workspace index snapshot.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Clone the matching row out so the read guard is released at the end of
    // this statement rather than being held across the awaits below.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump the counter and timestamps; every other
        // stored field (including status) is carried over unchanged.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        // Only backfill the excerpt when the stored one is empty.
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence of this fingerprint: create a fresh queued incident.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    // Persist immediately so the incident is stored even if a later step fails.
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: record the matches on the incident, announce the
    // suppression, and stop before any draft is created.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Draft submission failure is recorded on the incident and reported via
    // the event bus; the function still returns `Ok` with the failed incident.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            // Error text is capped at 500 bytes before being stored.
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    // Link the incident to its draft and persist that intermediate state.
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            // The triage run id is taken from the updated draft; the status
            // only advances when that id is actually present.
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            // Triage failure is non-fatal: stay at "draft_created" and keep the error text.
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    if let Some(draft_id) = incident.draft_id.clone() {
        // Snapshot the stored draft *before* the publish attempt; it is only
        // used to build the failure record in the `Err` arm below.
        // NOTE(review): `draft.clone()` is evaluated even when the lookup
        // succeeds; `unwrap_or_else` would avoid the extra clone.
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                // The incident status becomes whatever action the publish outcome reports.
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                // Mark the draft snapshot as failed and store it; the incident
                // status itself is left as set by the triage step above.
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                // Best effort: results of persisting the failed draft and of
                // recording the post failure are deliberately discarded.
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Final persist and notification with whatever status the steps above reached.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
5400
5401pub fn sha256_hex(parts: &[&str]) -> String {
5402    let mut hasher = Sha256::new();
5403    for part in parts {
5404        hasher.update(part.as_bytes());
5405        hasher.update([0u8]);
5406    }
5407    format!("{:x}", hasher.finalize())
5408}
5409
/// Forwards to [`crate::app::tasks::run_routine_scheduler`] and awaits it.
///
/// Takes ownership of `state` and hands it straight to the task
/// implementation in `crate::app::tasks`.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
5413
/// Forwards to [`crate::app::tasks::run_routine_executor`] and awaits it.
///
/// Takes ownership of `state` and hands it straight to the task
/// implementation in `crate::app::tasks`.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
5417
/// Forwards to [`crate::app::routines::build_routine_prompt`] and returns the
/// `String` it produces for `run`.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
5421
/// Returns `input` shortened to at most `max_len` bytes of original text.
///
/// When `input` already fits within `max_len` bytes it is returned unchanged.
/// Otherwise the text is cut and the marker `...<truncated>` is appended (the
/// marker is not counted against `max_len`).
///
/// `max_len` is a byte length, not a character count. If it falls inside a
/// multi-byte UTF-8 character, the cut moves back to the start of that
/// character, so the kept prefix may be slightly shorter than `max_len`.
/// Slicing at the raw byte offset would panic in that case.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Back off to the nearest char boundary at or below `max_len`.
    // Index 0 is always a boundary, so this loop terminates.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
5430
/// Forwards to [`crate::app::routines::append_configured_output_artifacts`]
/// for `run` and awaits it; nothing is returned to the caller.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
5434
5435pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
5436    let provider_id = config
5437        .get("default_provider")
5438        .and_then(|v| v.as_str())
5439        .map(str::trim)
5440        .filter(|v| !v.is_empty())?;
5441    let model_id = config
5442        .get("providers")
5443        .and_then(|v| v.get(provider_id))
5444        .and_then(|v| v.get("default_model"))
5445        .and_then(|v| v.as_str())
5446        .map(str::trim)
5447        .filter(|v| !v.is_empty())?;
5448    Some(ModelSpec {
5449        provider_id: provider_id.to_string(),
5450        model_id: model_id.to_string(),
5451    })
5452}
5453
/// Forwards to [`crate::app::routines::resolve_routine_model_spec_for_run`]
/// and returns its result unchanged.
///
/// The result pairs an optional [`ModelSpec`] with a `String`.
/// NOTE(review): the `String` presumably describes how/where the spec was
/// resolved — confirm against the implementation in `crate::app::routines`.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
5460
/// Trims every entry of `raw`, discards entries that are empty after
/// trimming, and removes duplicates while keeping first-seen order.
///
/// Duplicate detection is performed on the trimmed text, so `" a "` and `"a"`
/// count as the same entry.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_emitted = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for a value that is already present,
        // which filters out repeats after their first occurrence.
        .filter(|entry| already_emitted.insert(entry.clone()))
        .collect()
}
5475
5476#[cfg(not(feature = "browser"))]
5477impl AppState {
5478    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
5479        0
5480    }
5481
5482    pub async fn close_all_browser_sessions(&self) -> usize {
5483        0
5484    }
5485
5486    pub async fn browser_status(&self) -> serde_json::Value {
5487        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
5488    }
5489
5490    pub async fn browser_smoke_test(
5491        &self,
5492        _url: Option<String>,
5493    ) -> anyhow::Result<serde_json::Value> {
5494        anyhow::bail!("browser feature disabled")
5495    }
5496
5497    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
5498        anyhow::bail!("browser feature disabled")
5499    }
5500
5501    pub async fn browser_health_summary(&self) -> serde_json::Value {
5502        serde_json::json!({ "enabled": false })
5503    }
5504}
5505
5506pub mod automation;
5507pub use automation::*;
5508
5509#[cfg(test)]
5510mod tests;