//! Application state for the Tandem server.
//!
//! Source path: `tandem_server/app/state/mod.rs`.
1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::{
22    channel_registry::registered_channels,
23    config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig},
24};
25use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
26use tandem_memory::db::MemoryDatabase;
27use tandem_providers::ChatMessage;
28use tandem_workflows::{
29    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
30    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
31    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
32};
33
34use crate::agent_teams::AgentTeamRuntime;
35use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
36use crate::automation_v2::types::*;
37use crate::bug_monitor::types::*;
38use crate::capability_resolver::CapabilityResolver;
39use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
40use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
41use crate::pack_manager::PackManager;
42use crate::preset_registry::PresetRegistry;
43use crate::routines::{errors::RoutineStoreError, types::*};
44use crate::runtime::{
45    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
46};
47use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
48use crate::util::{host::detect_host_runtime_context, time::now_ms};
49use crate::{
50    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
51    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
52    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
53    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
54    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
55    OptimizationPromotionDecisionKind,
56};
57
/// Top-level shared state for the Tandem server.
///
/// `AppState` is `Clone`: every mutable field is behind an `Arc`, so clones
/// are cheap handles onto the same underlying data. In-memory stores are
/// `HashMap`s guarded by `tokio::sync::RwLock`; each persisted store has a
/// matching `*_path` field pointing at its on-disk JSON file.
#[derive(Clone)]
pub struct AppState {
    // --- Core runtime & startup ---
    /// Engine runtime, installed exactly once via `mark_ready`.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup progress; mutated as boot phases complete.
    pub startup: Arc<RwLock<StartupState>>,
    /// True when the engine runs in-process rather than as a sidecar.
    pub in_process_mode: Arc<AtomicBool>,
    /// Optional bearer token protecting the HTTP API.
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    /// Age in ms after which a run is considered stale (resolved from env).
    pub run_stale_ms: u64,
    // --- Memory governance ---
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub memory_audit_path: PathBuf,
    pub protected_audit_path: PathBuf,
    // --- Missions & shared resources ---
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// Revisioned key/value store shared across agents; persisted to
    /// `shared_resources_path` (see `put_shared_resource`).
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // --- Routines & automations ---
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    /// Set while the scheduler is shutting down so new runs are not enqueued.
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    /// Serializes writes of the automations-v2 store to disk.
    pub automations_v2_persistence: Arc<tokio::sync::Mutex<()>>,
    // --- Workflow planning ---
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub workflow_learning_candidates:
        Arc<RwLock<std::collections::HashMap<String, WorkflowLearningCandidate>>>,
    pub(crate) context_packs: Arc<
        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
    >,
    // --- Optimization campaigns ---
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    // --- Bug monitor ---
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    // --- Provider OAuth ---
    pub(crate) provider_oauth_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::config_providers::ProviderOAuthSessionRecord,
            >,
        >,
    >,
    // --- Workflow runtime ---
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    /// Per-hook enable/disable overrides (hook id -> enabled).
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    /// Dedupe map for workflow dispatches (key -> last-seen timestamp ms).
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    // --- Session bookkeeping ---
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub automation_v2_session_mcp_servers:
        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
    /// USD cost per 1k tokens, used for spend accounting (resolved from env).
    pub token_cost_per_1k_usd: f64,
    // --- Persistence paths for the stores above ---
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub workflow_learning_candidates_path: PathBuf,
    pub context_packs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    // --- Subsystems & server identity ---
    pub agent_teams: AgentTeamRuntime,
    pub web_ui_enabled: Arc<AtomicBool>,
    /// Path prefix for the web UI; std RwLock because it is read from sync
    /// contexts (see `web_ui_prefix`).
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host context detected at construction; superseded by the runtime's
    /// copy once ready (see `host_runtime_context`).
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Public status of a single chat channel (Slack/Discord/Telegram/...),
/// as reported by `AppState::channel_statuses`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// Channel has configuration present in the effective config.
    pub enabled: bool,
    /// Listener has been started for this channel.
    pub connected: bool,
    /// Most recent listener error, if any.
    pub last_error: Option<String>,
    /// Number of active sessions attributed to this channel.
    pub active_sessions: u64,
    /// Free-form diagnostics (state, error codes, reconnect info) as JSON.
    pub meta: Value,
}
/// Subset of the effective app config consumed by this module; every section
/// is `#[serde(default)]` so a partial (or empty) config value still parses.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
174
/// Mutable runtime state for channel listeners, guarded by a tokio `Mutex`
/// on `AppState::channels_runtime`.
pub struct ChannelRuntime {
    /// Running listener tasks; `None` when no listeners have been started.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Last published status per channel name.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
    /// Shared diagnostics handle updated by the listeners themselves.
    pub diagnostics: tandem_channels::channel_registry::ChannelRuntimeDiagnostics,
}
180
181impl Default for ChannelRuntime {
182    fn default() -> Self {
183        Self {
184            listeners: None,
185            statuses: std::collections::HashMap::new(),
186            diagnostics: tandem_channels::new_channel_runtime_diagnostics(),
187        }
188    }
189}
190
/// A single key/value update destined for a status index.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    /// Index key being updated.
    pub key: String,
    /// New JSON value for that key.
    pub value: Value,
}
196
197impl AppState {
    /// Builds an `AppState` in the `Starting` phase: all in-memory stores
    /// empty, and every path/limit resolved from `config::paths` /
    /// `config::env`.
    ///
    /// `attempt_id` tags this boot attempt in startup snapshots;
    /// `in_process` selects the in-process vs sidecar engine mode.
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        Self {
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: config::env::resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            memory_audit_path: config::paths::resolve_memory_audit_path(),
            protected_audit_path: config::paths::resolve_protected_audit_path(),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: config::paths::resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Scheduler concurrency is env-tunable.
            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
                config::env::resolve_scheduler_max_concurrent_runs(),
            ))),
            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
            automations_v2_persistence: Arc::new(tokio::sync::Mutex::new(())),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_planner_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_learning_candidates: Arc::new(RwLock::new(std::collections::HashMap::new())),
            context_packs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Bug monitor config is seeded from environment variables; the
            // persisted config is loaded later during `mark_ready`.
            bug_monitor_config: Arc::new(
                RwLock::new(config::env::resolve_bug_monitor_env_config()),
            ),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            provider_oauth_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_mcp_servers: Arc::new(RwLock::new(
                std::collections::HashMap::new(),
            )),
            routines_path: config::paths::resolve_routines_path(),
            routine_history_path: config::paths::resolve_routine_history_path(),
            routine_runs_path: config::paths::resolve_routine_runs_path(),
            automations_v2_path: config::paths::resolve_automations_v2_path(),
            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
            external_actions_path: config::paths::resolve_external_actions_path(),
            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
            workflow_planner_sessions_path: config::paths::resolve_workflow_planner_sessions_path(),
            workflow_learning_candidates_path:
                config::paths::resolve_workflow_learning_candidates_path(),
            context_packs_path: config::paths::resolve_context_packs_path(),
            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
            // Web UI starts disabled; `configure_web_ui` flips it once the
            // effective config is known.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                // Prefer the shared canonical root; fall back to ~/.tandem
                // (or ./.tandem when no home dir can be determined).
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
295
    /// True once the runtime has been installed via `mark_ready`.
    pub fn is_ready(&self) -> bool {
        self.runtime.get().is_some()
    }
299
300    pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
301        for _ in 0..attempts {
302            if self.is_ready() {
303                return true;
304            }
305            let startup = self.startup_snapshot().await;
306            if matches!(startup.status, StartupStatus::Failed) {
307                return false;
308            }
309            tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
310        }
311        self.is_ready()
312    }
313
314    pub fn mode_label(&self) -> &'static str {
315        if self.in_process_mode.load(Ordering::Relaxed) {
316            "in-process"
317        } else {
318            "sidecar"
319        }
320    }
321
322    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
323        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
324        if let Ok(mut guard) = self.web_ui_prefix.write() {
325            *guard = config::webui::normalize_web_ui_prefix(&prefix);
326        }
327    }
328
    /// Whether the web UI is currently enabled.
    pub fn web_ui_enabled(&self) -> bool {
        self.web_ui_enabled.load(Ordering::Relaxed)
    }
332
333    pub fn web_ui_prefix(&self) -> String {
334        self.web_ui_prefix
335            .read()
336            .map(|v| v.clone())
337            .unwrap_or_else(|_| "/admin".to_string())
338    }
339
340    pub fn set_server_base_url(&self, base_url: String) {
341        if let Ok(mut guard) = self.server_base_url.write() {
342            *guard = base_url;
343        }
344    }
345
346    pub fn server_base_url(&self) -> String {
347        self.server_base_url
348            .read()
349            .map(|v| v.clone())
350            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
351    }
352
    /// Clone of the configured API token, if one is set.
    pub async fn api_token(&self) -> Option<String> {
        self.api_token.read().await.clone()
    }
356
357    pub async fn set_api_token(&self, token: Option<String>) {
358        *self.api_token.write().await = token;
359    }
360
361    pub async fn startup_snapshot(&self) -> StartupSnapshot {
362        let state = self.startup.read().await.clone();
363        StartupSnapshot {
364            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
365            status: state.status,
366            phase: state.phase,
367            started_at_ms: state.started_at_ms,
368            attempt_id: state.attempt_id,
369            last_error: state.last_error,
370        }
371    }
372
373    pub fn host_runtime_context(&self) -> HostRuntimeContext {
374        self.runtime
375            .get()
376            .map(|runtime| runtime.host_runtime_context.clone())
377            .unwrap_or_else(|| self.host_runtime_context.clone())
378    }
379
380    pub async fn set_phase(&self, phase: impl Into<String>) {
381        let mut startup = self.startup.write().await;
382        startup.phase = phase.into();
383    }
384
    /// Installs the fully-built `RuntimeState`, registers server-side tools
    /// and engine hooks, loads every persisted store, and flips startup to
    /// `Ready`.
    ///
    /// NOTE(review): `self.tools`, `self.engine_loop`, and
    /// `self.workspace_index` are not fields of `AppState`; they appear to be
    /// reached through a `Deref` to the just-installed `RuntimeState`
    /// (`std::ops::Deref` is imported above) — confirm against the rest of
    /// this module.
    ///
    /// # Errors
    /// Fails if the runtime was already initialized, if browser tool
    /// registration fails (under the `browser` feature), or if loading
    /// routines / automations fails. All other store loads are deliberately
    /// best-effort (`let _ =`), so a corrupt optional store does not block
    /// startup.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        // OnceLock::set rejects a second initialization.
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        #[cfg(feature = "browser")]
        self.register_browser_tools().await?;
        // Register built-in server tools.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list".to_string(),
                Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
            )
            .await;
        // Wire agent-team and prompt-context hooks into the engine loop.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Load persisted stores. Routines and automations are required;
        // everything else is best-effort.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_optimization_campaigns().await;
        let _ = self.load_optimization_experiments().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_external_actions().await;
        let _ = self.load_workflow_planner_sessions().await;
        let _ = self.load_workflow_learning_candidates().await;
        let _ = self.load_context_packs().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        // Agent teams are scoped to the current workspace root.
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Finally, publish readiness.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
448
449    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
450        let mut startup = self.startup.write().await;
451        startup.status = StartupStatus::Failed;
452        startup.phase = phase.into();
453        startup.last_error = Some(error.into());
454    }
455
456    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
457        let runtime = self.channels_runtime.lock().await;
458        let mut status = runtime.statuses.clone();
459        let diagnostics = runtime.diagnostics.read().await;
460        for spec in registered_channels() {
461            let entry = status
462                .entry(spec.name.to_string())
463                .or_insert(ChannelStatus {
464                    enabled: false,
465                    connected: false,
466                    last_error: None,
467                    active_sessions: 0,
468                    meta: json!({}),
469                });
470            let mut meta = entry.meta.as_object().cloned().unwrap_or_default();
471            if let Some(diag) = diagnostics.get(spec.name) {
472                entry.last_error = diag.last_error.clone().or_else(|| entry.last_error.clone());
473                meta.insert("state".to_string(), Value::String(diag.state.to_string()));
474                meta.insert(
475                    "last_error_code".to_string(),
476                    diag.last_error_code
477                        .map(|code| Value::String(code.to_string()))
478                        .unwrap_or(Value::Null),
479                );
480                meta.insert(
481                    "last_reconnect_at".to_string(),
482                    diag.last_reconnect_at
483                        .map(|value| Value::Number(value.into()))
484                        .unwrap_or(Value::Null),
485                );
486                meta.insert(
487                    "listener_start_count".to_string(),
488                    Value::Number(serde_json::Number::from(diag.listener_start_count)),
489                );
490            } else {
491                meta.insert("state".to_string(), Value::String("stopped".to_string()));
492                meta.insert("last_error_code".to_string(), Value::Null);
493                meta.insert("last_reconnect_at".to_string(), Value::Null);
494                meta.insert(
495                    "listener_start_count".to_string(),
496                    Value::Number(0u64.into()),
497                );
498            }
499            entry.meta = Value::Object(meta);
500        }
501        status
502    }
503
    /// Tears down and restarts all channel listeners from the current
    /// effective config, then publishes a `channel.status.changed` event.
    ///
    /// The `channels_runtime` tokio mutex is held across the listener-start
    /// awaits deliberately, so concurrent restarts serialize instead of
    /// racing on the `JoinSet`.
    ///
    /// NOTE(review): `self.config` and `self.event_bus` are not `AppState`
    /// fields; presumably reached via `Deref` to `RuntimeState` — confirm.
    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
        // Re-read the effective config; an unparsable value falls back to
        // defaults rather than failing the restart.
        let effective = self.config.get_effective_value().await;
        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());

        let diagnostics = tandem_channels::new_channel_runtime_diagnostics();

        // Abort any running listeners and reset runtime bookkeeping.
        let mut runtime = self.channels_runtime.lock().await;
        if let Some(listeners) = runtime.listeners.as_mut() {
            listeners.abort_all();
        }
        runtime.listeners = None;
        runtime.diagnostics = diagnostics.clone();
        runtime.statuses.clear();
        // Channels config as a JSON object, keyed by each channel's
        // `config_key`; `None` if serialization fails.
        let channels_config_value = serde_json::to_value(&parsed.channels)
            .ok()
            .and_then(|channels| channels.as_object().cloned());

        // A channel counts as "enabled" when its config section is present.
        let mut status_map = std::collections::HashMap::new();
        for spec in registered_channels() {
            let enabled = channels_config_value
                .as_ref()
                .and_then(|channels| channels.get(spec.config_key))
                .and_then(Value::as_object)
                .is_some();
            status_map.insert(
                spec.name.to_string(),
                ChannelStatus {
                    enabled,
                    connected: false,
                    last_error: None,
                    active_sessions: 0,
                    meta: serde_json::json!({}),
                },
            );
        }

        // Start listeners only when a usable channels config was built;
        // enabled channels are then optimistically marked connected.
        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
            let listeners = tandem_channels::start_channel_listeners_with_diagnostics(
                channels_cfg,
                diagnostics.clone(),
            )
            .await;
            runtime.listeners = Some(listeners);
            for status in status_map.values_mut() {
                if status.enabled {
                    status.connected = true;
                }
            }
        }

        runtime.statuses = status_map.clone();
        // Release the mutex before publishing so subscribers can query state.
        drop(runtime);

        self.event_bus.publish(EngineEvent::new(
            "channel.status.changed",
            serde_json::json!({ "channels": status_map }),
        ));
        Ok(())
    }
564
    /// Loads the shared-resource store from disk, replacing the in-memory
    /// map. A missing file is treated as an empty store and is not an error.
    ///
    /// NOTE(review): a file that exists but fails to parse is silently
    /// replaced with an empty map (`unwrap_or_default`), and a subsequent
    /// persist would overwrite the corrupt file — confirm this data-loss
    /// behavior is intended (cf. `load_routines`, which errors and tries a
    /// backup instead).
    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
        if !self.shared_resources_path.exists() {
            return Ok(());
        }
        let raw = fs::read_to_string(&self.shared_resources_path).await?;
        let parsed =
            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
                .unwrap_or_default();
        let mut guard = self.shared_resources.write().await;
        *guard = parsed;
        Ok(())
    }
577
578    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
579        if let Some(parent) = self.shared_resources_path.parent() {
580            fs::create_dir_all(parent).await?;
581        }
582        let payload = {
583            let guard = self.shared_resources.read().await;
584            serde_json::to_string_pretty(&*guard)?
585        };
586        fs::write(&self.shared_resources_path, payload).await?;
587        Ok(())
588    }
589
    /// Returns a clone of the shared resource stored under `key`, if any.
    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
        self.shared_resources.read().await.get(key).cloned()
    }
593
594    pub async fn list_shared_resources(
595        &self,
596        prefix: Option<&str>,
597        limit: usize,
598    ) -> Vec<SharedResourceRecord> {
599        let limit = limit.clamp(1, 500);
600        let mut rows = self
601            .shared_resources
602            .read()
603            .await
604            .values()
605            .filter(|record| {
606                if let Some(prefix) = prefix {
607                    record.key.starts_with(prefix)
608                } else {
609                    true
610                }
611            })
612            .cloned()
613            .collect::<Vec<_>>();
614        rows.sort_by(|a, b| a.key.cmp(&b.key));
615        rows.truncate(limit);
616        rows
617    }
618
    /// Inserts or updates a shared resource with optimistic concurrency.
    ///
    /// When `if_match_rev` is provided, the write only proceeds if the
    /// stored record's revision matches exactly (compare-and-swap). The new
    /// record's revision is `existing.rev + 1` (saturating), or `1` for a
    /// fresh key. On persistence failure the in-memory change is rolled back
    /// so memory stays consistent with the on-disk store.
    ///
    /// # Errors
    /// - `InvalidKey` when `key` fails `is_valid_resource_key`.
    /// - `RevisionConflict` when `if_match_rev` does not match the stored rev
    ///   (including when the key does not exist).
    /// - `PersistFailed` when writing the store to disk fails.
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        // Optimistic-concurrency check against the current revision.
        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        // Keep the previous value so a persist failure can be rolled back.
        let previous = guard.insert(key.clone(), record.clone());
        // Release the write lock before the (slow) disk write.
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Restore the pre-write state: put the old record back, or
            // remove the freshly inserted one.
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
677
678    pub async fn delete_shared_resource(
679        &self,
680        key: &str,
681        if_match_rev: Option<u64>,
682    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
683        if !is_valid_resource_key(key) {
684            return Err(ResourceStoreError::InvalidKey {
685                key: key.to_string(),
686            });
687        }
688
689        let mut guard = self.shared_resources.write().await;
690        let current = guard.get(key).cloned();
691        if let Some(expected) = if_match_rev {
692            let current_rev = current.as_ref().map(|row| row.rev);
693            if current_rev != Some(expected) {
694                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
695                    key: key.to_string(),
696                    expected_rev: Some(expected),
697                    current_rev,
698                }));
699            }
700        }
701
702        let removed = guard.remove(key);
703        drop(guard);
704
705        if let Err(error) = self.persist_shared_resources().await {
706            if let Some(record) = removed.clone() {
707                self.shared_resources
708                    .write()
709                    .await
710                    .insert(record.key.clone(), record);
711            }
712            return Err(ResourceStoreError::PersistFailed {
713                message: error.to_string(),
714            });
715        }
716
717        Ok(removed)
718    }
719
    /// Loads the routine store from disk into memory.
    ///
    /// A missing file is treated as an empty store. If the primary file
    /// fails to parse, the sibling backup (written by `persist_routines_inner`)
    /// is tried before giving up with the primary parse error.
    pub async fn load_routines(&self) -> anyhow::Result<()> {
        if !self.routines_path.exists() {
            return Ok(());
        }
        let raw = fs::read_to_string(&self.routines_path).await?;
        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
            Ok(parsed) => {
                let mut guard = self.routines.write().await;
                *guard = parsed;
                Ok(())
            }
            Err(primary_err) => {
                // Primary file is corrupt: fall back to the backup copy.
                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
                if backup_path.exists() {
                    let backup_raw = fs::read_to_string(&backup_path).await?;
                    if let Ok(parsed_backup) = serde_json::from_str::<
                        std::collections::HashMap<String, RoutineSpec>,
                    >(&backup_raw)
                    {
                        let mut guard = self.routines.write().await;
                        *guard = parsed_backup;
                        return Ok(());
                    }
                }
                // Report the primary error; the backup (if any) also failed.
                Err(anyhow::anyhow!(
                    "failed to parse routines store {}: {primary_err}",
                    self.routines_path.display()
                ))
            }
        }
    }
751
752    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
753        if !self.routine_history_path.exists() {
754            return Ok(());
755        }
756        let raw = fs::read_to_string(&self.routine_history_path).await?;
757        let parsed = serde_json::from_str::<
758            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
759        >(&raw)
760        .unwrap_or_default();
761        let mut guard = self.routine_history.write().await;
762        *guard = parsed;
763        Ok(())
764    }
765
766    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
767        if !self.routine_runs_path.exists() {
768            return Ok(());
769        }
770        let raw = fs::read_to_string(&self.routine_runs_path).await?;
771        let parsed =
772            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
773                .unwrap_or_default();
774        let mut guard = self.routine_runs.write().await;
775        *guard = parsed;
776        Ok(())
777    }
778
    /// Writes the in-memory routine store to disk (backup + tmp + rename).
    ///
    /// Safety valve: when the in-memory map is empty but the on-disk file
    /// still has rows, the write is refused unless `allow_empty_overwrite`
    /// is set — this guards against wiping the store before it was loaded.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            // Unreadable/corrupt existing content counts as "has rows"
            // (unwrap_or(true)) so we err on the side of not overwriting.
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        // Keep a best-effort backup of the previous contents, then write
        // atomically via a sibling tmp file and rename.
        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
812
    /// Persists routines with the empty-overwrite safety valve engaged
    /// (see `persist_routines_inner`).
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
816
817    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
818        if let Some(parent) = self.routine_history_path.parent() {
819            fs::create_dir_all(parent).await?;
820        }
821        let payload = {
822            let guard = self.routine_history.read().await;
823            serde_json::to_string_pretty(&*guard)?
824        };
825        fs::write(&self.routine_history_path, payload).await?;
826        Ok(())
827    }
828
829    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
830        if let Some(parent) = self.routine_runs_path.parent() {
831            fs::create_dir_all(parent).await?;
832        }
833        let payload = {
834            let guard = self.routine_runs.read().await;
835            serde_json::to_string_pretty(&*guard)?
836        };
837        fs::write(&self.routine_runs_path, payload).await?;
838        Ok(())
839    }
840
841    pub async fn put_routine(
842        &self,
843        mut routine: RoutineSpec,
844    ) -> Result<RoutineSpec, RoutineStoreError> {
845        if routine.routine_id.trim().is_empty() {
846            return Err(RoutineStoreError::InvalidRoutineId {
847                routine_id: routine.routine_id,
848            });
849        }
850
851        routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
852        routine.output_targets = normalize_non_empty_list(routine.output_targets);
853
854        let now = now_ms();
855        let next_schedule_fire =
856            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
857                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
858                    detail: "invalid schedule or timezone".to_string(),
859                })?;
860        match routine.schedule {
861            RoutineSchedule::IntervalSeconds { seconds } => {
862                if seconds == 0 {
863                    return Err(RoutineStoreError::InvalidSchedule {
864                        detail: "interval_seconds must be > 0".to_string(),
865                    });
866                }
867                let _ = seconds;
868            }
869            RoutineSchedule::Cron { .. } => {}
870        }
871        if routine.next_fire_at_ms.is_none() {
872            routine.next_fire_at_ms = Some(next_schedule_fire);
873        }
874
875        let mut guard = self.routines.write().await;
876        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
877        drop(guard);
878
879        if let Err(error) = self.persist_routines().await {
880            let mut rollback = self.routines.write().await;
881            if let Some(previous) = previous {
882                rollback.insert(previous.routine_id.clone(), previous);
883            } else {
884                rollback.remove(&routine.routine_id);
885            }
886            return Err(RoutineStoreError::PersistFailed {
887                message: error.to_string(),
888            });
889        }
890
891        Ok(routine)
892    }
893
894    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
895        let mut rows = self
896            .routines
897            .read()
898            .await
899            .values()
900            .cloned()
901            .collect::<Vec<_>>();
902        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
903        rows
904    }
905
906    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
907        self.routines.read().await.get(routine_id).cloned()
908    }
909
910    pub async fn delete_routine(
911        &self,
912        routine_id: &str,
913    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
914        let mut guard = self.routines.write().await;
915        let removed = guard.remove(routine_id);
916        drop(guard);
917
918        let allow_empty_overwrite = self.routines.read().await.is_empty();
919        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
920            if let Some(removed) = removed.clone() {
921                self.routines
922                    .write()
923                    .await
924                    .insert(removed.routine_id.clone(), removed);
925            }
926            return Err(RoutineStoreError::PersistFailed {
927                message: error.to_string(),
928            });
929        }
930        Ok(removed)
931    }
932
    /// Scans active routines whose `next_fire_at_ms` has passed and builds
    /// trigger plans according to each routine's misfire policy.
    ///
    /// Every overdue routine has its `next_fire_at_ms` advanced even when the
    /// policy yields zero runs; the updated store is persisted best-effort.
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            if routine.status != RoutineStatus::Active {
                continue;
            }
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            if now_ms < next_fire_at_ms {
                // Not due yet.
                continue;
            }
            // NOTE: `next_fire_at_ms` is shadowed here with the *new* fire
            // time computed by the misfire policy.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            if run_count == 0 {
                // Policy chose to skip the missed fires; the schedule was
                // still advanced above.
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        // Best-effort persistence of the advanced schedules.
        let _ = self.persist_routines().await;
        plans
    }
968
969    pub async fn mark_routine_fired(
970        &self,
971        routine_id: &str,
972        fired_at_ms: u64,
973    ) -> Option<RoutineSpec> {
974        let mut guard = self.routines.write().await;
975        let routine = guard.get_mut(routine_id)?;
976        routine.last_fired_at_ms = Some(fired_at_ms);
977        let updated = routine.clone();
978        drop(guard);
979        let _ = self.persist_routines().await;
980        Some(updated)
981    }
982
983    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
984        let mut history = self.routine_history.write().await;
985        history
986            .entry(event.routine_id.clone())
987            .or_default()
988            .push(event);
989        drop(history);
990        let _ = self.persist_routine_history().await;
991    }
992
993    pub async fn list_routine_history(
994        &self,
995        routine_id: &str,
996        limit: usize,
997    ) -> Vec<RoutineHistoryEvent> {
998        let limit = limit.clamp(1, 500);
999        let mut rows = self
1000            .routine_history
1001            .read()
1002            .await
1003            .get(routine_id)
1004            .cloned()
1005            .unwrap_or_default();
1006        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1007        rows.truncate(limit);
1008        rows
1009    }
1010
    /// Creates and stores a new run record for `routine`, snapshotting the
    /// routine's entrypoint, args, tool policy, and output targets at
    /// trigger time; persists best-effort and returns the record.
    pub async fn create_routine_run(
        &self,
        routine: &RoutineSpec,
        trigger_type: &str,
        run_count: u32,
        status: RoutineRunStatus,
        detail: Option<String>,
    ) -> RoutineRunRecord {
        let now = now_ms();
        let record = RoutineRunRecord {
            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
            routine_id: routine.routine_id.clone(),
            trigger_type: trigger_type.to_string(),
            run_count,
            status,
            created_at_ms: now,
            updated_at_ms: now,
            // The run is considered fired at creation; start/finish times are
            // stamped later by status transitions.
            fired_at_ms: Some(now),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: routine.requires_approval,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail,
            // Snapshot of the routine's execution config at trigger time so
            // later edits to the routine do not affect this run.
            entrypoint: routine.entrypoint.clone(),
            args: routine.args.clone(),
            allowed_tools: routine.allowed_tools.clone(),
            output_targets: routine.output_targets.clone(),
            artifacts: Vec::new(),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            // Usage accounting starts at zero.
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };
        self.routine_runs
            .write()
            .await
            .insert(record.run_id.clone(), record.clone());
        let _ = self.persist_routine_runs().await;
        record
    }
1055
1056    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1057        self.routine_runs.read().await.get(run_id).cloned()
1058    }
1059
1060    pub async fn list_routine_runs(
1061        &self,
1062        routine_id: Option<&str>,
1063        limit: usize,
1064    ) -> Vec<RoutineRunRecord> {
1065        let mut rows = self
1066            .routine_runs
1067            .read()
1068            .await
1069            .values()
1070            .filter(|row| {
1071                if let Some(id) = routine_id {
1072                    row.routine_id == id
1073                } else {
1074                    true
1075                }
1076            })
1077            .cloned()
1078            .collect::<Vec<_>>();
1079        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1080        rows.truncate(limit.clamp(1, 500));
1081        rows
1082    }
1083
1084    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1085        let mut guard = self.routine_runs.write().await;
1086        let next_run_id = guard
1087            .values()
1088            .filter(|row| row.status == RoutineRunStatus::Queued)
1089            .min_by(|a, b| {
1090                a.created_at_ms
1091                    .cmp(&b.created_at_ms)
1092                    .then_with(|| a.run_id.cmp(&b.run_id))
1093            })
1094            .map(|row| row.run_id.clone())?;
1095        let now = now_ms();
1096        let row = guard.get_mut(&next_run_id)?;
1097        row.status = RoutineRunStatus::Running;
1098        row.updated_at_ms = now;
1099        row.started_at_ms = Some(now);
1100        let claimed = row.clone();
1101        drop(guard);
1102        let _ = self.persist_routine_runs().await;
1103        Some(claimed)
1104    }
1105
1106    pub async fn set_routine_session_policy(
1107        &self,
1108        session_id: String,
1109        run_id: String,
1110        routine_id: String,
1111        allowed_tools: Vec<String>,
1112    ) {
1113        let policy = RoutineSessionPolicy {
1114            session_id: session_id.clone(),
1115            run_id,
1116            routine_id,
1117            allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1118        };
1119        self.routine_session_policies
1120            .write()
1121            .await
1122            .insert(session_id, policy);
1123    }
1124
1125    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1126        self.routine_session_policies
1127            .read()
1128            .await
1129            .get(session_id)
1130            .cloned()
1131    }
1132
1133    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1134        self.routine_session_policies
1135            .write()
1136            .await
1137            .remove(session_id);
1138    }
1139
    /// Transitions a run to `status`, routing the optional `reason` into the
    /// status-appropriate field, and persists best-effort.
    ///
    /// Returns the updated record, or `None` if `run_id` is unknown.
    pub async fn update_routine_run_status(
        &self,
        run_id: &str,
        status: RoutineRunStatus,
        reason: Option<String>,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.status = status.clone();
        row.updated_at_ms = now_ms();
        match status {
            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
            RoutineRunStatus::Running => {
                // Keep the original start time if the run was already started.
                row.started_at_ms.get_or_insert_with(now_ms);
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            RoutineRunStatus::Denied => row.denial_reason = reason,
            RoutineRunStatus::Paused => row.paused_reason = reason,
            RoutineRunStatus::Completed
            | RoutineRunStatus::Failed
            | RoutineRunStatus::Cancelled => {
                // Terminal states stamp the finish time.
                row.finished_at_ms = Some(now_ms());
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            _ => {
                // Any other status only carries the reason as free-form detail.
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
        }
        let updated = row.clone();
        drop(guard);
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
1179
1180    pub async fn append_routine_run_artifact(
1181        &self,
1182        run_id: &str,
1183        artifact: RoutineRunArtifact,
1184    ) -> Option<RoutineRunRecord> {
1185        let mut guard = self.routine_runs.write().await;
1186        let row = guard.get_mut(run_id)?;
1187        row.updated_at_ms = now_ms();
1188        row.artifacts.push(artifact);
1189        let updated = row.clone();
1190        drop(guard);
1191        let _ = self.persist_routine_runs().await;
1192        Some(updated)
1193    }
1194
1195    pub async fn add_active_session_id(
1196        &self,
1197        run_id: &str,
1198        session_id: String,
1199    ) -> Option<RoutineRunRecord> {
1200        let mut guard = self.routine_runs.write().await;
1201        let row = guard.get_mut(run_id)?;
1202        if !row.active_session_ids.iter().any(|id| id == &session_id) {
1203            row.active_session_ids.push(session_id);
1204        }
1205        row.latest_session_id = row.active_session_ids.last().cloned();
1206        row.updated_at_ms = now_ms();
1207        let updated = row.clone();
1208        drop(guard);
1209        let _ = self.persist_routine_runs().await;
1210        Some(updated)
1211    }
1212
1213    pub async fn clear_active_session_id(
1214        &self,
1215        run_id: &str,
1216        session_id: &str,
1217    ) -> Option<RoutineRunRecord> {
1218        let mut guard = self.routine_runs.write().await;
1219        let row = guard.get_mut(run_id)?;
1220        row.active_session_ids.retain(|id| id != session_id);
1221        row.updated_at_ms = now_ms();
1222        let updated = row.clone();
1223        drop(guard);
1224        let _ = self.persist_routine_runs().await;
1225        Some(updated)
1226    }
1227
    /// Loads automation v2 definitions, preferring the canonical path and
    /// merging from alternate/legacy paths when the canonical file is empty.
    ///
    /// When definitions were recovered from an alternate path, or a bundled
    /// automation was migrated, the merged result is re-persisted to the
    /// canonical path; otherwise stale legacy files are cleaned up.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        let mut migrated = false;
        // (path, definition count) pairs, collected for the log line below.
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file was missing or empty: merge definitions from
            // alternate paths, keeping the newest copy per automation id.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    match merged.get(&automation_id) {
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical file won: only count alternates for diagnostics.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        // Apply in-place migrations; `migrated` forces a re-persist below.
        for automation in merged.values_mut() {
            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
        }
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate || migrated {
            let _ = self.persist_automations_v2().await;
        } else if canonical_loaded {
            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
        }
        Ok(())
    }
1316
    /// Persists automation definitions, serialized through a mutex so
    /// concurrent persist calls cannot interleave their write/cleanup steps.
    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
        let _guard = self.automations_v2_persistence.lock().await;
        self.persist_automations_v2_locked().await
    }
1321
1322    async fn persist_automations_v2_locked(&self) -> anyhow::Result<()> {
1323        let payload = {
1324            let guard = self.automations_v2.read().await;
1325            serde_json::to_string_pretty(&*guard)?
1326        };
1327        if let Some(parent) = self.automations_v2_path.parent() {
1328            fs::create_dir_all(parent).await?;
1329        }
1330        write_string_atomic(&self.automations_v2_path, &payload).await?;
1331        let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1332        Ok(())
1333    }
1334
    /// Loads automation v2 run records, merging all candidate paths and
    /// keeping the newest copy per run id, then attempts to recover missing
    /// automation definitions from run snapshots.
    ///
    /// Re-persists when runs were loaded from an alternate path or any
    /// definitions were recovered.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        // (path, run count) pairs, collected for the log line below.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            // Merge, preferring the copy with the newest update time.
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            // Suspicious state: run history without definitions usually means
            // the definitions file was lost or never migrated.
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
1395
1396    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1397        let payload = {
1398            let guard = self.automation_v2_runs.read().await;
1399            serde_json::to_string_pretty(&*guard)?
1400        };
1401        if let Some(parent) = self.automation_v2_runs_path.parent() {
1402            fs::create_dir_all(parent).await?;
1403        }
1404        fs::write(&self.automation_v2_runs_path, &payload).await?;
1405        Ok(())
1406    }
1407
1408    pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1409        if !self.optimization_campaigns_path.exists() {
1410            return Ok(());
1411        }
1412        let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1413        let parsed = parse_optimization_campaigns_file(&raw);
1414        *self.optimization_campaigns.write().await = parsed;
1415        Ok(())
1416    }
1417
1418    pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1419        let payload = {
1420            let guard = self.optimization_campaigns.read().await;
1421            serde_json::to_string_pretty(&*guard)?
1422        };
1423        if let Some(parent) = self.optimization_campaigns_path.parent() {
1424            fs::create_dir_all(parent).await?;
1425        }
1426        fs::write(&self.optimization_campaigns_path, payload).await?;
1427        Ok(())
1428    }
1429
1430    pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1431        if !self.optimization_experiments_path.exists() {
1432            return Ok(());
1433        }
1434        let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1435        let parsed = parse_optimization_experiments_file(&raw);
1436        *self.optimization_experiments.write().await = parsed;
1437        Ok(())
1438    }
1439
1440    pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1441        let payload = {
1442            let guard = self.optimization_experiments.read().await;
1443            serde_json::to_string_pretty(&*guard)?
1444        };
1445        if let Some(parent) = self.optimization_experiments_path.parent() {
1446            fs::create_dir_all(parent).await?;
1447        }
1448        fs::write(&self.optimization_experiments_path, payload).await?;
1449        Ok(())
1450    }
1451
    /// Verifies that an automation's presence (or absence) on disk matches
    /// expectations after a persist/delete.
    ///
    /// A mismatch on the canonical path is a hard error; mismatches on
    /// alternate/legacy paths are only logged as warnings. Callers are
    /// expected to hold `automations_v2_persistence` (per the `_locked`
    /// naming convention used by the sibling persist method).
    async fn verify_automation_v2_persisted_locked(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        // A missing canonical file is treated as an empty store.
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        let active_parsed = parse_automation_v2_file_strict(&active_raw).map_err(|error| {
            anyhow::anyhow!(
                "failed to parse automation v2 persistence file `{}` during verification: {}",
                self.automations_v2_path.display(),
                error
            )
        })?;
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            // Canonical mismatch: the persist/delete did not take effect.
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Check alternate paths too, but only collect (and later warn about)
        // mismatches — stale legacy files are not fatal.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = match parse_automation_v2_file_strict(&raw) {
                Ok(parsed) => parsed,
                Err(error) => {
                    // Unparseable alternate: record it and keep going.
                    alternate_mismatches.push(format!(
                        "{} expected_present={} parse_error={error}",
                        path.display(),
                        expected_present
                    ));
                    continue;
                }
            };
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
1529
    /// Rebuild automation v2 definitions from the snapshots embedded in
    /// run records.
    ///
    /// For each run carrying an `automation_snapshot`, the snapshot
    /// replaces the in-memory definition when none exists or when the
    /// snapshot is strictly newer (`updated_at_ms`). Returns how many
    /// definitions were recovered from scratch — snapshots that merely
    /// refreshed an existing entry are not counted. When anything was
    /// recovered, the store is re-persisted.
    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
        // Clone the run list up front so the runs read lock is released
        // before taking the definitions write lock.
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut guard = self.automations_v2.write().await;
        let mut recovered = 0usize;
        for run in runs {
            let Some(snapshot) = run.automation_snapshot.clone() else {
                continue;
            };
            let should_replace = match guard.get(&run.automation_id) {
                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
                None => true,
            };
            if should_replace {
                // Only count definitions that were entirely missing.
                if !guard.contains_key(&run.automation_id) {
                    recovered += 1;
                }
                guard.insert(run.automation_id.clone(), snapshot);
            }
        }
        // Release the write lock before the (async) persist below.
        drop(guard);
        if recovered > 0 {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                recovered,
                active_path,
                "recovered automation v2 definitions from run snapshots"
            );
            self.persist_automations_v2().await?;
        }
        Ok(recovered)
    }
1567
1568    pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1569        let path = if self.bug_monitor_config_path.exists() {
1570            self.bug_monitor_config_path.clone()
1571        } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1572            .exists()
1573        {
1574            config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1575        } else {
1576            return Ok(());
1577        };
1578        let raw = fs::read_to_string(path).await?;
1579        let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1580            .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1581        *self.bug_monitor_config.write().await = parsed;
1582        Ok(())
1583    }
1584
1585    pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1586        if let Some(parent) = self.bug_monitor_config_path.parent() {
1587            fs::create_dir_all(parent).await?;
1588        }
1589        let payload = {
1590            let guard = self.bug_monitor_config.read().await;
1591            serde_json::to_string_pretty(&*guard)?
1592        };
1593        fs::write(&self.bug_monitor_config_path, payload).await?;
1594        Ok(())
1595    }
1596
1597    pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1598        self.bug_monitor_config.read().await.clone()
1599    }
1600
1601    pub async fn put_bug_monitor_config(
1602        &self,
1603        mut config: BugMonitorConfig,
1604    ) -> anyhow::Result<BugMonitorConfig> {
1605        config.workspace_root = config
1606            .workspace_root
1607            .as_ref()
1608            .map(|v| v.trim().to_string())
1609            .filter(|v| !v.is_empty());
1610        if let Some(repo) = config.repo.as_ref() {
1611            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
1612                anyhow::bail!("repo must be in owner/repo format");
1613            }
1614        }
1615        if let Some(server) = config.mcp_server.as_ref() {
1616            let servers = self.mcp.list().await;
1617            if !servers.contains_key(server) {
1618                anyhow::bail!("unknown mcp server `{server}`");
1619            }
1620        }
1621        if let Some(model_policy) = config.model_policy.as_ref() {
1622            crate::http::routines_automations::validate_model_policy(model_policy)
1623                .map_err(anyhow::Error::msg)?;
1624        }
1625        config.updated_at_ms = now_ms();
1626        *self.bug_monitor_config.write().await = config.clone();
1627        self.persist_bug_monitor_config().await?;
1628        Ok(config)
1629    }
1630
1631    pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1632        let path = if self.bug_monitor_drafts_path.exists() {
1633            self.bug_monitor_drafts_path.clone()
1634        } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1635            .exists()
1636        {
1637            config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1638        } else {
1639            return Ok(());
1640        };
1641        let raw = fs::read_to_string(path).await?;
1642        let parsed =
1643            serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1644                .unwrap_or_default();
1645        *self.bug_monitor_drafts.write().await = parsed;
1646        Ok(())
1647    }
1648
1649    pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1650        if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1651            fs::create_dir_all(parent).await?;
1652        }
1653        let payload = {
1654            let guard = self.bug_monitor_drafts.read().await;
1655            serde_json::to_string_pretty(&*guard)?
1656        };
1657        fs::write(&self.bug_monitor_drafts_path, payload).await?;
1658        Ok(())
1659    }
1660
1661    pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1662        let path = if self.bug_monitor_incidents_path.exists() {
1663            self.bug_monitor_incidents_path.clone()
1664        } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1665            .exists()
1666        {
1667            config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1668        } else {
1669            return Ok(());
1670        };
1671        let raw = fs::read_to_string(path).await?;
1672        let parsed = serde_json::from_str::<
1673            std::collections::HashMap<String, BugMonitorIncidentRecord>,
1674        >(&raw)
1675        .unwrap_or_default();
1676        *self.bug_monitor_incidents.write().await = parsed;
1677        Ok(())
1678    }
1679
1680    pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1681        if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1682            fs::create_dir_all(parent).await?;
1683        }
1684        let payload = {
1685            let guard = self.bug_monitor_incidents.read().await;
1686            serde_json::to_string_pretty(&*guard)?
1687        };
1688        fs::write(&self.bug_monitor_incidents_path, payload).await?;
1689        Ok(())
1690    }
1691
1692    pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1693        let path = if self.bug_monitor_posts_path.exists() {
1694            self.bug_monitor_posts_path.clone()
1695        } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1696            .exists()
1697        {
1698            config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1699        } else {
1700            return Ok(());
1701        };
1702        let raw = fs::read_to_string(path).await?;
1703        let parsed =
1704            serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1705                .unwrap_or_default();
1706        *self.bug_monitor_posts.write().await = parsed;
1707        Ok(())
1708    }
1709
1710    pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1711        if let Some(parent) = self.bug_monitor_posts_path.parent() {
1712            fs::create_dir_all(parent).await?;
1713        }
1714        let payload = {
1715            let guard = self.bug_monitor_posts.read().await;
1716            serde_json::to_string_pretty(&*guard)?
1717        };
1718        fs::write(&self.bug_monitor_posts_path, payload).await?;
1719        Ok(())
1720    }
1721
1722    pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1723        if !self.external_actions_path.exists() {
1724            return Ok(());
1725        }
1726        let raw = fs::read_to_string(&self.external_actions_path).await?;
1727        let parsed =
1728            serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1729                .unwrap_or_default();
1730        *self.external_actions.write().await = parsed;
1731        Ok(())
1732    }
1733
1734    pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1735        if let Some(parent) = self.external_actions_path.parent() {
1736            fs::create_dir_all(parent).await?;
1737        }
1738        let payload = {
1739            let guard = self.external_actions.read().await;
1740            serde_json::to_string_pretty(&*guard)?
1741        };
1742        fs::write(&self.external_actions_path, payload).await?;
1743        Ok(())
1744    }
1745
1746    pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1747        let mut rows = self
1748            .bug_monitor_incidents
1749            .read()
1750            .await
1751            .values()
1752            .cloned()
1753            .collect::<Vec<_>>();
1754        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1755        rows.truncate(limit.clamp(1, 200));
1756        rows
1757    }
1758
1759    pub async fn get_bug_monitor_incident(
1760        &self,
1761        incident_id: &str,
1762    ) -> Option<BugMonitorIncidentRecord> {
1763        self.bug_monitor_incidents
1764            .read()
1765            .await
1766            .get(incident_id)
1767            .cloned()
1768    }
1769
1770    pub async fn put_bug_monitor_incident(
1771        &self,
1772        incident: BugMonitorIncidentRecord,
1773    ) -> anyhow::Result<BugMonitorIncidentRecord> {
1774        self.bug_monitor_incidents
1775            .write()
1776            .await
1777            .insert(incident.incident_id.clone(), incident.clone());
1778        self.persist_bug_monitor_incidents().await?;
1779        Ok(incident)
1780    }
1781
1782    pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1783        let mut rows = self
1784            .bug_monitor_posts
1785            .read()
1786            .await
1787            .values()
1788            .cloned()
1789            .collect::<Vec<_>>();
1790        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1791        rows.truncate(limit.clamp(1, 200));
1792        rows
1793    }
1794
1795    pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1796        self.bug_monitor_posts.read().await.get(post_id).cloned()
1797    }
1798
1799    pub async fn put_bug_monitor_post(
1800        &self,
1801        post: BugMonitorPostRecord,
1802    ) -> anyhow::Result<BugMonitorPostRecord> {
1803        self.bug_monitor_posts
1804            .write()
1805            .await
1806            .insert(post.post_id.clone(), post.clone());
1807        self.persist_bug_monitor_posts().await?;
1808        Ok(post)
1809    }
1810
1811    pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1812        let mut rows = self
1813            .external_actions
1814            .read()
1815            .await
1816            .values()
1817            .cloned()
1818            .collect::<Vec<_>>();
1819        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1820        rows.truncate(limit.clamp(1, 200));
1821        rows
1822    }
1823
1824    pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1825        self.external_actions.read().await.get(action_id).cloned()
1826    }
1827
1828    pub async fn get_external_action_by_idempotency_key(
1829        &self,
1830        idempotency_key: &str,
1831    ) -> Option<ExternalActionRecord> {
1832        let normalized = idempotency_key.trim();
1833        if normalized.is_empty() {
1834            return None;
1835        }
1836        self.external_actions
1837            .read()
1838            .await
1839            .values()
1840            .find(|action| {
1841                action
1842                    .idempotency_key
1843                    .as_deref()
1844                    .map(str::trim)
1845                    .filter(|value| !value.is_empty())
1846                    == Some(normalized)
1847            })
1848            .cloned()
1849    }
1850
1851    pub async fn put_external_action(
1852        &self,
1853        action: ExternalActionRecord,
1854    ) -> anyhow::Result<ExternalActionRecord> {
1855        self.external_actions
1856            .write()
1857            .await
1858            .insert(action.action_id.clone(), action.clone());
1859        self.persist_external_actions().await?;
1860        Ok(action)
1861    }
1862
    /// Record an external action receipt with idempotency-key dedupe,
    /// then attach receipt artifacts to any linked runs.
    ///
    /// If a stored action already carries the same trimmed, non-empty
    /// idempotency key, that existing record is returned and nothing is
    /// written. Otherwise the action is inserted and persisted; a
    /// receipt artifact is then appended best-effort to the linked
    /// routine run (publishing a `routine.run.artifact_added` event when
    /// the runtime is available) and to the linked context run (where a
    /// failure is only logged).
    pub async fn record_external_action(
        &self,
        action: ExternalActionRecord,
    ) -> anyhow::Result<ExternalActionRecord> {
        let action = {
            let mut guard = self.external_actions.write().await;
            // Dedupe: a blank or missing idempotency key never matches.
            if let Some(idempotency_key) = action
                .idempotency_key
                .as_deref()
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                if let Some(existing) = guard
                    .values()
                    .find(|existing| {
                        existing
                            .idempotency_key
                            .as_deref()
                            .map(str::trim)
                            .filter(|value| !value.is_empty())
                            == Some(idempotency_key)
                    })
                    .cloned()
                {
                    // Duplicate submission: return the prior receipt
                    // without touching or re-persisting the store.
                    return Ok(existing);
                }
            }
            guard.insert(action.action_id.clone(), action.clone());
            action
        };
        self.persist_external_actions().await?;
        if let Some(run_id) = action.routine_run_id.as_deref() {
            let artifact = RoutineRunArtifact {
                artifact_id: format!("external-action-{}", action.action_id),
                uri: format!("external-action://{}", action.action_id),
                kind: "external_action_receipt".to_string(),
                label: Some(format!("external action receipt: {}", action.operation)),
                created_at_ms: action.updated_at_ms,
                metadata: Some(json!({
                    "actionID": action.action_id,
                    "operation": action.operation,
                    "status": action.status,
                    "sourceKind": action.source_kind,
                    "sourceID": action.source_id,
                    "capabilityID": action.capability_id,
                    "target": action.target,
                })),
            };
            // Best-effort: a failed artifact append must not fail the
            // already-persisted action.
            let _ = self
                .append_routine_run_artifact(run_id, artifact.clone())
                .await;
            if let Some(runtime) = self.runtime.get() {
                runtime.event_bus.publish(EngineEvent::new(
                    "routine.run.artifact_added",
                    json!({
                        "runID": run_id,
                        "artifact": artifact,
                    }),
                ));
            }
        }
        if let Some(context_run_id) = action.context_run_id.as_deref() {
            let payload = serde_json::to_value(&action)?;
            // Best-effort as well: log and continue on failure.
            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
                self,
                context_run_id,
                &format!("external-action-{}", action.action_id),
                "external_action_receipt",
                &format!("external-actions/{}.json", action.action_id),
                &payload,
            )
            .await
            {
                tracing::warn!(
                    "failed to append external action artifact {} to context run {}: {}",
                    action.action_id,
                    context_run_id,
                    error
                );
            }
        }
        Ok(action)
    }
1946
1947    pub async fn update_bug_monitor_runtime_status(
1948        &self,
1949        update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1950    ) -> BugMonitorRuntimeStatus {
1951        let mut guard = self.bug_monitor_runtime_status.write().await;
1952        update(&mut guard);
1953        guard.clone()
1954    }
1955
1956    pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1957        let mut rows = self
1958            .bug_monitor_drafts
1959            .read()
1960            .await
1961            .values()
1962            .cloned()
1963            .collect::<Vec<_>>();
1964        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1965        rows.truncate(limit.clamp(1, 200));
1966        rows
1967    }
1968
1969    pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1970        self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1971    }
1972
1973    pub async fn put_bug_monitor_draft(
1974        &self,
1975        draft: BugMonitorDraftRecord,
1976    ) -> anyhow::Result<BugMonitorDraftRecord> {
1977        self.bug_monitor_drafts
1978            .write()
1979            .await
1980            .insert(draft.draft_id.clone(), draft.clone());
1981        self.persist_bug_monitor_drafts().await?;
1982        Ok(draft)
1983    }
1984
    /// Normalize a failure submission and create (or reuse) a Bug
    /// Monitor draft.
    ///
    /// Free-text fields are trimmed to `None`-when-empty and the excerpt
    /// is capped at 50 non-empty lines. The target repo comes from the
    /// submission or falls back to the configured repo and must be an
    /// `owner/repo` slug. A fingerprint (explicit, or derived from
    /// repo/title/detail/source/ids) dedupes submissions: an existing
    /// draft with the same repo and fingerprint is returned as-is. New
    /// drafts start in `approval_required` or `draft_ready` depending on
    /// `require_approval_for_new_issues` and are persisted before
    /// returning.
    pub async fn submit_bug_monitor_draft(
        &self,
        mut submission: BugMonitorSubmission,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        // Trim a free-text field; blank values collapse to None.
        fn normalize_optional(value: Option<String>) -> Option<String> {
            value
                .map(|v| v.trim().to_string())
                .filter(|v| !v.is_empty())
        }

        // Hash the given parts into a 16-hex-digit fingerprint.
        // NOTE(review): DefaultHasher's algorithm is not guaranteed
        // stable across Rust releases, so persisted fingerprints may
        // change after a toolchain upgrade — confirm that is acceptable.
        fn compute_fingerprint(parts: &[&str]) -> String {
            use std::hash::{Hash, Hasher};

            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            for part in parts {
                part.hash(&mut hasher);
            }
            format!("{:016x}", hasher.finish())
        }

        submission.repo = normalize_optional(submission.repo);
        submission.title = normalize_optional(submission.title);
        submission.detail = normalize_optional(submission.detail);
        submission.source = normalize_optional(submission.source);
        submission.run_id = normalize_optional(submission.run_id);
        submission.session_id = normalize_optional(submission.session_id);
        submission.correlation_id = normalize_optional(submission.correlation_id);
        submission.file_name = normalize_optional(submission.file_name);
        submission.process = normalize_optional(submission.process);
        submission.component = normalize_optional(submission.component);
        submission.event = normalize_optional(submission.event);
        submission.level = normalize_optional(submission.level);
        submission.fingerprint = normalize_optional(submission.fingerprint);
        // Drop blank excerpt lines and cap the excerpt at 50 lines.
        submission.excerpt = submission
            .excerpt
            .into_iter()
            .map(|line| line.trim_end().to_string())
            .filter(|line| !line.is_empty())
            .take(50)
            .collect();

        let config = self.bug_monitor_config().await;
        // Submission repo wins; otherwise fall back to the configured repo.
        let repo = submission
            .repo
            .clone()
            .or(config.repo.clone())
            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
        if !is_valid_owner_repo_slug(&repo) {
            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
        }

        // Pick a title from the most specific available context field.
        let title = submission.title.clone().unwrap_or_else(|| {
            if let Some(event) = submission.event.as_ref() {
                format!("Failure detected in {event}")
            } else if let Some(component) = submission.component.as_ref() {
                format!("Failure detected in {component}")
            } else if let Some(process) = submission.process.as_ref() {
                format!("Failure detected in {process}")
            } else if let Some(source) = submission.source.as_ref() {
                format!("Failure report from {source}")
            } else {
                "Failure report".to_string()
            }
        });

        // Assemble the draft detail: key/value context lines, then the
        // free-form detail, then the indented excerpt.
        let mut detail_lines = Vec::new();
        if let Some(source) = submission.source.as_ref() {
            detail_lines.push(format!("source: {source}"));
        }
        if let Some(file_name) = submission.file_name.as_ref() {
            detail_lines.push(format!("file: {file_name}"));
        }
        if let Some(level) = submission.level.as_ref() {
            detail_lines.push(format!("level: {level}"));
        }
        if let Some(process) = submission.process.as_ref() {
            detail_lines.push(format!("process: {process}"));
        }
        if let Some(component) = submission.component.as_ref() {
            detail_lines.push(format!("component: {component}"));
        }
        if let Some(event) = submission.event.as_ref() {
            detail_lines.push(format!("event: {event}"));
        }
        if let Some(run_id) = submission.run_id.as_ref() {
            detail_lines.push(format!("run_id: {run_id}"));
        }
        if let Some(session_id) = submission.session_id.as_ref() {
            detail_lines.push(format!("session_id: {session_id}"));
        }
        if let Some(correlation_id) = submission.correlation_id.as_ref() {
            detail_lines.push(format!("correlation_id: {correlation_id}"));
        }
        if let Some(detail) = submission.detail.as_ref() {
            detail_lines.push(String::new());
            detail_lines.push(detail.clone());
        }
        if !submission.excerpt.is_empty() {
            if !detail_lines.is_empty() {
                detail_lines.push(String::new());
            }
            detail_lines.push("excerpt:".to_string());
            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
        }
        let detail = if detail_lines.is_empty() {
            None
        } else {
            Some(detail_lines.join("\n"))
        };

        // Explicit fingerprint wins; otherwise derive one from the
        // repo, title, detail, and correlation identifiers.
        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
            compute_fingerprint(&[
                repo.as_str(),
                title.as_str(),
                detail.as_deref().unwrap_or(""),
                submission.source.as_deref().unwrap_or(""),
                submission.run_id.as_deref().unwrap_or(""),
                submission.session_id.as_deref().unwrap_or(""),
                submission.correlation_id.as_deref().unwrap_or(""),
            ])
        });

        // Dedupe on (repo, fingerprint): reuse an existing draft as-is.
        let mut drafts = self.bug_monitor_drafts.write().await;
        if let Some(existing) = drafts
            .values()
            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
            .cloned()
        {
            return Ok(existing);
        }

        let draft = BugMonitorDraftRecord {
            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
            fingerprint,
            repo,
            // New drafts may require operator approval before posting.
            status: if config.require_approval_for_new_issues {
                "approval_required".to_string()
            } else {
                "draft_ready".to_string()
            },
            created_at_ms: now_ms(),
            triage_run_id: None,
            issue_number: None,
            title: Some(title),
            detail,
            github_status: None,
            github_issue_url: None,
            github_comment_url: None,
            github_posted_at_ms: None,
            matched_issue_number: None,
            matched_issue_state: None,
            evidence_digest: None,
            last_post_error: None,
        };
        drafts.insert(draft.draft_id.clone(), draft.clone());
        // Release the write lock before the (async) persist below.
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;
        Ok(draft)
    }
2144
    /// Approve or deny a draft that is waiting for operator approval.
    ///
    /// Only `draft_ready` (approve) and `denied` are accepted as
    /// `next_status`, and only drafts currently in `approval_required`
    /// may transition. A non-blank `reason` is appended to the draft's
    /// detail as an `operator_note`. The updated draft is persisted and
    /// a `bug_monitor.draft.approved` / `bug_monitor.draft.denied`
    /// event is published before returning.
    pub async fn update_bug_monitor_draft_status(
        &self,
        draft_id: &str,
        next_status: &str,
        reason: Option<&str>,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        let normalized_status = next_status.trim().to_ascii_lowercase();
        if normalized_status != "draft_ready" && normalized_status != "denied" {
            anyhow::bail!("unsupported Bug Monitor draft status");
        }

        let mut drafts = self.bug_monitor_drafts.write().await;
        let Some(draft) = drafts.get_mut(draft_id) else {
            anyhow::bail!("Bug Monitor draft not found");
        };
        // Only pending-approval drafts may be approved or denied.
        if !draft.status.eq_ignore_ascii_case("approval_required") {
            anyhow::bail!("Bug Monitor draft is not waiting for approval");
        }
        draft.status = normalized_status.clone();
        // Append a non-blank operator reason to the draft detail.
        if let Some(reason) = reason
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
        {
            let next_detail = if let Some(detail) = draft.detail.as_ref() {
                format!("{detail}\n\noperator_note: {reason}")
            } else {
                format!("operator_note: {reason}")
            };
            draft.detail = Some(next_detail);
        }
        let updated = draft.clone();
        // Release the write lock before the (async) persist below.
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;

        let event_name = if normalized_status == "draft_ready" {
            "bug_monitor.draft.approved"
        } else {
            "bug_monitor.draft.denied"
        };
        self.event_bus.publish(EngineEvent::new(
            event_name,
            serde_json::json!({
                "draft_id": updated.draft_id,
                "repo": updated.repo,
                "status": updated.status,
                "reason": reason,
            }),
        ));
        Ok(updated)
    }
2195
2196    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
2197        let required_capabilities = vec![
2198            "github.list_issues".to_string(),
2199            "github.get_issue".to_string(),
2200            "github.create_issue".to_string(),
2201            "github.comment_on_issue".to_string(),
2202        ];
2203        let config = self.bug_monitor_config().await;
2204        let drafts = self.bug_monitor_drafts.read().await;
2205        let incidents = self.bug_monitor_incidents.read().await;
2206        let posts = self.bug_monitor_posts.read().await;
2207        let total_incidents = incidents.len();
2208        let pending_incidents = incidents
2209            .values()
2210            .filter(|row| {
2211                matches!(
2212                    row.status.as_str(),
2213                    "queued"
2214                        | "draft_created"
2215                        | "triage_queued"
2216                        | "analysis_queued"
2217                        | "triage_pending"
2218                        | "issue_draft_pending"
2219                )
2220            })
2221            .count();
2222        let pending_drafts = drafts
2223            .values()
2224            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
2225            .count();
2226        let pending_posts = posts
2227            .values()
2228            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
2229            .count();
2230        let last_activity_at_ms = drafts
2231            .values()
2232            .map(|row| row.created_at_ms)
2233            .chain(posts.values().map(|row| row.updated_at_ms))
2234            .max();
2235        drop(drafts);
2236        drop(incidents);
2237        drop(posts);
2238        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
2239        runtime.paused = config.paused;
2240        runtime.total_incidents = total_incidents;
2241        runtime.pending_incidents = pending_incidents;
2242        runtime.pending_posts = pending_posts;
2243
2244        let mut status = BugMonitorStatus {
2245            config: config.clone(),
2246            runtime,
2247            pending_drafts,
2248            pending_posts,
2249            last_activity_at_ms,
2250            ..BugMonitorStatus::default()
2251        };
2252        let repo_valid = config
2253            .repo
2254            .as_ref()
2255            .map(|repo| is_valid_owner_repo_slug(repo))
2256            .unwrap_or(false);
2257        let servers = self.mcp.list().await;
2258        let selected_server = config
2259            .mcp_server
2260            .as_ref()
2261            .and_then(|name| servers.get(name))
2262            .cloned();
2263        let provider_catalog = self.providers.list().await;
2264        let selected_model = config
2265            .model_policy
2266            .as_ref()
2267            .and_then(|policy| policy.get("default_model"))
2268            .and_then(crate::app::routines::parse_model_spec);
2269        let selected_model_ready = selected_model
2270            .as_ref()
2271            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
2272            .unwrap_or(false);
2273        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
2274            self.mcp.server_tools(server_name).await
2275        } else {
2276            Vec::new()
2277        };
2278        let discovered_tools = self
2279            .capability_resolver
2280            .discover_from_runtime(selected_server_tools, Vec::new())
2281            .await;
2282        status.discovered_mcp_tools = discovered_tools
2283            .iter()
2284            .map(|row| row.tool_name.clone())
2285            .collect();
2286        let discovered_providers = discovered_tools
2287            .iter()
2288            .map(|row| row.provider.to_ascii_lowercase())
2289            .collect::<std::collections::HashSet<_>>();
2290        let provider_preference = match config.provider_preference {
2291            BugMonitorProviderPreference::OfficialGithub => {
2292                vec![
2293                    "mcp".to_string(),
2294                    "composio".to_string(),
2295                    "arcade".to_string(),
2296                ]
2297            }
2298            BugMonitorProviderPreference::Composio => {
2299                vec![
2300                    "composio".to_string(),
2301                    "mcp".to_string(),
2302                    "arcade".to_string(),
2303                ]
2304            }
2305            BugMonitorProviderPreference::Arcade => {
2306                vec![
2307                    "arcade".to_string(),
2308                    "mcp".to_string(),
2309                    "composio".to_string(),
2310                ]
2311            }
2312            BugMonitorProviderPreference::Auto => {
2313                vec![
2314                    "mcp".to_string(),
2315                    "composio".to_string(),
2316                    "arcade".to_string(),
2317                ]
2318            }
2319        };
2320        let capability_resolution = self
2321            .capability_resolver
2322            .resolve(
2323                crate::capability_resolver::CapabilityResolveInput {
2324                    workflow_id: Some("bug_monitor".to_string()),
2325                    required_capabilities: required_capabilities.clone(),
2326                    optional_capabilities: Vec::new(),
2327                    provider_preference,
2328                    available_tools: discovered_tools,
2329                },
2330                Vec::new(),
2331            )
2332            .await
2333            .ok();
2334        let bindings_file = self.capability_resolver.list_bindings().await.ok();
2335        if let Some(bindings) = bindings_file.as_ref() {
2336            status.binding_source_version = bindings.builtin_version.clone();
2337            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
2338            status.selected_server_binding_candidates = bindings
2339                .bindings
2340                .iter()
2341                .filter(|binding| required_capabilities.contains(&binding.capability_id))
2342                .filter(|binding| {
2343                    discovered_providers.is_empty()
2344                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
2345                })
2346                .map(|binding| {
2347                    let binding_key = format!(
2348                        "{}::{}",
2349                        binding.capability_id,
2350                        binding.tool_name.to_ascii_lowercase()
2351                    );
2352                    let matched = capability_resolution
2353                        .as_ref()
2354                        .map(|resolution| {
2355                            resolution.resolved.iter().any(|row| {
2356                                row.capability_id == binding.capability_id
2357                                    && format!(
2358                                        "{}::{}",
2359                                        row.capability_id,
2360                                        row.tool_name.to_ascii_lowercase()
2361                                    ) == binding_key
2362                            })
2363                        })
2364                        .unwrap_or(false);
2365                    BugMonitorBindingCandidate {
2366                        capability_id: binding.capability_id.clone(),
2367                        binding_tool_name: binding.tool_name.clone(),
2368                        aliases: binding.tool_name_aliases.clone(),
2369                        matched,
2370                    }
2371                })
2372                .collect();
2373            status.selected_server_binding_candidates.sort_by(|a, b| {
2374                a.capability_id
2375                    .cmp(&b.capability_id)
2376                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
2377            });
2378        }
2379        let capability_ready = |capability_id: &str| -> bool {
2380            capability_resolution
2381                .as_ref()
2382                .map(|resolved| {
2383                    resolved
2384                        .resolved
2385                        .iter()
2386                        .any(|row| row.capability_id == capability_id)
2387                })
2388                .unwrap_or(false)
2389        };
2390        if let Some(resolution) = capability_resolution.as_ref() {
2391            status.missing_required_capabilities = resolution.missing_required.clone();
2392            status.resolved_capabilities = resolution
2393                .resolved
2394                .iter()
2395                .map(|row| BugMonitorCapabilityMatch {
2396                    capability_id: row.capability_id.clone(),
2397                    provider: row.provider.clone(),
2398                    tool_name: row.tool_name.clone(),
2399                    binding_index: row.binding_index,
2400                })
2401                .collect();
2402        } else {
2403            status.missing_required_capabilities = required_capabilities.clone();
2404        }
2405        status.required_capabilities = BugMonitorCapabilityReadiness {
2406            github_list_issues: capability_ready("github.list_issues"),
2407            github_get_issue: capability_ready("github.get_issue"),
2408            github_create_issue: capability_ready("github.create_issue"),
2409            github_comment_on_issue: capability_ready("github.comment_on_issue"),
2410        };
2411        status.selected_model = selected_model;
2412        status.readiness = BugMonitorReadiness {
2413            config_valid: repo_valid
2414                && selected_server.is_some()
2415                && status.required_capabilities.github_list_issues
2416                && status.required_capabilities.github_get_issue
2417                && status.required_capabilities.github_create_issue
2418                && status.required_capabilities.github_comment_on_issue
2419                && selected_model_ready,
2420            repo_valid,
2421            mcp_server_present: selected_server.is_some(),
2422            mcp_connected: selected_server
2423                .as_ref()
2424                .map(|row| row.connected)
2425                .unwrap_or(false),
2426            github_read_ready: status.required_capabilities.github_list_issues
2427                && status.required_capabilities.github_get_issue,
2428            github_write_ready: status.required_capabilities.github_create_issue
2429                && status.required_capabilities.github_comment_on_issue,
2430            selected_model_ready,
2431            ingest_ready: config.enabled && !config.paused && repo_valid,
2432            publish_ready: config.enabled
2433                && !config.paused
2434                && repo_valid
2435                && selected_server
2436                    .as_ref()
2437                    .map(|row| row.connected)
2438                    .unwrap_or(false)
2439                && status.required_capabilities.github_list_issues
2440                && status.required_capabilities.github_get_issue
2441                && status.required_capabilities.github_create_issue
2442                && status.required_capabilities.github_comment_on_issue
2443                && selected_model_ready,
2444            runtime_ready: config.enabled
2445                && !config.paused
2446                && repo_valid
2447                && selected_server
2448                    .as_ref()
2449                    .map(|row| row.connected)
2450                    .unwrap_or(false)
2451                && status.required_capabilities.github_list_issues
2452                && status.required_capabilities.github_get_issue
2453                && status.required_capabilities.github_create_issue
2454                && status.required_capabilities.github_comment_on_issue
2455                && selected_model_ready,
2456        };
2457        if config.enabled {
2458            if config.paused {
2459                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
2460            } else if !repo_valid {
2461                status.last_error = Some("Target repo is missing or invalid.".to_string());
2462            } else if selected_server.is_none() {
2463                status.last_error = Some("Selected MCP server is missing.".to_string());
2464            } else if !status.readiness.mcp_connected {
2465                status.last_error = Some("Selected MCP server is disconnected.".to_string());
2466            } else if !selected_model_ready {
2467                status.last_error = Some(
2468                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
2469                        .to_string(),
2470                );
2471            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
2472                let missing = if status.missing_required_capabilities.is_empty() {
2473                    "unknown".to_string()
2474                } else {
2475                    status.missing_required_capabilities.join(", ")
2476                };
2477                status.last_error = Some(format!(
2478                    "Selected MCP server is missing required GitHub capabilities: {missing}"
2479                ));
2480            }
2481        }
2482        status.runtime.monitoring_active = status.readiness.ingest_ready;
2483        status
2484    }
2485
2486    pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2487        if !self.workflow_runs_path.exists() {
2488            return Ok(());
2489        }
2490        let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2491        let parsed =
2492            serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2493                .unwrap_or_default();
2494        *self.workflow_runs.write().await = parsed;
2495        Ok(())
2496    }
2497
2498    pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2499        if let Some(parent) = self.workflow_runs_path.parent() {
2500            fs::create_dir_all(parent).await?;
2501        }
2502        let payload = {
2503            let guard = self.workflow_runs.read().await;
2504            serde_json::to_string_pretty(&*guard)?
2505        };
2506        fs::write(&self.workflow_runs_path, payload).await?;
2507        Ok(())
2508    }
2509
2510    pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2511        if !self.workflow_hook_overrides_path.exists() {
2512            return Ok(());
2513        }
2514        let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2515        let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2516            .unwrap_or_default();
2517        *self.workflow_hook_overrides.write().await = parsed;
2518        Ok(())
2519    }
2520
2521    pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2522        if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2523            fs::create_dir_all(parent).await?;
2524        }
2525        let payload = {
2526            let guard = self.workflow_hook_overrides.read().await;
2527            serde_json::to_string_pretty(&*guard)?
2528        };
2529        fs::write(&self.workflow_hook_overrides_path, payload).await?;
2530        Ok(())
2531    }
2532
2533    pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
2534        let mut sources = Vec::new();
2535        sources.push(WorkflowLoadSource {
2536            root: config::paths::resolve_builtin_workflows_dir(),
2537            kind: WorkflowSourceKind::BuiltIn,
2538            pack_id: None,
2539        });
2540
2541        let workspace_root = self.workspace_index.snapshot().await.root;
2542        sources.push(WorkflowLoadSource {
2543            root: PathBuf::from(workspace_root).join(".tandem"),
2544            kind: WorkflowSourceKind::Workspace,
2545            pack_id: None,
2546        });
2547
2548        if let Ok(packs) = self.pack_manager.list().await {
2549            for pack in packs {
2550                sources.push(WorkflowLoadSource {
2551                    root: PathBuf::from(pack.install_path),
2552                    kind: WorkflowSourceKind::Pack,
2553                    pack_id: Some(pack.pack_id),
2554                });
2555            }
2556        }
2557
2558        let mut registry = load_workflow_registry(&sources)?;
2559        let overrides = self.workflow_hook_overrides.read().await.clone();
2560        for hook in &mut registry.hooks {
2561            if let Some(enabled) = overrides.get(&hook.binding_id) {
2562                hook.enabled = *enabled;
2563            }
2564        }
2565        for workflow in registry.workflows.values_mut() {
2566            workflow.hooks = registry
2567                .hooks
2568                .iter()
2569                .filter(|hook| hook.workflow_id == workflow.workflow_id)
2570                .cloned()
2571                .collect();
2572        }
2573        let messages = validate_workflow_registry(&registry);
2574        *self.workflows.write().await = registry;
2575        Ok(messages)
2576    }
2577
2578    pub async fn workflow_registry(&self) -> WorkflowRegistry {
2579        self.workflows.read().await.clone()
2580    }
2581
2582    pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2583        let mut rows = self
2584            .workflows
2585            .read()
2586            .await
2587            .workflows
2588            .values()
2589            .cloned()
2590            .collect::<Vec<_>>();
2591        rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2592        rows
2593    }
2594
2595    pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2596        self.workflows
2597            .read()
2598            .await
2599            .workflows
2600            .get(workflow_id)
2601            .cloned()
2602    }
2603
2604    pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2605        let mut rows = self
2606            .workflows
2607            .read()
2608            .await
2609            .hooks
2610            .iter()
2611            .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2612            .cloned()
2613            .collect::<Vec<_>>();
2614        rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2615        rows
2616    }
2617
2618    pub async fn set_workflow_hook_enabled(
2619        &self,
2620        binding_id: &str,
2621        enabled: bool,
2622    ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2623        self.workflow_hook_overrides
2624            .write()
2625            .await
2626            .insert(binding_id.to_string(), enabled);
2627        self.persist_workflow_hook_overrides().await?;
2628        let _ = self.reload_workflows().await?;
2629        Ok(self
2630            .workflows
2631            .read()
2632            .await
2633            .hooks
2634            .iter()
2635            .find(|hook| hook.binding_id == binding_id)
2636            .cloned())
2637    }
2638
2639    pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2640        self.workflow_runs
2641            .write()
2642            .await
2643            .insert(run.run_id.clone(), run);
2644        self.persist_workflow_runs().await
2645    }
2646
2647    pub async fn update_workflow_run(
2648        &self,
2649        run_id: &str,
2650        update: impl FnOnce(&mut WorkflowRunRecord),
2651    ) -> Option<WorkflowRunRecord> {
2652        let mut guard = self.workflow_runs.write().await;
2653        let row = guard.get_mut(run_id)?;
2654        update(row);
2655        row.updated_at_ms = now_ms();
2656        if matches!(
2657            row.status,
2658            WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
2659        ) {
2660            row.finished_at_ms.get_or_insert_with(now_ms);
2661        }
2662        let out = row.clone();
2663        drop(guard);
2664        let _ = self.persist_workflow_runs().await;
2665        Some(out)
2666    }
2667
2668    pub async fn list_workflow_runs(
2669        &self,
2670        workflow_id: Option<&str>,
2671        limit: usize,
2672    ) -> Vec<WorkflowRunRecord> {
2673        let mut rows = self
2674            .workflow_runs
2675            .read()
2676            .await
2677            .values()
2678            .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2679            .cloned()
2680            .collect::<Vec<_>>();
2681        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2682        rows.truncate(limit.clamp(1, 500));
2683        rows
2684    }
2685
2686    pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2687        self.workflow_runs.read().await.get(run_id).cloned()
2688    }
2689
2690    pub async fn put_automation_v2(
2691        &self,
2692        mut automation: AutomationV2Spec,
2693    ) -> anyhow::Result<AutomationV2Spec> {
2694        if automation.automation_id.trim().is_empty() {
2695            anyhow::bail!("automation_id is required");
2696        }
2697        for agent in &mut automation.agents {
2698            if agent.display_name.trim().is_empty() {
2699                agent.display_name = auto_generated_agent_name(&agent.agent_id);
2700            }
2701            agent.tool_policy.allowlist =
2702                config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
2703            agent.tool_policy.denylist =
2704                config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
2705            agent.mcp_policy.allowed_servers =
2706                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
2707            agent.mcp_policy.allowed_tools = agent
2708                .mcp_policy
2709                .allowed_tools
2710                .take()
2711                .map(normalize_allowed_tools);
2712        }
2713        let now = now_ms();
2714        if automation.created_at_ms == 0 {
2715            automation.created_at_ms = now;
2716        }
2717        automation.updated_at_ms = now;
2718        if automation.next_fire_at_ms.is_none() {
2719            automation.next_fire_at_ms =
2720                automation_schedule_next_fire_at_ms(&automation.schedule, now);
2721        }
2722        migrate_bundled_studio_research_split_automation(&mut automation);
2723        let _guard = self.automations_v2_persistence.lock().await;
2724        self.automations_v2
2725            .write()
2726            .await
2727            .insert(automation.automation_id.clone(), automation.clone());
2728        self.persist_automations_v2_locked().await?;
2729        self.verify_automation_v2_persisted_locked(&automation.automation_id, true)
2730            .await?;
2731        Ok(automation)
2732    }
2733
2734    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2735        self.automations_v2.read().await.get(automation_id).cloned()
2736    }
2737
2738    pub fn automation_v2_runtime_context(
2739        &self,
2740        run: &AutomationV2RunRecord,
2741    ) -> Option<AutomationRuntimeContextMaterialization> {
2742        run.runtime_context.clone().or_else(|| {
2743            run.automation_snapshot.as_ref().and_then(|automation| {
2744                automation
2745                    .runtime_context_materialization()
2746                    .or_else(|| automation.approved_plan_runtime_context_materialization())
2747            })
2748        })
2749    }
2750
2751    fn merge_automation_runtime_context_materializations(
2752        base: Option<AutomationRuntimeContextMaterialization>,
2753        extra: Option<AutomationRuntimeContextMaterialization>,
2754    ) -> Option<AutomationRuntimeContextMaterialization> {
2755        let mut partitions = std::collections::BTreeMap::<
2756            String,
2757            tandem_plan_compiler::api::ProjectedRoutineContextPartition,
2758        >::new();
2759        let mut merge_partition =
2760            |partition: tandem_plan_compiler::api::ProjectedRoutineContextPartition| {
2761                let entry = partitions
2762                    .entry(partition.routine_id.clone())
2763                    .or_insert_with(|| {
2764                        tandem_plan_compiler::api::ProjectedRoutineContextPartition {
2765                            routine_id: partition.routine_id.clone(),
2766                            visible_context_objects: Vec::new(),
2767                            step_context_bindings: Vec::new(),
2768                        }
2769                    });
2770
2771                let mut seen_context_object_ids = entry
2772                    .visible_context_objects
2773                    .iter()
2774                    .map(|context_object| context_object.context_object_id.clone())
2775                    .collect::<std::collections::HashSet<_>>();
2776                for context_object in partition.visible_context_objects {
2777                    if seen_context_object_ids.insert(context_object.context_object_id.clone()) {
2778                        entry.visible_context_objects.push(context_object);
2779                    }
2780                }
2781                entry
2782                    .visible_context_objects
2783                    .sort_by(|left, right| left.context_object_id.cmp(&right.context_object_id));
2784
2785                let mut seen_step_ids = entry
2786                    .step_context_bindings
2787                    .iter()
2788                    .map(|binding| binding.step_id.clone())
2789                    .collect::<std::collections::HashSet<_>>();
2790                for binding in partition.step_context_bindings {
2791                    if seen_step_ids.insert(binding.step_id.clone()) {
2792                        entry.step_context_bindings.push(binding);
2793                    }
2794                }
2795                entry
2796                    .step_context_bindings
2797                    .sort_by(|left, right| left.step_id.cmp(&right.step_id));
2798            };
2799
2800        if let Some(base) = base {
2801            for partition in base.routines {
2802                merge_partition(partition);
2803            }
2804        }
2805        if let Some(extra) = extra {
2806            for partition in extra.routines {
2807                merge_partition(partition);
2808            }
2809        }
2810        if partitions.is_empty() {
2811            None
2812        } else {
2813            Some(AutomationRuntimeContextMaterialization {
2814                routines: partitions.into_values().collect(),
2815            })
2816        }
2817    }
2818
2819    async fn automation_v2_shared_context_runtime_context(
2820        &self,
2821        automation: &AutomationV2Spec,
2822    ) -> anyhow::Result<Option<AutomationRuntimeContextMaterialization>> {
2823        let pack_ids = crate::http::context_packs::shared_context_pack_ids_from_metadata(
2824            automation.metadata.as_ref(),
2825        );
2826        if pack_ids.is_empty() {
2827            return Ok(None);
2828        }
2829
2830        let mut contexts = Vec::new();
2831        for pack_id in pack_ids {
2832            let Some(pack) = self.get_context_pack(&pack_id).await else {
2833                anyhow::bail!("shared workflow context not found: {pack_id}");
2834            };
2835            if pack.state != crate::http::context_packs::ContextPackState::Published {
2836                anyhow::bail!("shared workflow context is not published: {pack_id}");
2837            }
2838            let pack_context = pack
2839                .manifest
2840                .runtime_context
2841                .clone()
2842                .and_then(|value| {
2843                    serde_json::from_value::<AutomationRuntimeContextMaterialization>(value).ok()
2844                })
2845                .or_else(|| {
2846                    pack.manifest
2847                        .plan_package
2848                        .as_ref()
2849                        .and_then(|value| {
2850                            serde_json::from_value::<tandem_plan_compiler::api::PlanPackage>(
2851                                value.clone(),
2852                            )
2853                            .ok()
2854                        })
2855                        .map(|plan_package| {
2856                            tandem_plan_compiler::api::project_plan_context_materialization(
2857                                &plan_package,
2858                            )
2859                        })
2860                });
2861            let Some(pack_context) = pack_context else {
2862                anyhow::bail!("shared workflow context lacks runtime context: {pack_id}");
2863            };
2864            contexts.push(pack_context);
2865        }
2866
2867        let mut merged: Option<AutomationRuntimeContextMaterialization> = None;
2868        for context in contexts {
2869            merged = Self::merge_automation_runtime_context_materializations(merged, Some(context));
2870        }
2871        Ok(merged)
2872    }
2873
2874    async fn automation_v2_effective_runtime_context(
2875        &self,
2876        automation: &AutomationV2Spec,
2877        base_runtime_context: Option<AutomationRuntimeContextMaterialization>,
2878    ) -> anyhow::Result<Option<AutomationRuntimeContextMaterialization>> {
2879        let shared_context = self
2880            .automation_v2_shared_context_runtime_context(automation)
2881            .await?;
2882        Ok(Self::merge_automation_runtime_context_materializations(
2883            base_runtime_context,
2884            shared_context,
2885        ))
2886    }
2887
2888    pub(crate) fn automation_v2_approved_plan_materialization(
2889        &self,
2890        run: &AutomationV2RunRecord,
2891    ) -> Option<tandem_plan_compiler::api::ApprovedPlanMaterialization> {
2892        run.automation_snapshot
2893            .as_ref()
2894            .and_then(AutomationV2Spec::approved_plan_materialization)
2895    }
2896
2897    pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2898        self.workflow_plans
2899            .write()
2900            .await
2901            .insert(plan.plan_id.clone(), plan);
2902    }
2903
2904    pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2905        self.workflow_plans.read().await.get(plan_id).cloned()
2906    }
2907
2908    pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2909        self.workflow_plan_drafts
2910            .write()
2911            .await
2912            .insert(draft.current_plan.plan_id.clone(), draft.clone());
2913        self.put_workflow_plan(draft.current_plan).await;
2914    }
2915
2916    pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2917        self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2918    }
2919
2920    pub async fn load_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2921        if !self.workflow_planner_sessions_path.exists() {
2922            return Ok(());
2923        }
2924        let raw = fs::read_to_string(&self.workflow_planner_sessions_path).await?;
2925        let parsed = serde_json::from_str::<
2926            std::collections::HashMap<
2927                String,
2928                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2929            >,
2930        >(&raw)
2931        .unwrap_or_default();
2932        self.replace_workflow_planner_sessions(parsed).await?;
2933        Ok(())
2934    }
2935
2936    pub async fn persist_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2937        if let Some(parent) = self.workflow_planner_sessions_path.parent() {
2938            fs::create_dir_all(parent).await?;
2939        }
2940        let payload = {
2941            let guard = self.workflow_planner_sessions.read().await;
2942            serde_json::to_string_pretty(&*guard)?
2943        };
2944        fs::write(&self.workflow_planner_sessions_path, payload).await?;
2945        Ok(())
2946    }
2947
2948    async fn replace_workflow_planner_sessions(
2949        &self,
2950        sessions: std::collections::HashMap<
2951            String,
2952            crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2953        >,
2954    ) -> anyhow::Result<()> {
2955        {
2956            let mut sessions_guard = self.workflow_planner_sessions.write().await;
2957            *sessions_guard = sessions.clone();
2958        }
2959        {
2960            let mut plans = self.workflow_plans.write().await;
2961            let mut drafts = self.workflow_plan_drafts.write().await;
2962            plans.clear();
2963            drafts.clear();
2964            for session in sessions.values() {
2965                if let Some(draft) = session.draft.as_ref() {
2966                    plans.insert(
2967                        draft.current_plan.plan_id.clone(),
2968                        draft.current_plan.clone(),
2969                    );
2970                    drafts.insert(draft.current_plan.plan_id.clone(), draft.clone());
2971                }
2972            }
2973        }
2974        Ok(())
2975    }
2976
2977    async fn sync_workflow_planner_session_cache(
2978        &self,
2979        session: &crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2980    ) {
2981        if let Some(draft) = session.draft.as_ref() {
2982            self.workflow_plans.write().await.insert(
2983                draft.current_plan.plan_id.clone(),
2984                draft.current_plan.clone(),
2985            );
2986            self.workflow_plan_drafts
2987                .write()
2988                .await
2989                .insert(draft.current_plan.plan_id.clone(), draft.clone());
2990        }
2991    }
2992
2993    pub async fn put_workflow_planner_session(
2994        &self,
2995        mut session: crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2996    ) -> anyhow::Result<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2997        if session.session_id.trim().is_empty() {
2998            anyhow::bail!("session_id is required");
2999        }
3000        if session.project_slug.trim().is_empty() {
3001            anyhow::bail!("project_slug is required");
3002        }
3003        let now = now_ms();
3004        if session.created_at_ms == 0 {
3005            session.created_at_ms = now;
3006        }
3007        session.updated_at_ms = now;
3008        {
3009            self.workflow_planner_sessions
3010                .write()
3011                .await
3012                .insert(session.session_id.clone(), session.clone());
3013        }
3014        self.sync_workflow_planner_session_cache(&session).await;
3015        self.persist_workflow_planner_sessions().await?;
3016        Ok(session)
3017    }
3018
3019    pub async fn get_workflow_planner_session(
3020        &self,
3021        session_id: &str,
3022    ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
3023        self.workflow_planner_sessions
3024            .read()
3025            .await
3026            .get(session_id)
3027            .cloned()
3028    }
3029
3030    pub async fn list_workflow_planner_sessions(
3031        &self,
3032        project_slug: Option<&str>,
3033    ) -> Vec<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
3034        let mut rows = self
3035            .workflow_planner_sessions
3036            .read()
3037            .await
3038            .values()
3039            .filter(|session| {
3040                project_slug
3041                    .map(|slug| session.project_slug == slug)
3042                    .unwrap_or(true)
3043            })
3044            .cloned()
3045            .collect::<Vec<_>>();
3046        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3047        rows
3048    }
3049
3050    pub async fn delete_workflow_planner_session(
3051        &self,
3052        session_id: &str,
3053    ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
3054        let removed = self
3055            .workflow_planner_sessions
3056            .write()
3057            .await
3058            .remove(session_id);
3059        if let Some(session) = removed.as_ref() {
3060            if let Some(draft) = session.draft.as_ref() {
3061                self.workflow_plan_drafts
3062                    .write()
3063                    .await
3064                    .remove(&draft.current_plan.plan_id);
3065                self.workflow_plans
3066                    .write()
3067                    .await
3068                    .remove(&draft.current_plan.plan_id);
3069            }
3070        }
3071        let _ = self.persist_workflow_planner_sessions().await;
3072        removed
3073    }
3074
3075    pub async fn load_workflow_learning_candidates(&self) -> anyhow::Result<()> {
3076        if !self.workflow_learning_candidates_path.exists() {
3077            return Ok(());
3078        }
3079        let raw = fs::read_to_string(&self.workflow_learning_candidates_path).await?;
3080        let parsed = serde_json::from_str::<
3081            std::collections::HashMap<String, WorkflowLearningCandidate>,
3082        >(&raw)
3083        .unwrap_or_default();
3084        *self.workflow_learning_candidates.write().await = parsed;
3085        Ok(())
3086    }
3087
3088    pub async fn persist_workflow_learning_candidates(&self) -> anyhow::Result<()> {
3089        if let Some(parent) = self.workflow_learning_candidates_path.parent() {
3090            fs::create_dir_all(parent).await?;
3091        }
3092        let payload = {
3093            let guard = self.workflow_learning_candidates.read().await;
3094            serde_json::to_string_pretty(&*guard)?
3095        };
3096        fs::write(&self.workflow_learning_candidates_path, payload).await?;
3097        Ok(())
3098    }
3099
3100    pub async fn get_workflow_learning_candidate(
3101        &self,
3102        candidate_id: &str,
3103    ) -> Option<WorkflowLearningCandidate> {
3104        self.workflow_learning_candidates
3105            .read()
3106            .await
3107            .get(candidate_id)
3108            .cloned()
3109    }
3110
3111    pub async fn list_workflow_learning_candidates(
3112        &self,
3113        workflow_id: Option<&str>,
3114        status: Option<WorkflowLearningCandidateStatus>,
3115        kind: Option<WorkflowLearningCandidateKind>,
3116    ) -> Vec<WorkflowLearningCandidate> {
3117        let mut rows = self
3118            .workflow_learning_candidates
3119            .read()
3120            .await
3121            .values()
3122            .filter(|candidate| {
3123                workflow_id
3124                    .map(|value| candidate.workflow_id == value)
3125                    .unwrap_or(true)
3126                    && status
3127                        .map(|value| candidate.status == value)
3128                        .unwrap_or(true)
3129                    && kind.map(|value| candidate.kind == value).unwrap_or(true)
3130            })
3131            .cloned()
3132            .collect::<Vec<_>>();
3133        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3134        rows
3135    }
3136
3137    pub async fn put_workflow_learning_candidate(
3138        &self,
3139        mut candidate: WorkflowLearningCandidate,
3140    ) -> anyhow::Result<WorkflowLearningCandidate> {
3141        if candidate.candidate_id.trim().is_empty() {
3142            anyhow::bail!("candidate_id is required");
3143        }
3144        let now = now_ms();
3145        if candidate.created_at_ms == 0 {
3146            candidate.created_at_ms = now;
3147        }
3148        candidate.updated_at_ms = now;
3149        self.workflow_learning_candidates
3150            .write()
3151            .await
3152            .insert(candidate.candidate_id.clone(), candidate.clone());
3153        self.persist_workflow_learning_candidates().await?;
3154        Ok(candidate)
3155    }
3156
3157    pub async fn upsert_workflow_learning_candidate(
3158        &self,
3159        mut candidate: WorkflowLearningCandidate,
3160    ) -> anyhow::Result<WorkflowLearningCandidate> {
3161        let now = now_ms();
3162        if candidate.candidate_id.trim().is_empty() {
3163            candidate.candidate_id = format!("wflearn-{}", uuid::Uuid::new_v4());
3164        }
3165        if candidate.created_at_ms == 0 {
3166            candidate.created_at_ms = now;
3167        }
3168        candidate.updated_at_ms = now;
3169
3170        let stored = {
3171            let mut guard = self.workflow_learning_candidates.write().await;
3172            if let Some(existing) = guard.values_mut().find(|row| {
3173                row.workflow_id == candidate.workflow_id
3174                    && row.kind == candidate.kind
3175                    && row.fingerprint == candidate.fingerprint
3176            }) {
3177                existing.summary = candidate.summary.clone();
3178                existing.confidence = existing.confidence.max(candidate.confidence);
3179                existing.updated_at_ms = now;
3180                if existing.node_id.is_none() {
3181                    existing.node_id = candidate.node_id.clone();
3182                }
3183                if existing.node_kind.is_none() {
3184                    existing.node_kind = candidate.node_kind.clone();
3185                }
3186                if existing.validator_family.is_none() {
3187                    existing.validator_family = candidate.validator_family.clone();
3188                }
3189                if existing.proposed_memory_payload.is_none() {
3190                    existing.proposed_memory_payload = candidate.proposed_memory_payload.clone();
3191                }
3192                if existing.proposed_revision_prompt.is_none() {
3193                    existing.proposed_revision_prompt = candidate.proposed_revision_prompt.clone();
3194                }
3195                if existing.source_memory_id.is_none() {
3196                    existing.source_memory_id = candidate.source_memory_id.clone();
3197                }
3198                if existing.promoted_memory_id.is_none() {
3199                    existing.promoted_memory_id = candidate.promoted_memory_id.clone();
3200                }
3201                if existing.baseline_before.is_none() {
3202                    existing.baseline_before = candidate.baseline_before.clone();
3203                }
3204                if candidate.latest_observed_metrics.is_some() {
3205                    existing.latest_observed_metrics = candidate.latest_observed_metrics.clone();
3206                }
3207                if candidate.last_revision_session_id.is_some() {
3208                    existing.last_revision_session_id = candidate.last_revision_session_id.clone();
3209                }
3210                existing.needs_plan_bundle |= candidate.needs_plan_bundle;
3211                for artifact_ref in candidate.artifact_refs {
3212                    if !existing
3213                        .artifact_refs
3214                        .iter()
3215                        .any(|value| value == &artifact_ref)
3216                    {
3217                        existing.artifact_refs.push(artifact_ref);
3218                    }
3219                }
3220                for run_id in candidate.run_ids {
3221                    if !existing.run_ids.iter().any(|value| value == &run_id) {
3222                        existing.run_ids.push(run_id);
3223                    }
3224                }
3225                for evidence_ref in candidate.evidence_refs {
3226                    if !existing.evidence_refs.contains(&evidence_ref) {
3227                        existing.evidence_refs.push(evidence_ref);
3228                    }
3229                }
3230                existing.clone()
3231            } else {
3232                guard.insert(candidate.candidate_id.clone(), candidate.clone());
3233                candidate
3234            }
3235        };
3236        self.persist_workflow_learning_candidates().await?;
3237        Ok(stored)
3238    }
3239
3240    pub async fn update_workflow_learning_candidate(
3241        &self,
3242        candidate_id: &str,
3243        update: impl FnOnce(&mut WorkflowLearningCandidate),
3244    ) -> Option<WorkflowLearningCandidate> {
3245        let updated = {
3246            let mut guard = self.workflow_learning_candidates.write().await;
3247            let candidate = guard.get_mut(candidate_id)?;
3248            update(candidate);
3249            candidate.updated_at_ms = now_ms();
3250            candidate.clone()
3251        };
3252        let _ = self.persist_workflow_learning_candidates().await;
3253        Some(updated)
3254    }
3255
3256    pub async fn workflow_learning_metrics_for_workflow(
3257        &self,
3258        workflow_id: &str,
3259    ) -> WorkflowLearningMetricsSnapshot {
3260        let runs = self.list_automation_v2_runs(Some(workflow_id), 50).await;
3261        crate::app::state::automation::workflow_learning_metrics_snapshot(&runs)
3262    }
3263
3264    pub async fn workflow_learning_context_for_automation_node(
3265        &self,
3266        automation: &AutomationV2Spec,
3267        node: &AutomationFlowNode,
3268    ) -> (Vec<String>, Option<String>) {
3269        let project_id = crate::app::state::automation::workflow_learning_project_id(automation);
3270        let node_kind = node
3271            .stage_kind
3272            .as_ref()
3273            .map(|kind| format!("{kind:?}").to_ascii_lowercase());
3274        let validator_family = node
3275            .output_contract
3276            .as_ref()
3277            .and_then(|contract| contract.validator.as_ref())
3278            .map(|validator| format!("{validator:?}").to_ascii_lowercase());
3279        let candidates = self
3280            .workflow_learning_candidates
3281            .read()
3282            .await
3283            .values()
3284            .filter(|candidate| {
3285                matches!(
3286                    candidate.status,
3287                    WorkflowLearningCandidateStatus::Approved
3288                        | WorkflowLearningCandidateStatus::Applied
3289                )
3290            })
3291            .cloned()
3292            .collect::<Vec<_>>();
3293        let mut ordered = Vec::new();
3294        let mut push_unique = |candidate: WorkflowLearningCandidate| {
3295            if ordered.iter().any(|existing: &WorkflowLearningCandidate| {
3296                existing.candidate_id == candidate.candidate_id
3297            }) {
3298                return;
3299            }
3300            ordered.push(candidate);
3301        };
3302        for candidate in candidates
3303            .iter()
3304            .filter(|candidate| candidate.workflow_id == automation.automation_id)
3305        {
3306            push_unique(candidate.clone());
3307        }
3308        for candidate in candidates.iter().filter(|candidate| {
3309            candidate.workflow_id == automation.automation_id
3310                && (candidate.node_kind.as_deref() == node_kind.as_deref()
3311                    || candidate.validator_family.as_deref() == validator_family.as_deref())
3312        }) {
3313            push_unique(candidate.clone());
3314        }
3315        for candidate in candidates.iter().filter(|candidate| {
3316            candidate.project_id == project_id && candidate.workflow_id != automation.automation_id
3317        }) {
3318            push_unique(candidate.clone());
3319        }
3320        ordered.truncate(6);
3321        let candidate_ids = ordered
3322            .iter()
3323            .map(|candidate| candidate.candidate_id.clone())
3324            .collect::<Vec<_>>();
3325        let context =
3326            crate::app::state::automation::workflow_learning_context_for_candidates(&ordered);
3327        (candidate_ids, context)
3328    }
3329
3330    pub async fn record_automation_v2_run_learning_usage(
3331        &self,
3332        run_id: &str,
3333        candidate_ids: &[String],
3334    ) -> Option<AutomationV2RunRecord> {
3335        if candidate_ids.is_empty() {
3336            return self.get_automation_v2_run(run_id).await;
3337        }
3338        let updated = {
3339            let mut guard = self.automation_v2_runs.write().await;
3340            let run = guard.get_mut(run_id)?;
3341            let summary = run
3342                .learning_summary
3343                .get_or_insert_with(WorkflowLearningRunSummary::default);
3344            for candidate_id in candidate_ids {
3345                if !summary
3346                    .approved_learning_ids_considered
3347                    .iter()
3348                    .any(|value| value == candidate_id)
3349                {
3350                    summary
3351                        .approved_learning_ids_considered
3352                        .push(candidate_id.clone());
3353                }
3354                if !summary
3355                    .injected_learning_ids
3356                    .iter()
3357                    .any(|value| value == candidate_id)
3358                {
3359                    summary.injected_learning_ids.push(candidate_id.clone());
3360                }
3361            }
3362            run.updated_at_ms = now_ms();
3363            run.clone()
3364        };
3365        let _ = self.persist_automation_v2_runs().await;
3366        let _ = self.persist_automation_v2_run_status_json(&updated).await;
3367        Some(updated)
3368    }
3369
    /// Post-processes learning state once a run reaches a terminal status.
    ///
    /// In order:
    /// 1. resolve the automation spec (run snapshot first, then live spec;
    ///    silently returns when neither exists),
    /// 2. recompute workflow metrics over the last <=50 runs,
    /// 3. generate and upsert new learning candidates for this terminal run,
    /// 4. refresh `latest_observed_metrics` on approved/applied candidates
    ///    that have a recorded baseline, demoting `Applied` ones to
    ///    `Regressed` when enough post-apply samples show worse rates,
    /// 5. attach the generated candidate ids and post-run metrics to the
    ///    stored run record and persist it.
    async fn finalize_terminal_automation_v2_run_learning(
        &self,
        run: &AutomationV2RunRecord,
    ) -> anyhow::Result<()> {
        // Minimum number of runs observed *after* a candidate was applied
        // before a regression verdict is trusted.
        const WORKFLOW_LEARNING_POST_APPLY_MIN_SAMPLE_SIZE: usize = 3;
        // Prefer the spec snapshot captured with the run (what actually
        // executed); fall back to the current spec; otherwise nothing to do.
        let automation = if let Some(snapshot) = run.automation_snapshot.clone() {
            snapshot
        } else if let Some(current) = self.get_automation_v2(&run.automation_id).await {
            current
        } else {
            return Ok(());
        };
        let recent_runs = self
            .list_automation_v2_runs(Some(&run.automation_id), 50)
            .await;
        let metrics =
            crate::app::state::automation::workflow_learning_metrics_snapshot(&recent_runs);
        let existing_candidates = self
            .list_workflow_learning_candidates(Some(&run.automation_id), None, None)
            .await;
        let generated =
            crate::app::state::automation::workflow_learning_candidates_for_terminal_run(
                &automation,
                run,
                &recent_runs,
                &existing_candidates,
            );
        // Upsert (may merge into existing rows) and remember the resulting ids
        // so they can be stamped onto the run record below.
        let mut generated_candidate_ids = Vec::new();
        for candidate in generated {
            let stored = self.upsert_workflow_learning_candidate(candidate).await?;
            generated_candidate_ids.push(stored.candidate_id);
        }
        // Re-list AFTER the upserts so newly merged rows are included; only
        // approved/applied candidates with a baseline can be evaluated.
        let candidate_ids = self
            .list_workflow_learning_candidates(Some(&run.automation_id), None, None)
            .await
            .into_iter()
            .filter(|candidate| {
                matches!(
                    candidate.status,
                    WorkflowLearningCandidateStatus::Approved
                        | WorkflowLearningCandidateStatus::Applied
                ) && candidate.baseline_before.is_some()
            })
            .map(|candidate| candidate.candidate_id)
            .collect::<Vec<_>>();
        for candidate_id in candidate_ids {
            // Best-effort per-candidate update; a missing id is ignored.
            let _ = self
                .update_workflow_learning_candidate(&candidate_id, |candidate| {
                    candidate.latest_observed_metrics = Some(metrics.clone());
                    if candidate.status == WorkflowLearningCandidateStatus::Applied {
                        if let Some(baseline) = candidate.baseline_before.as_ref() {
                            // Runs accumulated since the baseline was taken.
                            let post_change_sample_size =
                                metrics.sample_size.saturating_sub(baseline.sample_size);
                            if post_change_sample_size
                                < WORKFLOW_LEARNING_POST_APPLY_MIN_SAMPLE_SIZE
                            {
                                return;
                            }
                            // EPSILON guards float noise: only a strictly
                            // lower rate counts as a regression.
                            if metrics.completion_rate + f64::EPSILON < baseline.completion_rate
                                || metrics.validation_pass_rate + f64::EPSILON
                                    < baseline.validation_pass_rate
                            {
                                candidate.status = WorkflowLearningCandidateStatus::Regressed;
                            }
                        }
                    }
                })
                .await;
        }
        // Stamp the run record with what was generated and the fresh metrics.
        let updated_run = {
            let mut guard = self.automation_v2_runs.write().await;
            let Some(stored_run) = guard.get_mut(&run.run_id) else {
                // Run disappeared concurrently; nothing left to record.
                return Ok(());
            };
            let summary = stored_run
                .learning_summary
                .get_or_insert_with(WorkflowLearningRunSummary::default);
            for candidate_id in generated_candidate_ids {
                if !summary
                    .generated_candidate_ids
                    .iter()
                    .any(|value| value == &candidate_id)
                {
                    summary.generated_candidate_ids.push(candidate_id);
                }
            }
            summary.post_run_metrics = Some(metrics);
            stored_run.clone()
        };
        self.persist_automation_v2_runs().await?;
        self.persist_automation_v2_run_status_json(&updated_run)
            .await?;
        Ok(())
    }
3464
3465    pub async fn load_context_packs(&self) -> anyhow::Result<()> {
3466        if !self.context_packs_path.exists() {
3467            return Ok(());
3468        }
3469        let raw = fs::read_to_string(&self.context_packs_path).await?;
3470        let parsed = serde_json::from_str::<
3471            std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>,
3472        >(&raw)
3473        .unwrap_or_default();
3474        {
3475            let mut guard = self.context_packs.write().await;
3476            *guard = parsed;
3477        }
3478        Ok(())
3479    }
3480
3481    pub async fn persist_context_packs(&self) -> anyhow::Result<()> {
3482        if let Some(parent) = self.context_packs_path.parent() {
3483            fs::create_dir_all(parent).await?;
3484        }
3485        let payload = {
3486            let guard = self.context_packs.read().await;
3487            serde_json::to_string_pretty(&*guard)?
3488        };
3489        fs::write(&self.context_packs_path, payload).await?;
3490        Ok(())
3491    }
3492
3493    pub(crate) async fn put_context_pack(
3494        &self,
3495        mut pack: crate::http::context_packs::ContextPackRecord,
3496    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3497        if pack.pack_id.trim().is_empty() {
3498            anyhow::bail!("pack_id is required");
3499        }
3500        if pack.title.trim().is_empty() {
3501            anyhow::bail!("title is required");
3502        }
3503        if pack.workspace_root.trim().is_empty() {
3504            anyhow::bail!("workspace_root is required");
3505        }
3506        let now = now_ms();
3507        if pack.created_at_ms == 0 {
3508            pack.created_at_ms = now;
3509        }
3510        pack.updated_at_ms = now;
3511        {
3512            self.context_packs
3513                .write()
3514                .await
3515                .insert(pack.pack_id.clone(), pack.clone());
3516        }
3517        self.persist_context_packs().await?;
3518        Ok(pack)
3519    }
3520
3521    pub(crate) async fn get_context_pack(
3522        &self,
3523        pack_id: &str,
3524    ) -> Option<crate::http::context_packs::ContextPackRecord> {
3525        self.context_packs.read().await.get(pack_id).cloned()
3526    }
3527
3528    pub(crate) async fn list_context_packs(
3529        &self,
3530        project_key: Option<&str>,
3531        workspace_root: Option<&str>,
3532    ) -> Vec<crate::http::context_packs::ContextPackRecord> {
3533        let mut rows = self
3534            .context_packs
3535            .read()
3536            .await
3537            .values()
3538            .filter(|pack| {
3539                let project_ok =
3540                    crate::http::context_packs::context_pack_allows_project(pack, project_key);
3541                let workspace_ok = workspace_root
3542                    .map(|root| pack.workspace_root == root)
3543                    .unwrap_or(true);
3544                project_ok && workspace_ok
3545            })
3546            .cloned()
3547            .collect::<Vec<_>>();
3548        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3549        rows
3550    }
3551
3552    pub(crate) async fn update_context_pack(
3553        &self,
3554        pack_id: &str,
3555        update: impl FnOnce(&mut crate::http::context_packs::ContextPackRecord) -> anyhow::Result<()>,
3556    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3557        let mut guard = self.context_packs.write().await;
3558        let Some(pack) = guard.get_mut(pack_id) else {
3559            anyhow::bail!("shared workflow context not found");
3560        };
3561        update(pack)?;
3562        pack.updated_at_ms = now_ms();
3563        let next = pack.clone();
3564        drop(guard);
3565        self.persist_context_packs().await?;
3566        Ok(next)
3567    }
3568
3569    pub(crate) async fn revoke_context_pack(
3570        &self,
3571        pack_id: &str,
3572        revoked_actor_metadata: Option<Value>,
3573    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3574        self.update_context_pack(pack_id, move |pack| {
3575            pack.state = crate::http::context_packs::ContextPackState::Revoked;
3576            pack.revoked_at_ms = Some(now_ms());
3577            pack.revoked_actor_metadata = revoked_actor_metadata;
3578            Ok(())
3579        })
3580        .await
3581    }
3582
3583    pub(crate) async fn supersede_context_pack(
3584        &self,
3585        pack_id: &str,
3586        superseded_by_pack_id: String,
3587        superseded_actor_metadata: Option<Value>,
3588    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3589        self.update_context_pack(pack_id, move |pack| {
3590            pack.state = crate::http::context_packs::ContextPackState::Superseded;
3591            pack.superseded_by_pack_id = Some(superseded_by_pack_id);
3592            pack.superseded_at_ms = Some(now_ms());
3593            pack.superseded_actor_metadata = superseded_actor_metadata;
3594            Ok(())
3595        })
3596        .await
3597    }
3598
3599    pub(crate) async fn bind_context_pack(
3600        &self,
3601        pack_id: &str,
3602        binding: crate::http::context_packs::ContextPackBindingRecord,
3603    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3604        self.update_context_pack(pack_id, move |pack| {
3605            pack.bindings
3606                .retain(|row| row.binding_id != binding.binding_id);
3607            pack.bindings.push(binding);
3608            Ok(())
3609        })
3610        .await
3611    }
3612
3613    pub async fn put_optimization_campaign(
3614        &self,
3615        mut campaign: OptimizationCampaignRecord,
3616    ) -> anyhow::Result<OptimizationCampaignRecord> {
3617        if campaign.optimization_id.trim().is_empty() {
3618            anyhow::bail!("optimization_id is required");
3619        }
3620        if campaign.source_workflow_id.trim().is_empty() {
3621            anyhow::bail!("source_workflow_id is required");
3622        }
3623        if campaign.name.trim().is_empty() {
3624            anyhow::bail!("name is required");
3625        }
3626        let now = now_ms();
3627        if campaign.created_at_ms == 0 {
3628            campaign.created_at_ms = now;
3629        }
3630        campaign.updated_at_ms = now;
3631        campaign.source_workflow_snapshot_hash =
3632            optimization_snapshot_hash(&campaign.source_workflow_snapshot);
3633        campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
3634        self.optimization_campaigns
3635            .write()
3636            .await
3637            .insert(campaign.optimization_id.clone(), campaign.clone());
3638        self.persist_optimization_campaigns().await?;
3639        Ok(campaign)
3640    }
3641
3642    pub async fn get_optimization_campaign(
3643        &self,
3644        optimization_id: &str,
3645    ) -> Option<OptimizationCampaignRecord> {
3646        self.optimization_campaigns
3647            .read()
3648            .await
3649            .get(optimization_id)
3650            .cloned()
3651    }
3652
3653    pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
3654        let mut rows = self
3655            .optimization_campaigns
3656            .read()
3657            .await
3658            .values()
3659            .cloned()
3660            .collect::<Vec<_>>();
3661        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3662        rows
3663    }
3664
3665    pub async fn put_optimization_experiment(
3666        &self,
3667        mut experiment: OptimizationExperimentRecord,
3668    ) -> anyhow::Result<OptimizationExperimentRecord> {
3669        if experiment.experiment_id.trim().is_empty() {
3670            anyhow::bail!("experiment_id is required");
3671        }
3672        if experiment.optimization_id.trim().is_empty() {
3673            anyhow::bail!("optimization_id is required");
3674        }
3675        let now = now_ms();
3676        if experiment.created_at_ms == 0 {
3677            experiment.created_at_ms = now;
3678        }
3679        experiment.updated_at_ms = now;
3680        experiment.candidate_snapshot_hash =
3681            optimization_snapshot_hash(&experiment.candidate_snapshot);
3682        self.optimization_experiments
3683            .write()
3684            .await
3685            .insert(experiment.experiment_id.clone(), experiment.clone());
3686        self.persist_optimization_experiments().await?;
3687        Ok(experiment)
3688    }
3689
3690    pub async fn get_optimization_experiment(
3691        &self,
3692        optimization_id: &str,
3693        experiment_id: &str,
3694    ) -> Option<OptimizationExperimentRecord> {
3695        self.optimization_experiments
3696            .read()
3697            .await
3698            .get(experiment_id)
3699            .filter(|row| row.optimization_id == optimization_id)
3700            .cloned()
3701    }
3702
3703    pub async fn list_optimization_experiments(
3704        &self,
3705        optimization_id: &str,
3706    ) -> Vec<OptimizationExperimentRecord> {
3707        let mut rows = self
3708            .optimization_experiments
3709            .read()
3710            .await
3711            .values()
3712            .filter(|row| row.optimization_id == optimization_id)
3713            .cloned()
3714            .collect::<Vec<_>>();
3715        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3716        rows
3717    }
3718
3719    pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
3720        self.optimization_experiments
3721            .read()
3722            .await
3723            .values()
3724            .filter(|row| row.optimization_id == optimization_id)
3725            .count()
3726    }
3727
3728    fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
3729        matches!(
3730            status,
3731            crate::AutomationRunStatus::Completed
3732                | crate::AutomationRunStatus::Blocked
3733                | crate::AutomationRunStatus::Failed
3734                | crate::AutomationRunStatus::Cancelled
3735        )
3736    }
3737
3738    fn optimization_consecutive_failure_count(
3739        experiments: &[OptimizationExperimentRecord],
3740    ) -> usize {
3741        let mut ordered = experiments.to_vec();
3742        ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
3743        ordered
3744            .iter()
3745            .rev()
3746            .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
3747            .count()
3748    }
3749
3750    fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
3751        match field {
3752            OptimizationMutableField::Objective => "objective",
3753            OptimizationMutableField::OutputContractSummaryGuidance => {
3754                "output_contract.summary_guidance"
3755            }
3756            OptimizationMutableField::TimeoutMs => "timeout_ms",
3757            OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
3758            OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
3759        }
3760    }
3761
3762    fn optimization_node_field_value(
3763        node: &crate::AutomationFlowNode,
3764        field: OptimizationMutableField,
3765    ) -> Result<Value, String> {
3766        match field {
3767            OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
3768            OptimizationMutableField::OutputContractSummaryGuidance => node
3769                .output_contract
3770                .as_ref()
3771                .and_then(|contract| contract.summary_guidance.clone())
3772                .map(Value::String)
3773                .ok_or_else(|| {
3774                    format!(
3775                        "node `{}` is missing output_contract.summary_guidance",
3776                        node.node_id
3777                    )
3778                }),
3779            OptimizationMutableField::TimeoutMs => node
3780                .timeout_ms
3781                .map(|value| json!(value))
3782                .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
3783            OptimizationMutableField::RetryPolicyMaxAttempts => node
3784                .retry_policy
3785                .as_ref()
3786                .and_then(Value::as_object)
3787                .and_then(|policy| policy.get("max_attempts"))
3788                .cloned()
3789                .ok_or_else(|| {
3790                    format!(
3791                        "node `{}` is missing retry_policy.max_attempts",
3792                        node.node_id
3793                    )
3794                }),
3795            OptimizationMutableField::RetryPolicyRetries => node
3796                .retry_policy
3797                .as_ref()
3798                .and_then(Value::as_object)
3799                .and_then(|policy| policy.get("retries"))
3800                .cloned()
3801                .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
3802        }
3803    }
3804
3805    fn set_optimization_node_field_value(
3806        node: &mut crate::AutomationFlowNode,
3807        field: OptimizationMutableField,
3808        value: &Value,
3809    ) -> Result<(), String> {
3810        match field {
3811            OptimizationMutableField::Objective => {
3812                node.objective = value
3813                    .as_str()
3814                    .ok_or_else(|| "objective apply value must be a string".to_string())?
3815                    .to_string();
3816            }
3817            OptimizationMutableField::OutputContractSummaryGuidance => {
3818                let guidance = value
3819                    .as_str()
3820                    .ok_or_else(|| {
3821                        "output_contract.summary_guidance apply value must be a string".to_string()
3822                    })?
3823                    .to_string();
3824                let contract = node.output_contract.as_mut().ok_or_else(|| {
3825                    format!(
3826                        "node `{}` is missing output_contract for apply",
3827                        node.node_id
3828                    )
3829                })?;
3830                contract.summary_guidance = Some(guidance);
3831            }
3832            OptimizationMutableField::TimeoutMs => {
3833                node.timeout_ms = Some(
3834                    value
3835                        .as_u64()
3836                        .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
3837                );
3838            }
3839            OptimizationMutableField::RetryPolicyMaxAttempts => {
3840                let next = value.as_i64().ok_or_else(|| {
3841                    "retry_policy.max_attempts apply value must be an integer".to_string()
3842                })?;
3843                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
3844                let object = policy.as_object_mut().ok_or_else(|| {
3845                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
3846                })?;
3847                object.insert("max_attempts".to_string(), json!(next));
3848            }
3849            OptimizationMutableField::RetryPolicyRetries => {
3850                let next = value.as_i64().ok_or_else(|| {
3851                    "retry_policy.retries apply value must be an integer".to_string()
3852                })?;
3853                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
3854                let object = policy.as_object_mut().ok_or_else(|| {
3855                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
3856                })?;
3857                object.insert("retries".to_string(), json!(next));
3858            }
3859        }
3860        Ok(())
3861    }
3862
3863    fn append_optimization_apply_metadata(
3864        metadata: Option<Value>,
3865        record: Value,
3866    ) -> Result<Option<Value>, String> {
3867        let mut root = match metadata {
3868            Some(Value::Object(map)) => map,
3869            Some(_) => return Err("automation metadata must be a JSON object".to_string()),
3870            None => serde_json::Map::new(),
3871        };
3872        let history = root
3873            .entry("optimization_apply_history".to_string())
3874            .or_insert_with(|| Value::Array(Vec::new()));
3875        let Some(entries) = history.as_array_mut() else {
3876            return Err("optimization_apply_history metadata must be an array".to_string());
3877        };
3878        entries.push(record.clone());
3879        root.insert("last_optimization_apply".to_string(), record);
3880        Ok(Some(Value::Object(root)))
3881    }
3882
3883    fn build_optimization_apply_patch(
3884        baseline: &crate::AutomationV2Spec,
3885        candidate: &crate::AutomationV2Spec,
3886        mutation: &crate::OptimizationValidatedMutation,
3887        approved_at_ms: u64,
3888    ) -> Result<Value, String> {
3889        let baseline_node = baseline
3890            .flow
3891            .nodes
3892            .iter()
3893            .find(|node| node.node_id == mutation.node_id)
3894            .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
3895        let candidate_node = candidate
3896            .flow
3897            .nodes
3898            .iter()
3899            .find(|node| node.node_id == mutation.node_id)
3900            .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
3901        let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
3902        let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
3903        Ok(json!({
3904            "node_id": mutation.node_id,
3905            "field": mutation.field,
3906            "field_path": Self::optimization_mutation_field_path(mutation.field),
3907            "expected_before": before,
3908            "apply_value": after,
3909            "approved_at_ms": approved_at_ms,
3910        }))
3911    }
3912
    /// Applies an approved optimization winner's mutation to the live source
    /// workflow.
    ///
    /// Validates that the experiment is `PromotionApproved` and still matches
    /// the campaign baseline, replays the pre-computed `apply_patch`
    /// metadata against the live workflow (refusing to apply when the live
    /// value has drifted from `expected_before`), records the apply event on
    /// both the workflow and experiment metadata, and persists both.
    ///
    /// Returns the campaign plus the stored experiment and live workflow
    /// records; every failure is reported as a `String` message.
    pub async fn apply_optimization_winner(
        &self,
        optimization_id: &str,
        experiment_id: &str,
    ) -> Result<
        (
            OptimizationCampaignRecord,
            OptimizationExperimentRecord,
            crate::AutomationV2Spec,
        ),
        String,
    > {
        let campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        let mut experiment = self
            .get_optimization_experiment(optimization_id, experiment_id)
            .await
            .ok_or_else(|| "experiment not found".to_string())?;
        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
            return Err("only approved winner experiments may be applied".to_string());
        }
        // NOTE(review): the campaign *baseline* hash is compared against the
        // experiment's *candidate* hash — presumably the campaign baseline is
        // advanced to the winning candidate on approval, so only the latest
        // winner passes this gate; confirm against the approval path.
        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
            return Err(
                "only the latest approved winner may be applied to the live workflow".to_string(),
            );
        }
        // The apply patch (node_id, field, before/after values) was stored on
        // the experiment's metadata when the patch was built at approval time.
        let patch = experiment
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("apply_patch"))
            .cloned()
            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
        let node_id = patch
            .get("node_id")
            .and_then(Value::as_str)
            .map(str::to_string)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
        let field: OptimizationMutableField = serde_json::from_value(
            patch
                .get("field")
                .cloned()
                .ok_or_else(|| "apply_patch.field is required".to_string())?,
        )
        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
        let expected_before = patch
            .get("expected_before")
            .cloned()
            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
        let apply_value = patch
            .get("apply_value")
            .cloned()
            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
        let mut live = self
            .get_automation_v2(&campaign.source_workflow_id)
            .await
            .ok_or_else(|| "source workflow not found".to_string())?;
        // Drift check: the live field must still hold the value the
        // experiment was evaluated against, otherwise applying is unsafe.
        let current_value = {
            let live_node = live
                .flow
                .nodes
                .iter()
                .find(|node| node.node_id == node_id)
                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
            Self::optimization_node_field_value(live_node, field)?
        };
        if current_value != expected_before {
            return Err(format!(
                "live workflow drift detected for node `{node_id}` {}",
                Self::optimization_mutation_field_path(field)
            ));
        }
        let live_node = live
            .flow
            .nodes
            .iter_mut()
            .find(|node| node.node_id == node_id)
            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
        let applied_at_ms = now_ms();
        // Audit-trail entry appended to the live workflow's metadata.
        let apply_record = json!({
            "optimization_id": campaign.optimization_id,
            "experiment_id": experiment.experiment_id,
            "node_id": node_id,
            "field": field,
            "field_path": Self::optimization_mutation_field_path(field),
            "previous_value": expected_before,
            "new_value": apply_value,
            "applied_at_ms": applied_at_ms,
        });
        live.metadata =
            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
        let stored_live = self
            .put_automation_v2(live)
            .await
            .map_err(|error| error.to_string())?;
        // Mark the experiment itself as applied so the UI / later reads can
        // see when and where it landed.
        let mut metadata = match experiment.metadata.take() {
            Some(Value::Object(map)) => map,
            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
            None => serde_json::Map::new(),
        };
        metadata.insert(
            "applied_to_live".to_string(),
            json!({
                "automation_id": stored_live.automation_id,
                "applied_at_ms": applied_at_ms,
                "field": field,
                "node_id": node_id,
            }),
        );
        experiment.metadata = Some(Value::Object(metadata));
        let stored_experiment = self
            .put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        Ok((campaign, stored_experiment, stored_live))
    }
4032
4033    fn optimization_objective_hint(text: &str) -> String {
4034        let cleaned = text
4035            .lines()
4036            .map(str::trim)
4037            .filter(|line| !line.is_empty() && !line.starts_with('#'))
4038            .collect::<Vec<_>>()
4039            .join(" ");
4040        let hint = if cleaned.is_empty() {
4041            "Prioritize validator-complete output with explicit evidence."
4042        } else {
4043            cleaned.as_str()
4044        };
4045        let trimmed = hint.trim();
4046        let clipped = if trimmed.len() > 140 {
4047            trimmed[..140].trim_end()
4048        } else {
4049            trimmed
4050        };
4051        let mut sentence = clipped.trim_end_matches('.').to_string();
4052        if sentence.is_empty() {
4053            sentence = "Prioritize validator-complete output with explicit evidence".to_string();
4054        }
4055        sentence.push('.');
4056        sentence
4057    }
4058
    /// Enumerates every deterministic phase 1 candidate mutation permitted by
    /// the campaign's mutation policy.
    ///
    /// For each node, in order, it may emit: an objective-text addition, a
    /// summary-guidance addition, a bounded `timeout_ms` increase, and +1
    /// bumps to the retry policy's `max_attempts` / `retries` knobs. Each
    /// option is a full mutated clone of the baseline paired with its
    /// validated mutation; candidates rejected by
    /// `validate_phase1_candidate_mutation` are skipped silently.
    fn build_phase1_candidate_options(
        baseline: &crate::AutomationV2Spec,
        phase1: &crate::OptimizationPhase1Config,
    ) -> Vec<(
        crate::AutomationV2Spec,
        crate::OptimizationValidatedMutation,
    )> {
        let mut options = Vec::new();
        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
        for (index, node) in baseline.flow.nodes.iter().enumerate() {
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::Objective)
            {
                // Avoid stacking the identical hint twice on repeat passes;
                // use a fixed alternative phrasing instead.
                let addition = if node.objective.contains(&hint) {
                    "Prioritize validator-complete output with concrete evidence."
                } else {
                    &hint
                };
                let mut candidate = baseline.clone();
                candidate.flow.nodes[index].objective =
                    format!("{} {}", node.objective.trim(), addition.trim())
                        .trim()
                        .to_string();
                if let Ok(validated) =
                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                {
                    options.push((candidate, validated));
                }
            }
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
            {
                // Only nodes that already carry summary guidance are mutated;
                // the field is never created from scratch here.
                if let Some(summary_guidance) = node
                    .output_contract
                    .as_ref()
                    .and_then(|contract| contract.summary_guidance.as_ref())
                {
                    let addition = if summary_guidance.contains("Cite concrete evidence") {
                        "Keep evidence explicit."
                    } else {
                        "Cite concrete evidence in the summary."
                    };
                    let mut candidate = baseline.clone();
                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
                        contract.summary_guidance = Some(
                            format!("{} {}", summary_guidance.trim(), addition)
                                .trim()
                                .to_string(),
                        );
                    }
                    if let Ok(validated) =
                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                    {
                        options.push((candidate, validated));
                    }
                }
            }
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::TimeoutMs)
            {
                if let Some(timeout_ms) = node.timeout_ms {
                    // Delta is the smaller of the percentage-based and
                    // absolute policy deltas, but at least 1 ms, and the new
                    // timeout is capped at timeout_max_ms.
                    let delta_by_percent = ((timeout_ms as f64)
                        * phase1.mutation_policy.timeout_delta_percent)
                        .round() as u64;
                    let delta = delta_by_percent
                        .min(phase1.mutation_policy.timeout_delta_ms)
                        .max(1);
                    let next = timeout_ms
                        .saturating_add(delta)
                        .min(phase1.mutation_policy.timeout_max_ms);
                    // No option when the cap already prevents any change.
                    if next != timeout_ms {
                        let mut candidate = baseline.clone();
                        candidate.flow.nodes[index].timeout_ms = Some(next);
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
            {
                // Only mutate when the knob already exists as an integer in
                // the node's retry_policy object.
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("max_attempts"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    // +1 bump, clamped to the policy ceiling.
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("max_attempts".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyRetries)
            {
                // Same shape as the max_attempts knob, keyed on "retries".
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("retries"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("retries".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
        }
        options
    }
4207
    /// Queues at most one new phase 1 candidate experiment for the campaign.
    ///
    /// Returns `Ok(true)` when the campaign record was mutated (an experiment
    /// was queued, or the campaign completed because the experiment budget or
    /// the deterministic mutator is exhausted) and `Ok(false)` when nothing
    /// should happen yet (no phase 1 config, baseline metrics missing, a
    /// promotion decision pending, or a candidate evaluation still running).
    /// The caller is responsible for persisting the mutated campaign.
    async fn maybe_queue_phase1_candidate_experiment(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let experiment_count = self
            .count_optimization_experiments(&campaign.optimization_id)
            .await;
        // Budget gate: stop the campaign once the experiment cap is reached.
        if experiment_count >= phase1.budget.max_experiments as usize {
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        }
        // Need baseline metrics to compare against, and never queue while a
        // promotion decision is pending.
        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
        {
            return Ok(false);
        }
        let existing = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        // A Draft experiment carrying an eval_run_id is still being
        // evaluated; at most one candidate evaluation runs at a time.
        let active_eval_exists = existing.iter().any(|experiment| {
            matches!(experiment.status, OptimizationExperimentStatus::Draft)
                && experiment
                    .metadata
                    .as_ref()
                    .and_then(|metadata| metadata.get("eval_run_id"))
                    .and_then(Value::as_str)
                    .is_some()
        });
        if active_eval_exists {
            return Ok(false);
        }
        // Deduplicate by snapshot hash so the deterministic mutator never
        // re-queues a previously attempted candidate.
        let existing_hashes = existing
            .iter()
            .map(|experiment| experiment.candidate_snapshot_hash.clone())
            .collect::<std::collections::HashSet<_>>();
        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
        }) else {
            // Every deterministic mutation has already been tried.
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some(
                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
            );
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        };
        // Start the evaluation run for the chosen candidate snapshot; its run
        // id is recorded on the experiment so reconciliation can find it.
        let eval_run = self
            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
            .await
            .map_err(|error| error.to_string())?;
        let now = now_ms();
        let experiment = OptimizationExperimentRecord {
            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
            optimization_id: campaign.optimization_id.clone(),
            status: OptimizationExperimentStatus::Draft,
            candidate_snapshot: candidate_snapshot.clone(),
            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
            mutation_summary: Some(mutation.summary.clone()),
            metrics: None,
            phase1_metrics: None,
            promotion_recommendation: None,
            promotion_decision: None,
            created_at_ms: now,
            updated_at_ms: now,
            metadata: Some(json!({
                "generator": "phase1_deterministic_v1",
                "eval_run_id": eval_run.run_id,
                "mutation": mutation,
            })),
        };
        self.put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
        campaign.updated_at_ms = now_ms();
        Ok(true)
    }
4290
    /// Reconciles Draft phase 1 candidate experiments against their (possibly
    /// finished) evaluation runs.
    ///
    /// For each experiment whose eval run is terminal: non-completed or
    /// stale-baseline runs mark the experiment `Failed` with `eval_failure`
    /// metadata; completed runs derive phase 1 metrics and a promotion
    /// decision, moving the experiment to `PromotionRecommended` or
    /// `Discarded` and, for promote-worthy results, parking the campaign in
    /// `AwaitingPromotionApproval`. Afterwards the consecutive-failure budget
    /// is enforced against the refreshed experiment list. Returns whether
    /// anything changed; the caller persists the campaign record.
    async fn reconcile_phase1_candidate_experiments(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
            return Ok(false);
        };
        let experiments = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let mut changed = false;
        for mut experiment in experiments {
            // Only Draft experiments with a recorded eval run are candidates
            // for reconciliation; everything else is already settled.
            if experiment.status != OptimizationExperimentStatus::Draft {
                continue;
            }
            let Some(eval_run_id) = experiment
                .metadata
                .as_ref()
                .and_then(|metadata| metadata.get("eval_run_id"))
                .and_then(Value::as_str)
                .map(str::to_string)
            else {
                continue;
            };
            let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
                continue;
            };
            // Still running: revisit on a later reconciliation pass.
            if !Self::automation_run_is_terminal(&run.status) {
                continue;
            }
            // Terminal but not Completed: record the failure on the
            // experiment and keep going with the next one.
            if run.status != crate::AutomationRunStatus::Completed {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // The campaign baseline may have advanced while this candidate
            // was evaluating; such stale experiments are failed, not scored.
            if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                        "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // Score the completed run; metric-derivation errors also fail
            // the experiment with an eval_failure reason.
            let metrics =
                match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
                    Ok(metrics) => metrics,
                    Err(error) => {
                        experiment.status = OptimizationExperimentStatus::Failed;
                        let mut metadata = match experiment.metadata.take() {
                            Some(Value::Object(map)) => map,
                            Some(_) => serde_json::Map::new(),
                            None => serde_json::Map::new(),
                        };
                        metadata.insert(
                            "eval_failure".to_string(),
                            json!({
                                "run_id": run.run_id,
                                "status": run.status,
                                "reason": error,
                            }),
                        );
                        experiment.metadata = Some(Value::Object(metadata));
                        self.put_optimization_experiment(experiment)
                            .await
                            .map_err(|error| error.to_string())?;
                        changed = true;
                        continue;
                    }
                };
            let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
            experiment.phase1_metrics = Some(metrics.clone());
            // Flattened metric summary kept alongside the typed metrics.
            experiment.metrics = Some(json!({
                "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
                "unmet_requirement_count": metrics.unmet_requirement_count,
                "blocked_node_rate": metrics.blocked_node_rate,
                "budget_within_limits": metrics.budget_within_limits,
            }));
            experiment.promotion_recommendation = Some(
                match decision.decision {
                    OptimizationPromotionDecisionKind::Promote => "promote",
                    OptimizationPromotionDecisionKind::Discard => "discard",
                    OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                        "needs_operator_review"
                    }
                }
                .to_string(),
            );
            experiment.promotion_decision = Some(decision.clone());
            match decision.decision {
                // Both Promote and NeedsOperatorReview park the campaign for
                // an operator approval step before anything is applied.
                OptimizationPromotionDecisionKind::Promote
                | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                    experiment.status = OptimizationExperimentStatus::PromotionRecommended;
                    campaign.pending_promotion_experiment_id =
                        Some(experiment.experiment_id.clone());
                    campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
                    campaign.last_pause_reason = Some(decision.reason.clone());
                }
                OptimizationPromotionDecisionKind::Discard => {
                    experiment.status = OptimizationExperimentStatus::Discarded;
                    if campaign.status == OptimizationCampaignStatus::Running {
                        campaign.last_pause_reason = Some(decision.reason.clone());
                    }
                }
            }
            self.put_optimization_experiment(experiment)
                .await
                .map_err(|error| error.to_string())?;
            changed = true;
        }
        // Re-read after the updates above so the failure streak reflects the
        // statuses just written.
        let refreshed = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
        if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
            && phase1.budget.max_consecutive_failures > 0
        {
            campaign.status = OptimizationCampaignStatus::Failed;
            campaign.last_pause_reason = Some(format!(
                "phase 1 candidate evaluations reached {} consecutive failures",
                consecutive_failures
            ));
            changed = true;
        }
        Ok(changed)
    }
4449
    /// Folds completed baseline replay runs into `campaign.baseline_replays`.
    ///
    /// For each id in `pending_baseline_run_ids`, the matching automation run
    /// is inspected: runs that are still in flight stay pending; runs that are
    /// missing, finished unsuccessfully, belong to a different workflow, or
    /// were produced from a snapshot other than the current baseline pause the
    /// campaign with `PausedEvaluatorUnstable` and are dropped from the
    /// pending list; successfully completed, matching runs are converted into
    /// `OptimizationBaselineReplayRecord`s.
    ///
    /// Returns `Ok(true)` when the campaign record was mutated in any way.
    /// Note: a completed run that is missing its automation snapshot aborts
    /// the whole reconciliation with `Err` (via `?`) rather than pausing.
    async fn reconcile_pending_baseline_replays(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        // Campaigns without a phase 1 config never track baseline replays.
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let mut changed = false;
        // Run ids that are still in flight and should remain pending.
        let mut remaining = Vec::new();
        for run_id in campaign.pending_baseline_run_ids.clone() {
            // A pending run that vanished is treated as evaluator instability;
            // its id is not re-added to `remaining`, so it is forgotten.
            let Some(run) = self.get_automation_v2_run(&run_id).await else {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{run_id}` was not found during optimization reconciliation"
                ));
                changed = true;
                continue;
            };
            // Still running: keep waiting for it.
            if !Self::automation_run_is_terminal(&run.status) {
                remaining.push(run_id);
                continue;
            }
            // Terminal but not successful (failed/stopped/...): pause and drop.
            if run.status != crate::AutomationRunStatus::Completed {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{}` finished with status `{:?}`",
                    run.run_id, run.status
                ));
                changed = true;
                continue;
            }
            // A replay must have been produced by the campaign's own workflow.
            if run.automation_id != campaign.source_workflow_id {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run must belong to the optimization source workflow"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                "baseline replay run must include an automation snapshot".to_string()
            })?;
            // Guard against recording a replay of a stale baseline snapshot.
            if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run does not match the current campaign baseline snapshot"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            // Successful, matching replay: derive metrics and record it.
            let metrics =
                derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
            let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
            campaign
                .baseline_replays
                .push(OptimizationBaselineReplayRecord {
                    replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                    automation_run_id: Some(run.run_id.clone()),
                    phase1_metrics: metrics,
                    validator_case_outcomes,
                    // Captured so later logic can tell how many experiments had
                    // run by the time this replay was recorded.
                    experiment_count_at_recording: self
                        .count_optimization_experiments(&campaign.optimization_id)
                        .await as u64,
                    recorded_at_ms: now_ms(),
                });
            changed = true;
        }
        // Only rewrite the pending list when something actually left it.
        if remaining != campaign.pending_baseline_run_ids {
            campaign.pending_baseline_run_ids = remaining;
            changed = true;
        }
        Ok(changed)
    }
4525
    /// Reconciles every phase 1 optimization campaign against its replay runs
    /// and experiments, advancing campaign status as needed.
    ///
    /// For each campaign with a phase 1 config this: absorbs finished baseline
    /// replays, reconciles candidate experiments, queues a new baseline replay
    /// when one is due, (re)establishes baseline metrics once enough replays
    /// exist, and — when the campaign is `Running` with no pending replays —
    /// queues the next candidate experiment. Modified campaigns are persisted.
    ///
    /// Returns the number of campaign records that were updated.
    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
        let campaigns = self.list_optimization_campaigns().await;
        let mut updated = 0usize;
        for campaign in campaigns {
            // Re-fetch each campaign so we operate on the latest state rather
            // than the snapshot taken by the list above; skip if it vanished.
            let Some(mut latest) = self
                .get_optimization_campaign(&campaign.optimization_id)
                .await
            else {
                continue;
            };
            // Only phase 1 campaigns are reconciled here.
            let Some(phase1) = latest.phase1.clone() else {
                continue;
            };
            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
            changed |= self
                .reconcile_phase1_candidate_experiments(&mut latest)
                .await?;
            let experiment_count = self
                .count_optimization_experiments(&latest.optimization_id)
                .await;
            if latest.pending_baseline_run_ids.is_empty() {
                if phase1_baseline_replay_due(
                    &latest.baseline_replays,
                    latest.pending_baseline_run_ids.len(),
                    &phase1,
                    experiment_count,
                    now_ms(),
                ) {
                    // A fresh replay is due; queuing one parks the campaign
                    // back in Draft until the replay completes.
                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
                        latest.status = OptimizationCampaignStatus::Draft;
                        changed = true;
                    }
                } else if latest.baseline_replays.len()
                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
                {
                    // Enough replays exist: try to (re)establish the baseline.
                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
                        Ok(metrics) => {
                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
                                latest.baseline_metrics = Some(metrics);
                                changed = true;
                            }
                            // Promote Draft / evaluator-paused campaigns to
                            // Running, and clear a stale pause reason on an
                            // already-running campaign.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) || (latest.status == OptimizationCampaignStatus::Running
                                && latest.last_pause_reason.is_some())
                            {
                                latest.status = OptimizationCampaignStatus::Running;
                                latest.last_pause_reason = None;
                                changed = true;
                            }
                        }
                        Err(error) => {
                            // Baseline could not be established (e.g. unstable
                            // evaluator): pause, but avoid rewriting the same
                            // pause state on every reconciliation pass.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::Running
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) && (latest.status
                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
                            {
                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                latest.last_pause_reason = Some(error);
                                changed = true;
                            }
                        }
                    }
                }
            } else if latest.last_pause_reason.as_deref()
                != Some("waiting for phase 1 baseline replay completion")
            {
                // Replays are still in flight: surface that as the pause
                // reason (idempotently, to avoid needless persistence).
                latest.last_pause_reason =
                    Some("waiting for phase 1 baseline replay completion".to_string());
                changed = true;
            }
            // Running campaigns with no pending replays may start the next
            // candidate experiment.
            if latest.status == OptimizationCampaignStatus::Running
                && latest.pending_baseline_run_ids.is_empty()
            {
                changed |= self
                    .maybe_queue_phase1_candidate_experiment(&mut latest)
                    .await?;
            }
            if changed {
                self.put_optimization_campaign(latest)
                    .await
                    .map_err(|error| error.to_string())?;
                updated = updated.saturating_add(1);
            }
        }
        Ok(updated)
    }
4619
4620    async fn maybe_queue_phase1_baseline_replay(
4621        &self,
4622        campaign: &mut OptimizationCampaignRecord,
4623    ) -> Result<bool, String> {
4624        let Some(phase1) = campaign.phase1.as_ref() else {
4625            return Ok(false);
4626        };
4627        if !campaign.pending_baseline_run_ids.is_empty() {
4628            campaign.last_pause_reason =
4629                Some("waiting for phase 1 baseline replay completion".into());
4630            campaign.updated_at_ms = now_ms();
4631            return Ok(true);
4632        }
4633        let experiment_count = self
4634            .count_optimization_experiments(&campaign.optimization_id)
4635            .await;
4636        if !phase1_baseline_replay_due(
4637            &campaign.baseline_replays,
4638            campaign.pending_baseline_run_ids.len(),
4639            phase1,
4640            experiment_count,
4641            now_ms(),
4642        ) {
4643            return Ok(false);
4644        }
4645        let replay_run = self
4646            .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
4647            .await
4648            .map_err(|error| error.to_string())?;
4649        if !campaign
4650            .pending_baseline_run_ids
4651            .iter()
4652            .any(|value| value == &replay_run.run_id)
4653        {
4654            campaign
4655                .pending_baseline_run_ids
4656                .push(replay_run.run_id.clone());
4657        }
4658        campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
4659        campaign.updated_at_ms = now_ms();
4660        Ok(true)
4661    }
4662
4663    async fn maybe_queue_initial_phase1_baseline_replay(
4664        &self,
4665        campaign: &mut OptimizationCampaignRecord,
4666    ) -> Result<bool, String> {
4667        let Some(phase1) = campaign.phase1.as_ref() else {
4668            return Ok(false);
4669        };
4670        let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
4671        if campaign.baseline_replays.len() >= required_runs {
4672            return Ok(false);
4673        }
4674        self.maybe_queue_phase1_baseline_replay(campaign).await
4675    }
4676
    /// Applies an operator-initiated lifecycle action to an optimization
    /// campaign and persists the updated record.
    ///
    /// `action` is matched after trimming and ASCII-lowercasing. Supported
    /// actions: `start`, `pause`, `resume`, `queue_baseline_replay`,
    /// `record_baseline_replay` (needs `run_id`), `approve_winner` and
    /// `reject_winner` (need `experiment_id`). `reason` is optional and used
    /// by `pause` / `reject_winner`.
    ///
    /// Returns the persisted campaign, or an error string when lookups fail,
    /// validation rejects the action, or persistence fails.
    pub async fn apply_optimization_action(
        &self,
        optimization_id: &str,
        action: &str,
        experiment_id: Option<&str>,
        run_id: Option<&str>,
        reason: Option<&str>,
    ) -> Result<OptimizationCampaignRecord, String> {
        let normalized = action.trim().to_ascii_lowercase();
        let mut campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        match normalized.as_str() {
            "start" => {
                if campaign.phase1.is_some() {
                    // Phase 1 campaigns need baseline replays before running:
                    // either queue the initial replay (stay in Draft) or try
                    // to establish a baseline from existing replays.
                    if self
                        .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                        .await?
                    {
                        campaign.status = OptimizationCampaignStatus::Draft;
                    } else {
                        let phase1 = campaign
                            .phase1
                            .as_ref()
                            .ok_or_else(|| "phase 1 config is required".to_string())?;
                        match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
                            Ok(metrics) => {
                                campaign.baseline_metrics = Some(metrics);
                                campaign.status = OptimizationCampaignStatus::Running;
                                campaign.last_pause_reason = None;
                            }
                            Err(error) => {
                                // Baseline could not be established; pause
                                // with the evaluator-instability status.
                                campaign.status =
                                    OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                campaign.last_pause_reason = Some(error);
                            }
                        }
                    }
                } else {
                    // Non-phase-1 campaigns start immediately.
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "pause" => {
                campaign.status = OptimizationCampaignStatus::PausedManual;
                // Keep the reason only if it is non-blank after trimming.
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
            }
            "resume" => {
                // Same gate as "start": resume goes back to Draft if the
                // initial baseline replay still needs to complete.
                if self
                    .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                    .await?
                {
                    campaign.status = OptimizationCampaignStatus::Draft;
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "queue_baseline_replay" => {
                // Unconditionally queue a replay of the current baseline
                // snapshot and remember the run id (deduplicated).
                let replay_run = self
                    .create_automation_v2_run(
                        &campaign.baseline_snapshot,
                        "optimization_baseline_replay",
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                if !campaign
                    .pending_baseline_run_ids
                    .iter()
                    .any(|value| value == &replay_run.run_id)
                {
                    campaign
                        .pending_baseline_run_ids
                        .push(replay_run.run_id.clone());
                }
                campaign.updated_at_ms = now_ms();
            }
            "record_baseline_replay" => {
                // Manually record a completed replay run as a baseline replay.
                let run_id = run_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
                let phase1 = campaign
                    .phase1
                    .as_ref()
                    .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
                let run = self
                    .get_automation_v2_run(run_id)
                    .await
                    .ok_or_else(|| "automation run not found".to_string())?;
                // The run must come from the campaign's source workflow...
                if run.automation_id != campaign.source_workflow_id {
                    return Err(
                        "baseline replay run must belong to the optimization source workflow"
                            .to_string(),
                    );
                }
                let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                    "baseline replay run must include an automation snapshot".to_string()
                })?;
                // ...and from the campaign's *current* baseline snapshot.
                if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                    return Err(
                        "baseline replay run does not match the current campaign baseline snapshot"
                            .to_string(),
                    );
                }
                let metrics =
                    derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
                let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
                campaign
                    .baseline_replays
                    .push(OptimizationBaselineReplayRecord {
                        replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                        automation_run_id: Some(run.run_id.clone()),
                        phase1_metrics: metrics,
                        validator_case_outcomes,
                        experiment_count_at_recording: self
                            .count_optimization_experiments(&campaign.optimization_id)
                            .await as u64,
                        recorded_at_ms: now_ms(),
                    });
                // The run is no longer pending once recorded.
                campaign
                    .pending_baseline_run_ids
                    .retain(|value| value != run_id);
                campaign.updated_at_ms = now_ms();
            }
            "approve_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                // The experiment must have been evaluated against the
                // campaign's current baseline, not a stale one.
                if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                    return Err(
                        "experiment baseline_snapshot_hash does not match current campaign baseline"
                            .to_string(),
                    );
                }
                if let Some(phase1) = campaign.phase1.as_ref() {
                    // Re-validate the candidate mutation and attach an apply
                    // patch to the experiment's metadata for auditability.
                    let validated = validate_phase1_candidate_mutation(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        phase1,
                    )?;
                    if experiment.mutation_summary.is_none() {
                        experiment.mutation_summary = Some(validated.summary.clone());
                    }
                    let approved_at_ms = now_ms();
                    let apply_patch = Self::build_optimization_apply_patch(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        &validated,
                        approved_at_ms,
                    )?;
                    let mut metadata = match experiment.metadata.take() {
                        Some(Value::Object(map)) => map,
                        Some(_) => {
                            return Err("experiment metadata must be a JSON object".to_string());
                        }
                        None => serde_json::Map::new(),
                    };
                    metadata.insert("apply_patch".to_string(), apply_patch);
                    experiment.metadata = Some(Value::Object(metadata));
                    if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
                        // Candidate metrics may live on the experiment directly
                        // or be derivable from its raw metrics payload.
                        let candidate_metrics = experiment
                            .phase1_metrics
                            .clone()
                            .or_else(|| {
                                experiment
                                    .metrics
                                    .as_ref()
                                    .and_then(|metrics| parse_phase1_metrics(metrics).ok())
                            })
                            .ok_or_else(|| {
                                "phase 1 candidate is missing promotion metrics".to_string()
                            })?;
                        let decision =
                            evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
                        experiment.promotion_recommendation = Some(
                            match decision.decision {
                                OptimizationPromotionDecisionKind::Promote => "promote",
                                OptimizationPromotionDecisionKind::Discard => "discard",
                                OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                                    "needs_operator_review"
                                }
                            }
                            .to_string(),
                        );
                        experiment.promotion_decision = Some(decision.clone());
                        // A non-Promote decision blocks approval; the decision
                        // is still persisted on the experiment before erroring.
                        if decision.decision != OptimizationPromotionDecisionKind::Promote {
                            let _ = self
                                .put_optimization_experiment(experiment)
                                .await
                                .map_err(|e| e.to_string())?;
                            return Err(decision.reason);
                        }
                        campaign.baseline_metrics = Some(candidate_metrics);
                    }
                }
                // Promote the candidate snapshot to become the new baseline
                // and reset replay/promotion bookkeeping.
                campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
                campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
                campaign.baseline_replays.clear();
                campaign.pending_baseline_run_ids.clear();
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = None;
                experiment.status = OptimizationExperimentStatus::PromotionApproved;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            "reject_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
                experiment.status = OptimizationExperimentStatus::PromotionRejected;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            _ => return Err("unsupported optimization action".to_string()),
        }
        self.put_optimization_campaign(campaign)
            .await
            .map_err(|e| e.to_string())
    }
4922
4923    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
4924        let mut rows = self
4925            .automations_v2
4926            .read()
4927            .await
4928            .values()
4929            .cloned()
4930            .collect::<Vec<_>>();
4931        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
4932        rows
4933    }
4934
4935    pub async fn delete_automation_v2(
4936        &self,
4937        automation_id: &str,
4938    ) -> anyhow::Result<Option<AutomationV2Spec>> {
4939        let _guard = self.automations_v2_persistence.lock().await;
4940        let removed = self.automations_v2.write().await.remove(automation_id);
4941        let removed_run_count = {
4942            let mut runs = self.automation_v2_runs.write().await;
4943            let before = runs.len();
4944            runs.retain(|_, run| run.automation_id != automation_id);
4945            before.saturating_sub(runs.len())
4946        };
4947        self.persist_automations_v2_locked().await?;
4948        if removed_run_count > 0 {
4949            self.persist_automation_v2_runs().await?;
4950        }
4951        self.verify_automation_v2_persisted_locked(automation_id, false)
4952            .await?;
4953        Ok(removed)
4954    }
4955
    /// Creates and persists a new queued run record for an automation.
    ///
    /// The run captures the automation's effective runtime context, a full
    /// snapshot of the automation spec, and a checkpoint whose pending-node
    /// list is seeded from the flow's node ids. After persisting, the run is
    /// synced to the context-run blackboard.
    ///
    /// Returns the freshly stored run record, or an error from runtime-context
    /// resolution, persistence, or blackboard sync.
    pub async fn create_automation_v2_run(
        &self,
        automation: &AutomationV2Spec,
        trigger_type: &str,
    ) -> anyhow::Result<AutomationV2RunRecord> {
        let now = now_ms();
        // Prefer the automation's own runtime-context materialization, falling
        // back to the one captured by its approved plan.
        let runtime_context = self
            .automation_v2_effective_runtime_context(
                automation,
                automation
                    .runtime_context_materialization()
                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
            )
            .await?;
        // Every flow node starts out pending.
        let pending_nodes = automation
            .flow
            .nodes
            .iter()
            .map(|n| n.node_id.clone())
            .collect::<Vec<_>>();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            tenant_context: TenantContext::local_implicit(),
            trigger_type: trigger_type.to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: None,
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes,
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context,
            // Snapshot the spec so later reconciliation does not depend on
            // the live (possibly edited) automation record.
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: None,
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
            trigger_reason: None,
            consumed_handoff_id: None,
            learning_summary: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
5026
    /// Creates and persists a dry-run record for an automation.
    ///
    /// Unlike `create_automation_v2_run`, the record is born already
    /// `Completed` with start/finish timestamps set, an empty checkpoint (no
    /// pending nodes), a `_dry_run`-suffixed trigger type, and a `"dry_run"`
    /// detail marker — nothing is actually executed.
    pub async fn create_automation_v2_dry_run(
        &self,
        automation: &AutomationV2Spec,
        trigger_type: &str,
    ) -> anyhow::Result<AutomationV2RunRecord> {
        let now = now_ms();
        // Same runtime-context resolution as a real run, so the dry-run record
        // reflects what an actual execution would have used.
        let runtime_context = self
            .automation_v2_effective_runtime_context(
                automation,
                automation
                    .runtime_context_materialization()
                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
            )
            .await?;
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            tenant_context: TenantContext::local_implicit(),
            trigger_type: format!("{trigger_type}_dry_run"),
            status: AutomationRunStatus::Completed,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: Some(now),
            finished_at_ms: Some(now),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes: Vec::new(),
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context,
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: Some("dry_run".to_string()),
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
            trigger_reason: None,
            consumed_handoff_id: None,
            learning_summary: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
5091
5092    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
5093        self.automation_v2_runs.read().await.get(run_id).cloned()
5094    }
5095
5096    pub async fn list_automation_v2_runs(
5097        &self,
5098        automation_id: Option<&str>,
5099        limit: usize,
5100    ) -> Vec<AutomationV2RunRecord> {
5101        let mut rows = self
5102            .automation_v2_runs
5103            .read()
5104            .await
5105            .values()
5106            .filter(|row| {
5107                if let Some(id) = automation_id {
5108                    row.automation_id == id
5109                } else {
5110                    true
5111                }
5112            })
5113            .cloned()
5114            .collect::<Vec<_>>();
5115        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
5116        rows.truncate(limit.clamp(1, 500));
5117        rows
5118    }
5119
5120    async fn automation_v2_run_workspace_root(
5121        &self,
5122        run: &AutomationV2RunRecord,
5123    ) -> Option<String> {
5124        if let Some(root) = run
5125            .automation_snapshot
5126            .as_ref()
5127            .and_then(|automation| automation.workspace_root.as_ref())
5128            .map(|value| value.trim())
5129            .filter(|value| !value.is_empty())
5130        {
5131            return Some(root.to_string());
5132        }
5133        self.get_automation_v2(&run.automation_id)
5134            .await
5135            .and_then(|automation| automation.workspace_root)
5136            .map(|value| value.trim().to_string())
5137            .filter(|value| !value.is_empty())
5138    }
5139
    /// Reconcile the automation scheduler's capacity and workspace-lock
    /// bookkeeping after a run changed status from `previous_status` to
    /// `run.status`.
    ///
    /// "Capacity" and the "workspace lock" are tracked separately; which
    /// statuses consume which resource is decided by
    /// `automation_status_uses_scheduler_capacity` /
    /// `automation_status_holds_workspace_lock`.
    async fn sync_automation_scheduler_for_run_transition(
        &self,
        previous_status: AutomationRunStatus,
        run: &AutomationV2RunRecord,
    ) {
        let had_capacity = automation_status_uses_scheduler_capacity(&previous_status);
        let has_capacity = automation_status_uses_scheduler_capacity(&run.status);
        let had_lock = automation_status_holds_workspace_lock(&previous_status);
        let has_lock = automation_status_holds_workspace_lock(&run.status);
        // Resolve the workspace root before taking the scheduler guard so the
        // guard is not held across that async lookup.
        let workspace_root = self.automation_v2_run_workspace_root(run).await;
        let mut scheduler = self.automation_scheduler.write().await;

        // Run stopped needing both resources: release everything in one call.
        if (had_capacity || had_lock) && !has_capacity && !has_lock {
            scheduler.release_run(&run.run_id);
            return;
        }
        // Partial releases: drop only the resource no longer needed.
        if had_capacity && !has_capacity {
            scheduler.release_capacity(&run.run_id);
        }
        if had_lock && !has_lock {
            scheduler.release_workspace(&run.run_id);
        }
        // Newly acquiring the workspace lock: `admit_run` when the new status
        // also consumes capacity, otherwise only reserve the workspace. Early
        // return prevents the capacity branch below from double-admitting.
        if !had_lock && has_lock {
            if has_capacity {
                scheduler.admit_run(&run.run_id, workspace_root.as_deref());
            } else {
                scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
            }
            return;
        }
        // Newly consuming capacity with no lock change.
        if !had_capacity && has_capacity {
            scheduler.admit_run(&run.run_id, workspace_root.as_deref());
        }
    }
5174
5175    async fn automation_run_last_activity_at_ms(&self, run: &AutomationV2RunRecord) -> u64 {
5176        let mut last_activity_at_ms = automation::lifecycle::automation_last_activity_at_ms(run);
5177        for session_id in &run.active_session_ids {
5178            if let Some(session) = self.storage.get_session(session_id).await {
5179                last_activity_at_ms = last_activity_at_ms.max(
5180                    session
5181                        .time
5182                        .updated
5183                        .timestamp_millis()
5184                        .max(0)
5185                        .try_into()
5186                        .unwrap_or_default(),
5187                );
5188            }
5189        }
5190        last_activity_at_ms
5191    }
5192
    /// Pause `Running` automation runs with no provider activity for at least
    /// `stale_after_ms` milliseconds; returns how many runs were paused.
    ///
    /// For each stale run: cancel its sessions and agent-team instances, drop
    /// session bookkeeping, synthesize an error output for every node still in
    /// progress, and transition the run to `Paused` with stop kind
    /// `StaleReaped` (a later pass may re-queue it — see
    /// `auto_resume_stale_reaped_runs`).
    pub async fn reap_stale_running_automation_runs(&self, stale_after_ms: u64) -> usize {
        let now = now_ms();
        // Snapshot Running runs; the read guard drops before the async
        // per-run activity checks below.
        let candidate_runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .filter(|run| run.status == AutomationRunStatus::Running)
            .cloned()
            .collect::<Vec<_>>();
        let mut runs = Vec::new();
        for run in candidate_runs {
            let last_activity_at_ms = self.automation_run_last_activity_at_ms(&run).await;
            if now.saturating_sub(last_activity_at_ms) >= stale_after_ms {
                runs.push(run);
            }
        }
        let mut reaped = 0usize;
        for run in runs {
            let run_id = run.run_id.clone();
            let session_ids = run.active_session_ids.clone();
            let instance_ids = run.active_instance_ids.clone();
            let stale_node_ids = automation::lifecycle::automation_in_progress_node_ids(&run);
            let detail = format!(
                "automation run paused after no provider activity for at least {}s",
                stale_after_ms / 1000
            );
            // Best-effort cancellation of everything the run had in flight.
            for session_id in &session_ids {
                let _ = self.cancellations.cancel(session_id).await;
            }
            for instance_id in instance_ids {
                let _ = self
                    .agent_teams
                    .cancel_instance(self, &instance_id, "paused by stale-run reaper")
                    .await;
            }
            self.forget_automation_v2_sessions(&session_ids).await;
            if self
                .update_automation_v2_run(&run_id, |row| {
                    let stale_node_detail = format!(
                        "node execution stalled after no provider activity for at least {}s",
                        stale_after_ms / 1000
                    );
                    let automation_snapshot = row.automation_snapshot.clone();
                    let mut annotated_nodes = Vec::new();
                    if let Some(automation) = automation_snapshot.as_ref() {
                        for node_id in &stale_node_ids {
                            // A node that produced output in the meantime is no
                            // longer considered stalled.
                            if row.checkpoint.node_outputs.contains_key(node_id) {
                                continue;
                            }
                            let Some(node) = automation
                                .flow
                                .nodes
                                .iter()
                                .find(|candidate| &candidate.node_id == node_id)
                            else {
                                continue;
                            };
                            let attempts =
                                row.checkpoint.node_attempts.get(node_id).copied().unwrap_or(1);
                            let max_attempts = automation_node_max_attempts(node);
                            // Terminal once the node's retry budget is exhausted.
                            let terminal = attempts >= max_attempts;
                            row.checkpoint.node_outputs.insert(
                                node_id.clone(),
                                crate::automation_v2::executor::build_node_execution_error_output_with_category(
                                    node,
                                    &stale_node_detail,
                                    terminal,
                                    "execution_error",
                                ),
                            );
                            // Only the first failure is recorded for the run.
                            if row.checkpoint.last_failure.is_none() {
                                row.checkpoint.last_failure = Some(
                                    crate::automation_v2::types::AutomationFailureRecord {
                                        node_id: node_id.clone(),
                                        reason: stale_node_detail.clone(),
                                        failed_at_ms: now_ms(),
                                    },
                                );
                            }
                            annotated_nodes.push(node_id.clone());
                        }
                    }
                    row.status = AutomationRunStatus::Paused;
                    row.pause_reason = Some("stale_no_provider_activity".to_string());
                    row.detail = Some(if annotated_nodes.is_empty() {
                        detail.clone()
                    } else {
                        format!(
                            "{}; repairable node(s): {}",
                            detail,
                            annotated_nodes.join(", ")
                        )
                    });
                    row.stop_kind = Some(AutomationStopKind::StaleReaped);
                    row.stop_reason = Some(detail.clone());
                    row.active_session_ids.clear();
                    row.latest_session_id = None;
                    row.active_instance_ids.clear();
                    automation::record_automation_lifecycle_event(
                        row,
                        "run_paused_stale_no_provider_activity",
                        Some(detail.clone()),
                        Some(AutomationStopKind::StaleReaped),
                    );
                    if let Some(automation) = automation_snapshot.as_ref() {
                        automation::refresh_automation_runtime_state(automation, row);
                    }
                })
                .await
                .is_some()
            {
                reaped += 1;
            }
        }
        reaped
    }
5310
    /// Recovery pass over persisted automation runs (intended for startup).
    ///
    /// `Running` runs cannot survive a restart, so they are failed with stop
    /// kind `ServerRestart`. Paused/Pausing/AwaitingApproval runs have their
    /// scheduler state rebuilt: the workspace reservation is re-taken when the
    /// status holds the lock, and already-validated node artifacts are
    /// re-registered. Returns the number of runs failed due to the restart.
    pub async fn recover_in_flight_runs(&self) -> usize {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut recovered = 0usize;
        for run in runs {
            match run.status {
                AutomationRunStatus::Running => {
                    let detail = "automation run interrupted by server restart".to_string();
                    if self
                        .update_automation_v2_run(&run.run_id, |row| {
                            row.status = AutomationRunStatus::Failed;
                            row.detail = Some(detail.clone());
                            row.stop_kind = Some(AutomationStopKind::ServerRestart);
                            row.stop_reason = Some(detail.clone());
                            automation::record_automation_lifecycle_event(
                                row,
                                "run_failed_server_restart",
                                Some(detail.clone()),
                                Some(AutomationStopKind::ServerRestart),
                            );
                        })
                        .await
                        .is_some()
                    {
                        recovered += 1;
                    }
                }
                AutomationRunStatus::Paused
                | AutomationRunStatus::Pausing
                | AutomationRunStatus::AwaitingApproval => {
                    let mut scheduler = self.automation_scheduler.write().await;
                    if automation_status_holds_workspace_lock(&run.status) {
                        // NOTE(review): this awaits the workspace lookup while
                        // the scheduler guard is held; the transition path
                        // resolves the root *before* locking — confirm no hazard.
                        let workspace_root = self.automation_v2_run_workspace_root(&run).await;
                        scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
                    }
                    // Re-register artifacts validated before the restart so a
                    // resumed run can reuse them.
                    for (node_id, output) in &run.checkpoint.node_outputs {
                        if let Some((path, content_digest)) =
                            automation::node_output::automation_output_validated_artifact(output)
                        {
                            scheduler.preexisting_registry.register_validated(
                                &run.run_id,
                                node_id,
                                automation::scheduler::ValidatedArtifact {
                                    path,
                                    content_digest,
                                },
                            );
                        }
                    }
                }
                _ => {}
            }
        }
        recovered
    }
5371
5372    pub async fn auto_resume_stale_reaped_runs(&self) -> usize {
5373        let candidate_runs = self
5374            .automation_v2_runs
5375            .read()
5376            .await
5377            .values()
5378            .filter(|run| run.status == AutomationRunStatus::Paused)
5379            .filter(|run| run.stop_kind == Some(AutomationStopKind::StaleReaped))
5380            .cloned()
5381            .collect::<Vec<_>>();
5382        let mut resumed = 0usize;
5383        for run in candidate_runs {
5384            let auto_resume_count = run
5385                .checkpoint
5386                .lifecycle_history
5387                .iter()
5388                .filter(|event| event.event == "run_auto_resumed")
5389                .count();
5390            if auto_resume_count >= 2 {
5391                continue;
5392            }
5393            let automation = self.get_automation_v2(&run.automation_id).await;
5394            let automation = match automation.or(run.automation_snapshot.clone()) {
5395                Some(a) => a,
5396                None => continue,
5397            };
5398            let has_repairable_nodes = automation.flow.nodes.iter().any(|node| {
5399                if run.checkpoint.completed_nodes.contains(&node.node_id) {
5400                    return false;
5401                }
5402                if run.checkpoint.node_outputs.contains_key(&node.node_id) {
5403                    let status = run.checkpoint.node_outputs[&node.node_id]
5404                        .get("status")
5405                        .and_then(Value::as_str)
5406                        .unwrap_or_default()
5407                        .to_ascii_lowercase();
5408                    if status != "needs_repair" {
5409                        return false;
5410                    }
5411                } else {
5412                    return false;
5413                }
5414                let attempts = run
5415                    .checkpoint
5416                    .node_attempts
5417                    .get(&node.node_id)
5418                    .copied()
5419                    .unwrap_or(0);
5420                let max_attempts = automation_node_max_attempts(node);
5421                attempts < max_attempts
5422            });
5423            if !has_repairable_nodes {
5424                continue;
5425            }
5426            if self
5427                .update_automation_v2_run(&run.run_id, |row| {
5428                    row.status = AutomationRunStatus::Queued;
5429                    row.pause_reason = None;
5430                    row.detail = None;
5431                    row.stop_kind = None;
5432                    row.stop_reason = None;
5433                    automation::record_automation_lifecycle_event(
5434                        row,
5435                        "run_auto_resumed",
5436                        Some("auto_resume_after_stale_reap".to_string()),
5437                        None,
5438                    );
5439                })
5440                .await
5441                .is_some()
5442            {
5443                resumed += 1;
5444            }
5445        }
5446        resumed
5447    }
5448
5449    pub fn is_automation_scheduler_stopping(&self) -> bool {
5450        self.automation_scheduler_stopping.load(Ordering::Relaxed)
5451    }
5452
5453    pub fn set_automation_scheduler_stopping(&self, stopping: bool) {
5454        self.automation_scheduler_stopping
5455            .store(stopping, Ordering::Relaxed);
5456    }
5457
5458    pub async fn fail_running_automation_runs_for_shutdown(&self) -> usize {
5459        let run_ids = self
5460            .automation_v2_runs
5461            .read()
5462            .await
5463            .values()
5464            .filter(|run| matches!(run.status, AutomationRunStatus::Running))
5465            .map(|run| run.run_id.clone())
5466            .collect::<Vec<_>>();
5467        let mut failed = 0usize;
5468        for run_id in run_ids {
5469            let detail = "automation run stopped during server shutdown".to_string();
5470            if self
5471                .update_automation_v2_run(&run_id, |row| {
5472                    row.status = AutomationRunStatus::Failed;
5473                    row.detail = Some(detail.clone());
5474                    row.stop_kind = Some(AutomationStopKind::Shutdown);
5475                    row.stop_reason = Some(detail.clone());
5476                    automation::record_automation_lifecycle_event(
5477                        row,
5478                        "run_failed_shutdown",
5479                        Some(detail.clone()),
5480                        Some(AutomationStopKind::Shutdown),
5481                    );
5482                })
5483                .await
5484                .is_some()
5485            {
5486                failed += 1;
5487            }
5488        }
5489        failed
5490    }
5491
5492    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
5493        let run_id = self
5494            .automation_v2_runs
5495            .read()
5496            .await
5497            .values()
5498            .filter(|row| row.status == AutomationRunStatus::Queued)
5499            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
5500            .map(|row| row.run_id.clone())?;
5501        self.claim_specific_automation_v2_run(&run_id).await
5502    }
    /// Attempt to transition a specific `Queued` run to `Running`, resolving
    /// its runtime context first. Returns the claimed record, or `None` when
    /// the run is missing, no longer queued, or its required runtime context
    /// could not be materialized (in which case the run is failed).
    pub async fn claim_specific_automation_v2_run(
        &self,
        run_id: &str,
    ) -> Option<AutomationV2RunRecord> {
        // Phase 1: snapshot under the write lock; bail if not claimable.
        let (automation_snapshot, previous_status) = {
            let mut guard = self.automation_v2_runs.write().await;
            let run = guard.get_mut(run_id)?;
            if run.status != AutomationRunStatus::Queued {
                return None;
            }
            (run.automation_snapshot.clone(), run.status.clone())
        };
        // Phase 2: resolve the runtime context with the run lock released
        // (resolution is async). Because the lock is dropped, every later
        // phase re-checks that the run is still Queued.
        let runtime_context_required = automation_snapshot
            .as_ref()
            .map(crate::automation_v2::types::AutomationV2Spec::requires_runtime_context)
            .unwrap_or(false);
        let runtime_context = match automation_snapshot.as_ref() {
            Some(automation) => self
                .automation_v2_effective_runtime_context(
                    automation,
                    automation
                        .runtime_context_materialization()
                        .or_else(|| automation.approved_plan_runtime_context_materialization()),
                )
                .await
                .ok()
                .flatten(),
            None => None,
        };
        // Required context missing: fail the run and report no claim.
        if runtime_context_required && runtime_context.is_none() {
            let mut guard = self.automation_v2_runs.write().await;
            let run = guard.get_mut(run_id)?;
            if run.status != AutomationRunStatus::Queued {
                return None;
            }
            let previous_status = run.status.clone();
            let now = now_ms();
            run.status = AutomationRunStatus::Failed;
            run.updated_at_ms = now;
            run.finished_at_ms.get_or_insert(now);
            run.scheduler = None;
            run.detail = Some("runtime context partition missing for automation run".to_string());
            let claimed = run.clone();
            // Release the map lock before scheduler sync / persistence.
            drop(guard);
            self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
                .await;
            let _ = self.persist_automation_v2_runs().await;
            return None;
        }

        // Phase 3: re-acquire, re-check Queued, and flip to Running.
        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        if run.status != AutomationRunStatus::Queued {
            return None;
        }
        let now = now_ms();
        run.runtime_context = runtime_context;
        run.status = AutomationRunStatus::Running;
        run.updated_at_ms = now;
        run.started_at_ms.get_or_insert(now);
        run.scheduler = None;
        let claimed = run.clone();
        drop(guard);
        self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        Some(claimed)
    }
    /// Central mutation point for automation run records. Applies `update`
    /// under the write lock, maintains derived fields (`updated_at_ms`,
    /// `finished_at_ms`, scheduler metadata), then performs the post-update
    /// side effects: scheduler reconciliation, persistence, on-disk status
    /// JSON, and terminal-run learning finalization.
    ///
    /// Returns the updated record, or `None` if `run_id` is unknown.
    pub async fn update_automation_v2_run(
        &self,
        run_id: &str,
        update: impl FnOnce(&mut AutomationV2RunRecord),
    ) -> Option<AutomationV2RunRecord> {
        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        let previous_status = run.status.clone();
        update(run);
        // Scheduler metadata is only kept while the run is still queued.
        if run.status != AutomationRunStatus::Queued {
            run.scheduler = None;
        }
        run.updated_at_ms = now_ms();
        // Stamp the finish time exactly once on entering a terminal status.
        if matches!(
            run.status,
            AutomationRunStatus::Completed
                | AutomationRunStatus::Blocked
                | AutomationRunStatus::Failed
                | AutomationRunStatus::Cancelled
        ) {
            run.finished_at_ms.get_or_insert_with(now_ms);
        }
        let out = run.clone();
        // Release the map lock before the async side effects below.
        drop(guard);
        self.sync_automation_scheduler_for_run_transition(previous_status, &out)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        let _ = self.persist_automation_v2_run_status_json(&out).await;
        if matches!(
            out.status,
            AutomationRunStatus::Completed
                | AutomationRunStatus::Blocked
                | AutomationRunStatus::Failed
                | AutomationRunStatus::Cancelled
        ) {
            let _ = self
                .finalize_terminal_automation_v2_run_learning(&out)
                .await;
        }
        Some(out)
    }
5612
5613    async fn persist_automation_v2_run_status_json(
5614        &self,
5615        run: &AutomationV2RunRecord,
5616    ) -> anyhow::Result<()> {
5617        let default_workspace = self.workspace_index.snapshot().await.root.clone();
5618        let automation = run.automation_snapshot.as_ref();
5619        let workspace_root = if let Some(ref a) = automation {
5620            if let Some(ref wr) = a.workspace_root {
5621                if !wr.trim().is_empty() {
5622                    wr.trim().to_string()
5623                } else {
5624                    a.metadata
5625                        .as_ref()
5626                        .and_then(|m| m.get("workspace_root"))
5627                        .and_then(Value::as_str)
5628                        .map(str::to_string)
5629                        .unwrap_or_else(|| default_workspace.clone())
5630                }
5631            } else {
5632                a.metadata
5633                    .as_ref()
5634                    .and_then(|m| m.get("workspace_root"))
5635                    .and_then(Value::as_str)
5636                    .map(str::to_string)
5637                    .unwrap_or_else(|| default_workspace.clone())
5638            }
5639        } else {
5640            default_workspace
5641        };
5642        let run_dir = PathBuf::from(&workspace_root)
5643            .join(".tandem")
5644            .join("runs")
5645            .join(&run.run_id);
5646        let status_path = run_dir.join("status.json");
5647        let status_json = json!({
5648            "run_id": run.run_id,
5649            "automation_id": run.automation_id,
5650            "status": run.status,
5651            "detail": run.detail,
5652            "completed_nodes": run.checkpoint.completed_nodes,
5653            "pending_nodes": run.checkpoint.pending_nodes,
5654            "blocked_nodes": run.checkpoint.blocked_nodes,
5655            "node_attempts": run.checkpoint.node_attempts,
5656            "last_failure": run.checkpoint.last_failure,
5657            "learning_summary": run.learning_summary,
5658            "updated_at_ms": run.updated_at_ms,
5659        });
5660        fs::create_dir_all(&run_dir).await?;
5661        fs::write(&status_path, serde_json::to_string_pretty(&status_json)?).await?;
5662        Ok(())
5663    }
5664
5665    pub async fn set_automation_v2_run_scheduler_metadata(
5666        &self,
5667        run_id: &str,
5668        meta: automation::SchedulerMetadata,
5669    ) -> Option<AutomationV2RunRecord> {
5670        self.update_automation_v2_run(run_id, |row| {
5671            row.scheduler = Some(meta);
5672        })
5673        .await
5674    }
5675
5676    pub async fn clear_automation_v2_run_scheduler_metadata(
5677        &self,
5678        run_id: &str,
5679    ) -> Option<AutomationV2RunRecord> {
5680        self.update_automation_v2_run(run_id, |row| {
5681            row.scheduler = None;
5682        })
5683        .await
5684    }
5685
5686    pub async fn add_automation_v2_session(
5687        &self,
5688        run_id: &str,
5689        session_id: &str,
5690    ) -> Option<AutomationV2RunRecord> {
5691        let updated = self
5692            .update_automation_v2_run(run_id, |row| {
5693                if !row.active_session_ids.iter().any(|id| id == session_id) {
5694                    row.active_session_ids.push(session_id.to_string());
5695                }
5696                row.latest_session_id = Some(session_id.to_string());
5697            })
5698            .await;
5699        self.automation_v2_session_runs
5700            .write()
5701            .await
5702            .insert(session_id.to_string(), run_id.to_string());
5703        updated
5704    }
5705
5706    pub async fn set_automation_v2_session_mcp_servers(
5707        &self,
5708        session_id: &str,
5709        servers: Vec<String>,
5710    ) {
5711        if servers.is_empty() {
5712            self.automation_v2_session_mcp_servers
5713                .write()
5714                .await
5715                .remove(session_id);
5716        } else {
5717            self.automation_v2_session_mcp_servers
5718                .write()
5719                .await
5720                .insert(session_id.to_string(), servers);
5721        }
5722    }
5723
5724    pub async fn clear_automation_v2_session_mcp_servers(&self, session_id: &str) {
5725        self.automation_v2_session_mcp_servers
5726            .write()
5727            .await
5728            .remove(session_id);
5729    }
5730
5731    pub async fn clear_automation_v2_session(
5732        &self,
5733        run_id: &str,
5734        session_id: &str,
5735    ) -> Option<AutomationV2RunRecord> {
5736        self.automation_v2_session_runs
5737            .write()
5738            .await
5739            .remove(session_id);
5740        self.update_automation_v2_run(run_id, |row| {
5741            row.active_session_ids.retain(|id| id != session_id);
5742        })
5743        .await
5744    }
5745
5746    pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
5747        let mut guard = self.automation_v2_session_runs.write().await;
5748        for session_id in session_ids {
5749            guard.remove(session_id);
5750        }
5751        let mut mcp_guard = self.automation_v2_session_mcp_servers.write().await;
5752        for session_id in session_ids {
5753            mcp_guard.remove(session_id);
5754        }
5755    }
5756
5757    pub async fn add_automation_v2_instance(
5758        &self,
5759        run_id: &str,
5760        instance_id: &str,
5761    ) -> Option<AutomationV2RunRecord> {
5762        self.update_automation_v2_run(run_id, |row| {
5763            if !row.active_instance_ids.iter().any(|id| id == instance_id) {
5764                row.active_instance_ids.push(instance_id.to_string());
5765            }
5766        })
5767        .await
5768    }
5769
5770    pub async fn clear_automation_v2_instance(
5771        &self,
5772        run_id: &str,
5773        instance_id: &str,
5774    ) -> Option<AutomationV2RunRecord> {
5775        self.update_automation_v2_run(run_id, |row| {
5776            row.active_instance_ids.retain(|id| id != instance_id);
5777        })
5778        .await
5779    }
5780
5781    pub async fn apply_provider_usage_to_runs(
5782        &self,
5783        session_id: &str,
5784        prompt_tokens: u64,
5785        completion_tokens: u64,
5786        total_tokens: u64,
5787    ) {
5788        if let Some(policy) = self.routine_session_policy(session_id).await {
5789            let rate = self.token_cost_per_1k_usd.max(0.0);
5790            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
5791            let mut guard = self.routine_runs.write().await;
5792            if let Some(run) = guard.get_mut(&policy.run_id) {
5793                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
5794                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
5795                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
5796                run.estimated_cost_usd += delta_cost;
5797                run.updated_at_ms = now_ms();
5798            }
5799            drop(guard);
5800            let _ = self.persist_routine_runs().await;
5801        }
5802
5803        let maybe_v2_run_id = self
5804            .automation_v2_session_runs
5805            .read()
5806            .await
5807            .get(session_id)
5808            .cloned();
5809        if let Some(run_id) = maybe_v2_run_id {
5810            let rate = self.token_cost_per_1k_usd.max(0.0);
5811            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
5812            let mut guard = self.automation_v2_runs.write().await;
5813            if let Some(run) = guard.get_mut(&run_id) {
5814                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
5815                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
5816                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
5817                run.estimated_cost_usd += delta_cost;
5818                run.updated_at_ms = now_ms();
5819            }
5820            drop(guard);
5821            let _ = self.persist_automation_v2_runs().await;
5822        }
5823    }
5824
    /// Sweep all active automations for schedule misfires at time `now_ms`
    /// (note: the parameter shadows the module's `now_ms()` helper).
    ///
    /// Automations without a computed next-fire time get one seeded and are
    /// skipped this pass. For automations whose deadline has passed, the
    /// number of missed firings is computed, the next deadline is advanced,
    /// and the automation id is returned once per missed firing so the caller
    /// can enqueue catch-up runs.
    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
        let mut fired = Vec::new();
        let mut guard = self.automations_v2.write().await;
        for automation in guard.values_mut() {
            if automation.status != AutomationV2Status::Active {
                continue;
            }
            // First sighting: seed the next-fire time, do not fire now.
            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
                automation.next_fire_at_ms =
                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // Count every schedule slot due between the missed deadline and now.
            let run_count =
                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
            automation.next_fire_at_ms = next;
            automation.last_fired_at_ms = Some(now_ms);
            // Emit the id once per missed firing.
            for _ in 0..run_count {
                fired.push(automation.automation_id.clone());
            }
        }
        drop(guard);
        let _ = self.persist_automations_v2().await;
        fired
    }
5853
    /// Evaluate watch conditions for all active automations and return the IDs of
    /// automations whose conditions are met, along with a human-readable trigger reason
    /// and the handoff that triggered it (if any).
    ///
    /// An automation is skipped if it already has a `Queued` or `Running` run (dedup).
    ///
    /// At most one result is produced per automation per call: the first
    /// satisfied condition wins (see the labeled `continue 'outer` below).
    pub async fn evaluate_automation_v2_watches(
        &self,
    ) -> Vec<(
        String,
        String,
        Option<crate::automation_v2::types::HandoffArtifact>,
    )> {
        use crate::automation_v2::types::{AutomationRunStatus, WatchCondition};

        // Snapshot of automations that have watch conditions and are Active.
        // The two read locks below are taken and released sequentially (never
        // held together), so there is no lock-ordering concern.
        let candidates: Vec<crate::automation_v2::types::AutomationV2Spec> = {
            let guard = self.automations_v2.read().await;
            guard
                .values()
                .filter(|a| {
                    a.status == crate::automation_v2::types::AutomationV2Status::Active
                        && a.has_watch_conditions()
                })
                .cloned()
                .collect()
        };

        // Snapshot active run statuses to implement dedup.
        let active_automation_ids: std::collections::HashSet<String> = {
            let runs = self.automation_v2_runs.read().await;
            runs.values()
                .filter(|r| {
                    matches!(
                        r.status,
                        AutomationRunStatus::Queued | AutomationRunStatus::Running
                    )
                })
                .map(|r| r.automation_id.clone())
                .collect()
        };

        let workspace_root = self.workspace_index.snapshot().await.root;
        let mut results = Vec::new();

        'outer: for automation in candidates {
            // Dedup: skip if already queued or running.
            if active_automation_ids.contains(&automation.automation_id) {
                continue;
            }

            // Handoff files live under the workspace, in the configured approved dir.
            let handoff_cfg = automation.effective_handoff_config();
            let approved_dir =
                std::path::Path::new(&workspace_root).join(&handoff_cfg.approved_dir);

            for condition in &automation.watch_conditions {
                match condition {
                    WatchCondition::HandoffAvailable {
                        source_automation_id,
                        artifact_type,
                    } => {
                        // `None` filters mean "match any source / any artifact type".
                        if let Some(handoff) = find_matching_handoff(
                            &approved_dir,
                            &automation.automation_id,
                            source_automation_id.as_deref(),
                            artifact_type.as_deref(),
                        )
                        .await
                        {
                            let reason = format!(
                                "handoff `{}` of type `{}` from `{}` is available",
                                handoff.handoff_id,
                                handoff.artifact_type,
                                handoff.source_automation_id
                            );
                            results.push((automation.automation_id.clone(), reason, Some(handoff)));
                            // One trigger per automation per evaluation pass.
                            continue 'outer;
                        }
                    }
                }
            }
        }

        results
    }
5938
    /// Create a run triggered by a watch condition, recording the trigger reason
    /// and the consumed handoff ID (if any).
    ///
    /// The run starts `Queued` with every flow node pending. The run registry
    /// is persisted and the run is mirrored into the context-run blackboard
    /// before returning; either failure propagates as an error.
    pub async fn create_automation_v2_watch_run(
        &self,
        automation: &crate::automation_v2::types::AutomationV2Spec,
        trigger_reason: String,
        consumed_handoff_id: Option<String>,
    ) -> anyhow::Result<crate::automation_v2::types::AutomationV2RunRecord> {
        use crate::automation_v2::types::{
            AutomationRunCheckpoint, AutomationRunStatus, AutomationV2RunRecord,
        };
        let now = now_ms();
        // Prefer the automation's own runtime-context materialization; fall
        // back to the one captured on the approved plan, if any.
        let runtime_context = self
            .automation_v2_effective_runtime_context(
                automation,
                automation
                    .runtime_context_materialization()
                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
            )
            .await?;
        // All flow nodes begin in the pending set of the checkpoint.
        let pending_nodes = automation
            .flow
            .nodes
            .iter()
            .map(|n| n.node_id.clone())
            .collect::<Vec<_>>();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            tenant_context: TenantContext::local_implicit(),
            trigger_type: "watch_condition".to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: None,
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes,
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context,
            // Snapshot of the spec as it was when this run was created.
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: None,
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
            trigger_reason: Some(trigger_reason),
            consumed_handoff_id,
            learning_summary: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
6015
6016    /// Deposit a handoff artifact into the workspace `inbox/` directory.
6017    /// If `auto_approve` is true (Phase 1 default), the file is immediately
6018    /// moved to `approved/` so the downstream watch condition can fire on the next tick.
6019    pub async fn deposit_automation_v2_handoff(
6020        &self,
6021        workspace_root: &str,
6022        handoff: &crate::automation_v2::types::HandoffArtifact,
6023        handoff_cfg: &crate::automation_v2::types::AutomationHandoffConfig,
6024    ) -> anyhow::Result<()> {
6025        use tokio::fs;
6026        let root = std::path::Path::new(workspace_root);
6027        let inbox = root.join(&handoff_cfg.inbox_dir);
6028        fs::create_dir_all(&inbox).await?;
6029
6030        let filename = handoff_filename(&handoff.handoff_id);
6031        let content = serde_json::to_string_pretty(handoff)?;
6032
6033        if handoff_cfg.auto_approve {
6034            // Write directly to approved/ (bypass inbox).
6035            let approved = root.join(&handoff_cfg.approved_dir);
6036            fs::create_dir_all(&approved).await?;
6037            fs::write(approved.join(&filename), &content).await?;
6038            tracing::info!(
6039                handoff_id = %handoff.handoff_id,
6040                target = %handoff.target_automation_id,
6041                artifact_type = %handoff.artifact_type,
6042                "handoff deposited (auto-approved)"
6043            );
6044        } else {
6045            fs::write(inbox.join(&filename), &content).await?;
6046            tracing::info!(
6047                handoff_id = %handoff.handoff_id,
6048                target = %handoff.target_automation_id,
6049                artifact_type = %handoff.artifact_type,
6050                "handoff deposited to inbox (awaiting approval)"
6051            );
6052        }
6053        Ok(())
6054    }
6055
    /// Atomically consume a handoff artifact: rename it from `approved/` to
    /// `archived/`, stamping the consuming run's metadata into the file for audit.
    /// Returns the updated artifact. This is idempotent — if the file is already
    /// gone from `approved/`, it returns `None` (race-safe).
    pub async fn consume_automation_v2_handoff(
        &self,
        workspace_root: &str,
        handoff: &crate::automation_v2::types::HandoffArtifact,
        handoff_cfg: &crate::automation_v2::types::AutomationHandoffConfig,
        consuming_run_id: &str,
        consuming_automation_id: &str,
    ) -> anyhow::Result<Option<crate::automation_v2::types::HandoffArtifact>> {
        use tokio::fs;
        let root = std::path::Path::new(workspace_root);
        let filename = handoff_filename(&handoff.handoff_id);
        let approved_path = root.join(&handoff_cfg.approved_dir).join(&filename);

        // NOTE(review): `exists()` is a synchronous filesystem call inside an
        // async fn, and the exists→write→remove sequence below is check-then-act
        // rather than truly atomic — two concurrent consumers could both pass
        // this check. Looks intentionally best-effort; confirm if stricter
        // guarantees are ever required.
        if !approved_path.exists() {
            // Already consumed by another run (race).
            tracing::warn!(
                handoff_id = %handoff.handoff_id,
                "handoff already consumed or missing from approved dir"
            );
            return Ok(None);
        }

        let archived_dir = root.join(&handoff_cfg.archived_dir);
        fs::create_dir_all(&archived_dir).await?;

        // Stamp consumer metadata into the archived copy for the audit trail.
        let mut archived = handoff.clone();
        archived.consumed_by_run_id = Some(consuming_run_id.to_string());
        archived.consumed_by_automation_id = Some(consuming_automation_id.to_string());
        archived.consumed_at_ms = Some(now_ms());

        // Write the updated envelope to archived/ first, then remove from approved/.
        // This ordering means we never lose the record even if the remove fails.
        let archived_path = archived_dir.join(&filename);
        fs::write(&archived_path, serde_json::to_string_pretty(&archived)?).await?;
        // Best-effort removal: on failure a copy lingers in approved/, but the
        // watcher skips artifacts with `consumed_by_run_id` set.
        let _ = fs::remove_file(&approved_path).await;

        tracing::info!(
            handoff_id = %handoff.handoff_id,
            run_id = %consuming_run_id,
            "handoff consumed and archived"
        );
        Ok(Some(archived))
    }
6103}
6104
/// Returns the canonical filename for a handoff artifact JSON file.
///
/// Every character outside `[A-Za-z0-9_-]` is replaced with `_`, so the
/// result is always a safe single path component ending in `.json`.
fn handoff_filename(handoff_id: &str) -> String {
    let mut safe = String::with_capacity(handoff_id.len() + 5);
    for ch in handoff_id.chars() {
        match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => safe.push(ch),
            _ => safe.push('_'),
        }
    }
    safe.push_str(".json");
    safe
}
6120
/// Scan the `approved_dir` for a handoff that targets `target_automation_id` and
/// optionally matches `source_automation_id` and `artifact_type` filters.
/// Returns the first matching handoff (oldest by `created_at_ms`), or `None`.
///
/// Bounds: skips the scan entirely if the directory doesn't exist; caps the scan
/// at 256 entries to prevent scheduler stall on large directories.
async fn find_matching_handoff(
    approved_dir: &std::path::Path,
    target_automation_id: &str,
    source_filter: Option<&str>,
    artifact_type_filter: Option<&str>,
) -> Option<crate::automation_v2::types::HandoffArtifact> {
    use tokio::fs;
    // NOTE(review): `exists()` is a blocking call in an async context; cheap on
    // local disks, but confirm this never points at slow network storage.
    if !approved_dir.exists() {
        return None;
    }

    let mut entries = match fs::read_dir(approved_dir).await {
        Ok(entries) => entries,
        Err(err) => {
            tracing::warn!("handoff watch: failed to read approved dir: {err}");
            return None;
        }
    };

    let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
    // Counts every directory entry toward the 256 cap, not just JSON files.
    let mut scanned = 0usize;

    // An `Err` from `next_entry` silently ends the scan (best-effort semantics).
    while let Ok(Some(entry)) = entries.next_entry().await {
        if scanned >= 256 {
            break;
        }
        scanned += 1;

        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
            continue;
        }

        // Unreadable or malformed files are skipped rather than treated as errors.
        let raw = match fs::read_to_string(&path).await {
            Ok(raw) => raw,
            Err(_) => continue,
        };
        let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
        {
            Ok(h) => h,
            Err(_) => continue,
        };

        // Check target match (always required).
        if handoff.target_automation_id != target_automation_id {
            continue;
        }
        // Optional source filter.
        if let Some(src) = source_filter {
            if handoff.source_automation_id != src {
                continue;
            }
        }
        // Optional artifact type filter.
        if let Some(kind) = artifact_type_filter {
            if handoff.artifact_type != kind {
                continue;
            }
        }
        // Skip already-consumed handoffs (shouldn't be in approved/ but be defensive).
        if handoff.consumed_by_run_id.is_some() {
            continue;
        }
        candidates.push(handoff);
    }

    // Return the oldest matching handoff so we process them in arrival order.
    candidates.into_iter().min_by_key(|h| h.created_at_ms)
}
6196
6197async fn build_channels_config(
6198    state: &AppState,
6199    channels: &ChannelsConfigFile,
6200) -> Option<ChannelsConfig> {
6201    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
6202        return None;
6203    }
6204    Some(ChannelsConfig {
6205        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
6206            bot_token: cfg.bot_token,
6207            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
6208            mention_only: cfg.mention_only,
6209            style_profile: cfg.style_profile,
6210            security_profile: cfg.security_profile,
6211        }),
6212        discord: channels.discord.clone().map(|cfg| DiscordConfig {
6213            bot_token: cfg.bot_token,
6214            guild_id: cfg.guild_id.and_then(|value| {
6215                let trimmed = value.trim().to_string();
6216                if trimmed.is_empty() {
6217                    None
6218                } else {
6219                    Some(trimmed)
6220                }
6221            }),
6222            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
6223            mention_only: cfg.mention_only,
6224            security_profile: cfg.security_profile,
6225        }),
6226        slack: channels.slack.clone().map(|cfg| SlackConfig {
6227            bot_token: cfg.bot_token,
6228            channel_id: cfg.channel_id,
6229            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
6230            mention_only: cfg.mention_only,
6231            security_profile: cfg.security_profile,
6232        }),
6233        server_base_url: state.server_base_url(),
6234        api_token: state.api_token().await.unwrap_or_default(),
6235        tool_policy: channels.tool_policy.clone(),
6236    })
6237}
6238
6239// channel config normalization moved to crate::config::channels
6240
/// Returns true when `value` looks like a GitHub-style `owner/repo` slug:
/// exactly one `/` separating two non-blank components, with no leading or
/// trailing slash (the overall string is trimmed first).
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let trimmed = value.trim();
    if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
        return false;
    }
    let segments: Vec<&str> = trimmed.split('/').collect();
    match segments.as_slice() {
        [owner, repo] => !owner.trim().is_empty() && !repo.trim().is_empty(),
        _ => false,
    }
}
6255
6256fn legacy_automations_v2_path() -> Option<PathBuf> {
6257    config::paths::resolve_legacy_root_file_path("automations_v2.json")
6258        .filter(|path| path != &config::paths::resolve_automations_v2_path())
6259}
6260
6261fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
6262    let mut candidates = vec![active_path.clone()];
6263    if let Some(legacy_path) = legacy_automations_v2_path() {
6264        if !candidates.contains(&legacy_path) {
6265            candidates.push(legacy_path);
6266        }
6267    }
6268    let default_path = config::paths::default_state_dir().join("automations_v2.json");
6269    if !candidates.contains(&default_path) {
6270        candidates.push(default_path);
6271    }
6272    candidates
6273}
6274
6275async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
6276    let Some(legacy_path) = legacy_automations_v2_path() else {
6277        return Ok(());
6278    };
6279    if legacy_path == *active_path || !legacy_path.exists() {
6280        return Ok(());
6281    }
6282    fs::remove_file(&legacy_path).await?;
6283    tracing::info!(
6284        active_path = active_path.display().to_string(),
6285        removed_path = legacy_path.display().to_string(),
6286        "removed stale legacy automation v2 file after canonical persistence"
6287    );
6288    Ok(())
6289}
6290
6291fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
6292    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
6293        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
6294}
6295
6296fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
6297    let mut candidates = vec![active_path.clone()];
6298    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
6299        if !candidates.contains(&legacy_path) {
6300            candidates.push(legacy_path);
6301        }
6302    }
6303    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
6304    if !candidates.contains(&default_path) {
6305        candidates.push(default_path);
6306    }
6307    candidates
6308}
6309
6310fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
6311    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
6312        .unwrap_or_default()
6313}
6314
6315fn parse_automation_v2_file_strict(
6316    raw: &str,
6317) -> anyhow::Result<std::collections::HashMap<String, AutomationV2Spec>> {
6318    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
6319        .map_err(anyhow::Error::from)
6320}
6321
/// Atomically replace the file at `path` with `payload`.
///
/// Writes to a uniquely-named hidden temp file in the same directory (same
/// directory keeps the rename on one filesystem), then renames it over the
/// target. On rename failure the temp file is removed best-effort and the
/// error is propagated.
async fn write_string_atomic(path: &Path, payload: &str) -> anyhow::Result<()> {
    let parent = path.parent().unwrap_or_else(|| Path::new("."));
    let file_name = path
        .file_name()
        .and_then(|value| value.to_str())
        .unwrap_or("state.json");
    // Unique per process-id + timestamp so concurrent writers don't collide.
    let temp_path = parent.join(format!(
        ".{file_name}.tmp-{}-{}",
        std::process::id(),
        now_ms()
    ));
    fs::write(&temp_path, payload).await?;
    if let Err(error) = fs::rename(&temp_path, path).await {
        // Best-effort cleanup of the orphaned temp file before surfacing the error.
        let _ = fs::remove_file(&temp_path).await;
        return Err(error.into());
    }
    Ok(())
}
6340
6341fn parse_automation_v2_runs_file(
6342    raw: &str,
6343) -> std::collections::HashMap<String, AutomationV2RunRecord> {
6344    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
6345        .unwrap_or_default()
6346}
6347
6348fn parse_optimization_campaigns_file(
6349    raw: &str,
6350) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
6351    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
6352        .unwrap_or_default()
6353}
6354
6355fn parse_optimization_experiments_file(
6356    raw: &str,
6357) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
6358    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
6359        .unwrap_or_default()
6360}
6361
6362fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
6363    match schedule {
6364        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
6365        RoutineSchedule::Cron { .. } => None,
6366    }
6367}
6368
6369fn parse_timezone(timezone: &str) -> Option<Tz> {
6370    timezone.trim().parse::<Tz>().ok()
6371}
6372
6373fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
6374    let tz = parse_timezone(timezone)?;
6375    let schedule = Schedule::from_str(expression).ok()?;
6376    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
6377    let local_from = from_dt.with_timezone(&tz);
6378    let next = schedule.after(&local_from).next()?;
6379    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
6380}
6381
6382fn compute_next_schedule_fire_at_ms(
6383    schedule: &RoutineSchedule,
6384    timezone: &str,
6385    from_ms: u64,
6386) -> Option<u64> {
6387    let _ = parse_timezone(timezone)?;
6388    match schedule {
6389        RoutineSchedule::IntervalSeconds { seconds } => {
6390            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
6391        }
6392        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
6393    }
6394}
6395
/// Compute how many runs are due for a missed schedule and the next aligned
/// fire time, honoring the misfire policy.
///
/// Interval schedules delegate to the closed-form arithmetic in
/// `compute_misfire_plan`; cron schedules walk the expression forward slot by
/// slot instead.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                // Unreachable for IntervalSeconds (routine_interval_ms only
                // returns None for Cron), but fail closed just in case.
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            // Next strictly-future fire; falls back to "one minute from now"
            // when the expression/timezone can't be evaluated.
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    // Count each missed cron slot between the scheduled time
                    // and now, capped at `max_runs`. The `next <= cursor`
                    // guard prevents an infinite loop on a non-advancing
                    // schedule.
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
6435
6436fn compute_misfire_plan(
6437    now_ms: u64,
6438    next_fire_at_ms: u64,
6439    interval_ms: u64,
6440    policy: &RoutineMisfirePolicy,
6441) -> (u32, u64) {
6442    if now_ms < next_fire_at_ms || interval_ms == 0 {
6443        return (0, next_fire_at_ms);
6444    }
6445    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
6446    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
6447    match policy {
6448        RoutineMisfirePolicy::Skip => (0, aligned_next),
6449        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
6450        RoutineMisfirePolicy::CatchUp { max_runs } => {
6451            let count = missed.min(u64::from(*max_runs)) as u32;
6452            (count, aligned_next)
6453        }
6454    }
6455}
6456
6457fn auto_generated_agent_name(agent_id: &str) -> String {
6458    let names = [
6459        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
6460    ];
6461    let digest = Sha256::digest(agent_id.as_bytes());
6462    let idx = usize::from(digest[0]) % names.len();
6463    format!("{}-{:02x}", names[idx], digest[1])
6464}
6465
6466fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
6467    match schedule.schedule_type {
6468        AutomationV2ScheduleType::Manual => None,
6469        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
6470            seconds: schedule.interval_seconds.unwrap_or(60),
6471        }),
6472        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
6473            expression: schedule.cron_expression.clone().unwrap_or_default(),
6474        }),
6475    }
6476}
6477
6478fn automation_schedule_next_fire_at_ms(
6479    schedule: &AutomationV2Schedule,
6480    from_ms: u64,
6481) -> Option<u64> {
6482    let routine_schedule = schedule_from_automation_v2(schedule)?;
6483    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
6484}
6485
6486fn automation_schedule_due_count(
6487    schedule: &AutomationV2Schedule,
6488    now_ms: u64,
6489    next_fire_at_ms: u64,
6490) -> u32 {
6491    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
6492        return 0;
6493    };
6494    let (count, _) = compute_misfire_plan_for_schedule(
6495        now_ms,
6496        next_fire_at_ms,
6497        &routine_schedule,
6498        &schedule.timezone,
6499        &schedule.misfire_policy,
6500    );
6501    count.max(1)
6502}
6503
/// Outcome of the execution-policy check for a routine trigger
/// (see `evaluate_routine_execution_policy`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run only after manual approval; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
6510
6511pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
6512    let entrypoint = routine.entrypoint.to_ascii_lowercase();
6513    if entrypoint.starts_with("connector.")
6514        || entrypoint.starts_with("integration.")
6515        || entrypoint.contains("external")
6516    {
6517        return true;
6518    }
6519    routine
6520        .args
6521        .get("uses_external_integrations")
6522        .and_then(|v| v.as_bool())
6523        .unwrap_or(false)
6524        || routine
6525            .args
6526            .get("connector_id")
6527            .and_then(|v| v.as_str())
6528            .is_some()
6529}
6530
6531pub fn evaluate_routine_execution_policy(
6532    routine: &RoutineSpec,
6533    trigger_type: &str,
6534) -> RoutineExecutionDecision {
6535    if !routine_uses_external_integrations(routine) {
6536        return RoutineExecutionDecision::Allowed;
6537    }
6538    if !routine.external_integrations_allowed {
6539        return RoutineExecutionDecision::Blocked {
6540            reason: "external integrations are disabled by policy".to_string(),
6541        };
6542    }
6543    if routine.requires_approval {
6544        return RoutineExecutionDecision::RequiresApproval {
6545            reason: format!(
6546                "manual approval required before external side effects ({})",
6547                trigger_type
6548            ),
6549        };
6550    }
6551    RoutineExecutionDecision::Allowed
6552}
6553
/// Validate a shared-resource key (trimmed before checking).
///
/// Accepted keys are the special `swarm.active_tasks` literal, or any key with
/// one of the `run/`, `mission/`, `project/`, `team/` prefixes that contains
/// no empty path segment (`//`).
fn is_valid_resource_key(key: &str) -> bool {
    let trimmed = key.trim();
    if trimmed.is_empty() {
        return false;
    }
    if trimmed == "swarm.active_tasks" {
        return true;
    }
    let has_allowed_prefix = ["run/", "mission/", "project/", "team/"]
        .iter()
        .any(|prefix| trimmed.starts_with(prefix));
    has_allowed_prefix && !trimmed.contains("//")
}
6571
impl Deref for AppState {
    type Target = RuntimeState;

    /// Delegates to the runtime state installed during startup.
    ///
    /// # Panics
    /// Panics if the runtime has not been set yet — i.e. the state is
    /// dereferenced before startup completes.
    fn deref(&self) -> &Self::Target {
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
6581
/// Prompt-context hook backed by the server's `AppState`; its methods (below)
/// build memory/docs context blocks for prompt injection.
#[derive(Clone)]
struct ServerPromptContextHook {
    // Shared handle to server state; cloning the hook clones this handle.
    state: AppState,
}
6586
6587impl ServerPromptContextHook {
    /// Wrap the shared `AppState` in a prompt-context hook.
    fn new(state: AppState) -> Self {
        Self { state }
    }
6591
6592    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
6593        let paths = resolve_shared_paths().ok()?;
6594        MemoryDatabase::new(&paths.memory_db_path).await.ok()
6595    }
6596
6597    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
6598        let paths = resolve_shared_paths().ok()?;
6599        tandem_memory::MemoryManager::new(&paths.memory_db_path)
6600            .await
6601            .ok()
6602    }
6603
6604    fn hash_query(input: &str) -> String {
6605        let mut hasher = Sha256::new();
6606        hasher.update(input.as_bytes());
6607        format!("{:x}", hasher.finalize())
6608    }
6609
6610    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
6611        let mut out = vec!["<memory_context>".to_string()];
6612        let mut used = 0usize;
6613        for hit in hits {
6614            let text = hit
6615                .record
6616                .content
6617                .split_whitespace()
6618                .take(60)
6619                .collect::<Vec<_>>()
6620                .join(" ");
6621            let line = format!(
6622                "- [{:.3}] {} (source={}, run={})",
6623                hit.score, text, hit.record.source_type, hit.record.run_id
6624            );
6625            used = used.saturating_add(line.len());
6626            if used > 2200 {
6627                break;
6628            }
6629            out.push(line);
6630        }
6631        out.push("</memory_context>".to_string());
6632        out.join("\n")
6633    }
6634
6635    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
6636        chunk
6637            .metadata
6638            .as_ref()
6639            .and_then(|meta| meta.get("source_url"))
6640            .and_then(Value::as_str)
6641            .map(str::trim)
6642            .filter(|v| !v.is_empty())
6643            .map(ToString::to_string)
6644    }
6645
6646    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
6647        if let Some(path) = chunk
6648            .metadata
6649            .as_ref()
6650            .and_then(|meta| meta.get("relative_path"))
6651            .and_then(Value::as_str)
6652            .map(str::trim)
6653            .filter(|v| !v.is_empty())
6654        {
6655            return path.to_string();
6656        }
6657        chunk
6658            .source
6659            .strip_prefix("guide_docs:")
6660            .unwrap_or(chunk.source.as_str())
6661            .to_string()
6662    }
6663
6664    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
6665        let mut out = vec!["<docs_context>".to_string()];
6666        let mut used = 0usize;
6667        for hit in hits {
6668            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
6669            let path = Self::extract_docs_relative_path(&hit.chunk);
6670            let text = hit
6671                .chunk
6672                .content
6673                .split_whitespace()
6674                .take(70)
6675                .collect::<Vec<_>>()
6676                .join(" ");
6677            let line = format!(
6678                "- [{:.3}] {} (doc_path={}, source_url={})",
6679                hit.similarity, text, path, url
6680            );
6681            used = used.saturating_add(line.len());
6682            if used > 2800 {
6683                break;
6684            }
6685            out.push(line);
6686        }
6687        out.push("</docs_context>".to_string());
6688        out.join("\n")
6689    }
6690
6691    async fn search_embedded_docs(
6692        &self,
6693        query: &str,
6694        limit: usize,
6695    ) -> Vec<tandem_memory::types::MemorySearchResult> {
6696        let Some(manager) = self.open_memory_manager().await else {
6697            return Vec::new();
6698        };
6699        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
6700        manager
6701            .search(
6702                query,
6703                Some(MemoryTier::Global),
6704                None,
6705                None,
6706                Some(search_limit),
6707            )
6708            .await
6709            .unwrap_or_default()
6710            .into_iter()
6711            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
6712            .take(limit)
6713            .collect()
6714    }
6715
6716    fn should_skip_memory_injection(query: &str) -> bool {
6717        let trimmed = query.trim();
6718        if trimmed.is_empty() {
6719            return true;
6720        }
6721        let lower = trimmed.to_ascii_lowercase();
6722        let social = [
6723            "hi",
6724            "hello",
6725            "hey",
6726            "thanks",
6727            "thank you",
6728            "ok",
6729            "okay",
6730            "cool",
6731            "nice",
6732            "yo",
6733            "good morning",
6734            "good afternoon",
6735            "good evening",
6736        ];
6737        lower.len() <= 32 && social.contains(&lower.as_str())
6738    }
6739
6740    fn personality_preset_text(preset: &str) -> &'static str {
6741        match preset {
6742            "concise" => {
6743                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
6744            }
6745            "friendly" => {
6746                "Default style: friendly and supportive while staying technically rigorous and concrete."
6747            }
6748            "mentor" => {
6749                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
6750            }
6751            "critical" => {
6752                "Default style: critical and risk-first. Surface failure modes and assumptions early."
6753            }
6754            _ => {
6755                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
6756            }
6757        }
6758    }
6759
6760    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
6761        let allow_agent_override = agent_name
6762            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
6763            .unwrap_or(false);
6764        let legacy_bot_name = config
6765            .get("bot_name")
6766            .and_then(Value::as_str)
6767            .map(str::trim)
6768            .filter(|v| !v.is_empty());
6769        let bot_name = config
6770            .get("identity")
6771            .and_then(|identity| identity.get("bot"))
6772            .and_then(|bot| bot.get("canonical_name"))
6773            .and_then(Value::as_str)
6774            .map(str::trim)
6775            .filter(|v| !v.is_empty())
6776            .or(legacy_bot_name)
6777            .unwrap_or("Tandem");
6778
6779        let default_profile = config
6780            .get("identity")
6781            .and_then(|identity| identity.get("personality"))
6782            .and_then(|personality| personality.get("default"));
6783        let default_preset = default_profile
6784            .and_then(|profile| profile.get("preset"))
6785            .and_then(Value::as_str)
6786            .map(str::trim)
6787            .filter(|v| !v.is_empty())
6788            .unwrap_or("balanced");
6789        let default_custom = default_profile
6790            .and_then(|profile| profile.get("custom_instructions"))
6791            .and_then(Value::as_str)
6792            .map(str::trim)
6793            .filter(|v| !v.is_empty())
6794            .map(ToString::to_string);
6795        let legacy_persona = config
6796            .get("persona")
6797            .and_then(Value::as_str)
6798            .map(str::trim)
6799            .filter(|v| !v.is_empty())
6800            .map(ToString::to_string);
6801
6802        let per_agent_profile = if allow_agent_override {
6803            agent_name.and_then(|name| {
6804                config
6805                    .get("identity")
6806                    .and_then(|identity| identity.get("personality"))
6807                    .and_then(|personality| personality.get("per_agent"))
6808                    .and_then(|per_agent| per_agent.get(name))
6809            })
6810        } else {
6811            None
6812        };
6813        let preset = per_agent_profile
6814            .and_then(|profile| profile.get("preset"))
6815            .and_then(Value::as_str)
6816            .map(str::trim)
6817            .filter(|v| !v.is_empty())
6818            .unwrap_or(default_preset);
6819        let custom = per_agent_profile
6820            .and_then(|profile| profile.get("custom_instructions"))
6821            .and_then(Value::as_str)
6822            .map(str::trim)
6823            .filter(|v| !v.is_empty())
6824            .map(ToString::to_string)
6825            .or(default_custom)
6826            .or(legacy_persona);
6827
6828        let mut lines = vec![
6829            format!("You are {bot_name}, an AI assistant."),
6830            Self::personality_preset_text(preset).to_string(),
6831        ];
6832        if let Some(custom) = custom {
6833            lines.push(format!("Additional personality instructions: {custom}"));
6834        }
6835        Some(lines.join("\n"))
6836    }
6837
6838    fn build_memory_scope_block(
6839        session_id: &str,
6840        project_id: Option<&str>,
6841        workspace_root: Option<&str>,
6842    ) -> String {
6843        let mut lines = vec![
6844            "<memory_scope>".to_string(),
6845            format!("- current_session_id: {}", session_id),
6846        ];
6847        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
6848            lines.push(format!("- current_project_id: {}", project_id));
6849        }
6850        if let Some(workspace_root) = workspace_root
6851            .map(str::trim)
6852            .filter(|value| !value.is_empty())
6853        {
6854            lines.push(format!("- workspace_root: {}", workspace_root));
6855        }
6856        lines.push(
6857            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
6858                .to_string(),
6859        );
6860        lines.push(
6861            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
6862                .to_string(),
6863        );
6864        lines.push(
6865            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
6866                .to_string(),
6867        );
6868        lines.push("</memory_scope>".to_string());
6869        lines.join("\n")
6870    }
6871}
6872
/// Server-side prompt augmentation hook: appends identity, memory-scope,
/// embedded-docs, and global-memory system messages before a provider call.
impl PromptContextHook for ServerPromptContextHook {
    /// Augments the outgoing provider `messages` with server context for the
    /// session identified by `ctx.session_id`.
    ///
    /// Fail-open by design: whenever a precondition is missing (runtime not
    /// ready, no registered run, blank or "social" query, no memory backend,
    /// no hits), the messages are returned unchanged rather than erroring.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        let this = self.clone();
        Box::pin(async move {
            // Startup can invoke prompt plumbing before RuntimeState is installed.
            // Never panic from context hooks; fail-open and continue without augmentation.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            // Identity/persona system message, resolved from effective config
            // and the run's agent profile.
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            // Memory-scope system message describing the session's
            // project/workspace scope, when the session is known to storage.
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // Retrieval query: the most recent user message, if any.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded guide docs take priority: when any match, inject them,
            // publish telemetry, and skip the global-memory search entirely.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            // Fall back to global memory search; telemetry is published even
            // for empty result sets so search latency/quality is observable.
            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            // Inject the formatted memory block and announce the injection.
            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
7003
7004fn extract_event_session_id(properties: &Value) -> Option<String> {
7005    properties
7006        .get("sessionID")
7007        .or_else(|| properties.get("sessionId"))
7008        .or_else(|| properties.get("id"))
7009        .or_else(|| {
7010            properties
7011                .get("part")
7012                .and_then(|part| part.get("sessionID"))
7013        })
7014        .or_else(|| {
7015            properties
7016                .get("part")
7017                .and_then(|part| part.get("sessionId"))
7018        })
7019        .and_then(|v| v.as_str())
7020        .map(|s| s.to_string())
7021}
7022
7023fn extract_event_run_id(properties: &Value) -> Option<String> {
7024    properties
7025        .get("runID")
7026        .or_else(|| properties.get("run_id"))
7027        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
7028        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
7029        .and_then(|v| v.as_str())
7030        .map(|s| s.to_string())
7031}
7032
7033pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
7034    let part = properties.get("part")?;
7035    let part_type = part
7036        .get("type")
7037        .and_then(|v| v.as_str())
7038        .unwrap_or_default()
7039        .to_ascii_lowercase();
7040    if part_type != "tool"
7041        && part_type != "tool-invocation"
7042        && part_type != "tool-result"
7043        && part_type != "tool_invocation"
7044        && part_type != "tool_result"
7045    {
7046        return None;
7047    }
7048    let part_state = part
7049        .get("state")
7050        .and_then(|v| v.as_str())
7051        .unwrap_or_default()
7052        .to_ascii_lowercase();
7053    let has_result = part.get("result").is_some_and(|value| !value.is_null());
7054    let has_error = part
7055        .get("error")
7056        .and_then(|v| v.as_str())
7057        .is_some_and(|value| !value.trim().is_empty());
7058    // Skip transient "running" deltas to avoid persistence storms from streamed
7059    // tool-argument chunks; keep actionable/final updates.
7060    if part_state == "running" && !has_result && !has_error {
7061        return None;
7062    }
7063    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
7064    let message_id = part
7065        .get("messageID")
7066        .or_else(|| part.get("message_id"))
7067        .and_then(|v| v.as_str())?
7068        .to_string();
7069    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
7070    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
7071        if let Some(preview) = properties
7072            .get("toolCallDelta")
7073            .and_then(|delta| delta.get("parsedArgsPreview"))
7074            .cloned()
7075        {
7076            let preview_nonempty = !preview.is_null()
7077                && !preview.as_object().is_some_and(|value| value.is_empty())
7078                && !preview
7079                    .as_str()
7080                    .map(|value| value.trim().is_empty())
7081                    .unwrap_or(false);
7082            if preview_nonempty {
7083                args = preview;
7084            }
7085        }
7086        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
7087            if let Some(raw_preview) = properties
7088                .get("toolCallDelta")
7089                .and_then(|delta| delta.get("rawArgsPreview"))
7090                .and_then(|value| value.as_str())
7091                .map(str::trim)
7092                .filter(|value| !value.is_empty())
7093            {
7094                args = Value::String(raw_preview.to_string());
7095            }
7096        }
7097    }
7098    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
7099    {
7100        tracing::info!(
7101            message_id = %message_id,
7102            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
7103            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
7104            has_result = part.get("result").is_some(),
7105            has_error = part.get("error").is_some(),
7106            "persistable write tool part still has empty args"
7107        );
7108    }
7109    let result = part.get("result").cloned().filter(|value| !value.is_null());
7110    let error = part
7111        .get("error")
7112        .and_then(|v| v.as_str())
7113        .map(|value| value.to_string());
7114    Some((
7115        message_id,
7116        MessagePart::ToolInvocation {
7117            tool,
7118            args,
7119            result,
7120            error,
7121        },
7122    ))
7123}
7124
7125pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
7126    let session_id = extract_event_session_id(&event.properties)?;
7127    let run_id = extract_event_run_id(&event.properties);
7128    let key = format!("run/{session_id}/status");
7129
7130    let mut base = serde_json::Map::new();
7131    base.insert("sessionID".to_string(), Value::String(session_id));
7132    if let Some(run_id) = run_id {
7133        base.insert("runID".to_string(), Value::String(run_id));
7134    }
7135
7136    match event.event_type.as_str() {
7137        "session.run.started" => {
7138            base.insert("state".to_string(), Value::String("running".to_string()));
7139            base.insert("phase".to_string(), Value::String("run".to_string()));
7140            base.insert(
7141                "eventType".to_string(),
7142                Value::String("session.run.started".to_string()),
7143            );
7144            Some(StatusIndexUpdate {
7145                key,
7146                value: Value::Object(base),
7147            })
7148        }
7149        "session.run.finished" => {
7150            base.insert("state".to_string(), Value::String("finished".to_string()));
7151            base.insert("phase".to_string(), Value::String("run".to_string()));
7152            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
7153                base.insert("result".to_string(), Value::String(status.to_string()));
7154            }
7155            base.insert(
7156                "eventType".to_string(),
7157                Value::String("session.run.finished".to_string()),
7158            );
7159            Some(StatusIndexUpdate {
7160                key,
7161                value: Value::Object(base),
7162            })
7163        }
7164        "message.part.updated" => {
7165            let part = event.properties.get("part")?;
7166            let part_type = part.get("type").and_then(|v| v.as_str())?;
7167            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
7168            let (phase, tool_active) = match (part_type, part_state) {
7169                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
7170                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
7171                _ => return None,
7172            };
7173            base.insert("state".to_string(), Value::String("running".to_string()));
7174            base.insert("phase".to_string(), Value::String(phase.to_string()));
7175            base.insert("toolActive".to_string(), Value::Bool(tool_active));
7176            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
7177                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
7178            }
7179            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
7180                base.insert(
7181                    "toolState".to_string(),
7182                    Value::String(tool_state.to_string()),
7183                );
7184            }
7185            if let Some(tool_error) = part
7186                .get("error")
7187                .and_then(|v| v.as_str())
7188                .map(str::trim)
7189                .filter(|value| !value.is_empty())
7190            {
7191                base.insert(
7192                    "toolError".to_string(),
7193                    Value::String(tool_error.to_string()),
7194                );
7195            }
7196            if let Some(tool_call_id) = part
7197                .get("id")
7198                .and_then(|v| v.as_str())
7199                .map(str::trim)
7200                .filter(|value| !value.is_empty())
7201            {
7202                base.insert(
7203                    "toolCallID".to_string(),
7204                    Value::String(tool_call_id.to_string()),
7205                );
7206            }
7207            if let Some(args_preview) = part
7208                .get("args")
7209                .filter(|value| {
7210                    !value.is_null()
7211                        && !value.as_object().is_some_and(|map| map.is_empty())
7212                        && !value
7213                            .as_str()
7214                            .map(|text| text.trim().is_empty())
7215                            .unwrap_or(false)
7216                })
7217                .map(|value| truncate_text(&value.to_string(), 500))
7218            {
7219                base.insert(
7220                    "toolArgsPreview".to_string(),
7221                    Value::String(args_preview.to_string()),
7222                );
7223            }
7224            base.insert(
7225                "eventType".to_string(),
7226                Value::String("message.part.updated".to_string()),
7227            );
7228            Some(StatusIndexUpdate {
7229                key,
7230                value: Value::Object(base),
7231            })
7232        }
7233        _ => None,
7234    }
7235}
7236
/// Thin re-export wrapper: delegates to the session-part persister
/// background task in `crate::app::tasks`.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
7240
/// Thin re-export wrapper: delegates to the status indexer background task
/// in `crate::app::tasks`.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
7244
/// Thin re-export wrapper: delegates to the agent-team supervisor background
/// task in `crate::app::tasks`.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
7248
/// Thin re-export wrapper: delegates to the bug-monitor background task in
/// `crate::app::tasks`.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
7252
/// Thin re-export wrapper: delegates to the usage aggregator background task
/// in `crate::app::tasks`.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
7256
/// Thin re-export wrapper: delegates to the optimization scheduler
/// background task in `crate::app::tasks`.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
7260
/// Processes a failure-related engine event through the bug-monitor pipeline.
///
/// Steps, in order:
/// 1. Build a submission from the event and look up near-duplicate failure
///    patterns (up to 3 matches).
/// 2. Upsert an incident record keyed by the submission fingerprint,
///    incrementing `occurrence_count` on repeats.
/// 3. If duplicates were found, mark the incident `duplicate_suppressed`,
///    publish a suppression event, and stop.
/// 4. Otherwise create a bug draft; on failure mark the incident
///    `draft_failed`, publish the detection event, and stop.
/// 5. Best-effort: ensure a triage run exists for the draft, then attempt an
///    automatic GitHub publish, recording any post failure on both the
///    incident and the draft.
/// 6. Persist the final incident state and publish
///    `bug_monitor.incident.detected`.
///
/// # Errors
/// Returns an error when the submission cannot be built, the submission
/// fingerprint is missing, or persisting the incident record fails.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // Near-duplicate lookup runs before drafting so repeats can be suppressed.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Incidents are deduped by fingerprint across the in-memory incident map.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump counters/timestamps, backfill the excerpt.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence: create a fresh incident record from the submission.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Duplicate suppression: record the matches, emit a dedicated event, and
    // stop before creating a draft.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Draft creation: a failure here finalizes the incident as `draft_failed`.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Best-effort: queue a triage run for the draft; on failure the incident
    // stays `draft_created` with the error recorded.
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    // Best-effort: auto-publish the draft to GitHub; a failed publish is
    // recorded on both the draft and the incident, never propagated.
    if let Some(draft_id) = incident.draft_id.clone() {
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Persist the final incident state and announce the detection.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
7462
7463pub fn sha256_hex(parts: &[&str]) -> String {
7464    let mut hasher = Sha256::new();
7465    for part in parts {
7466        hasher.update(part.as_bytes());
7467        hasher.update([0u8]);
7468    }
7469    format!("{:x}", hasher.finalize())
7470}
7471
/// Returns true when an automation run in this status consumes a slot of the
/// scheduler's run capacity — only actively `Running` runs count.
fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
    matches!(status, AutomationRunStatus::Running)
}
7475
/// Returns true when an automation run in this status still holds its
/// workspace lock. Unlike scheduler capacity, `Pausing` runs also hold the
/// lock — presumably because they may still touch the workspace while winding
/// down (NOTE(review): confirm against the pause flow).
fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
    matches!(
        status,
        AutomationRunStatus::Running | AutomationRunStatus::Pausing
    )
}
7482
/// Thin delegate to `crate::app::tasks::run_routine_scheduler`; see that
/// function for the actual scheduling loop. Kept on this module so existing
/// call sites resolve here.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
7486
/// Thin delegate to `crate::app::tasks::run_routine_executor`; see that
/// function for the executor loop itself.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
7490
/// Builds the prompt text for a routine run; thin delegate to
/// `crate::app::routines::build_routine_prompt`.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
7494
/// Truncates `input` to at most `max_len` bytes, cutting only on a UTF-8
/// character boundary, and appends a `...<truncated>` marker. Inputs that
/// already fit within `max_len` are returned unchanged; note the marker can
/// push a truncated result past `max_len` bytes.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Largest char-boundary byte offset that still fits within max_len.
    let cut = input
        .char_indices()
        .map(|(idx, ch)| idx + ch.len_utf8())
        .take_while(|&end| end <= max_len)
        .last()
        .unwrap_or(0);
    let mut truncated = String::with_capacity(cut + 14);
    truncated.push_str(&input[..cut]);
    truncated.push_str("...<truncated>");
    truncated
}
7511
/// Thin delegate to `crate::app::routines::append_configured_output_artifacts`;
/// see that function for how a run's configured output artifacts are appended.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
7515
7516pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
7517    let provider_id = config
7518        .get("default_provider")
7519        .and_then(|v| v.as_str())
7520        .map(str::trim)
7521        .filter(|v| !v.is_empty())?;
7522    let model_id = config
7523        .get("providers")
7524        .and_then(|v| v.get(provider_id))
7525        .and_then(|v| v.get("default_model"))
7526        .and_then(|v| v.as_str())
7527        .map(str::trim)
7528        .filter(|v| !v.is_empty())?;
7529    Some(ModelSpec {
7530        provider_id: provider_id.to_string(),
7531        model_id: model_id.to_string(),
7532    })
7533}
7534
/// Resolves the model spec to use for a routine run; thin delegate to
/// `crate::app::routines::resolve_routine_model_spec_for_run`.
/// NOTE(review): the second tuple element appears to be a human-readable
/// source/reason string — confirm in the delegated implementation.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
7541
/// Normalizes a list of strings: trims each entry, drops empties, and
/// removes duplicates while preserving first-occurrence order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // HashSet::insert returns false for repeats, so only the first
        // occurrence of each value survives.
        .filter(|entry| seen.insert(entry.clone()))
        .collect()
}
7556
// Fallback implementations compiled when the optional `browser` feature is
// disabled: session cleanup becomes a no-op, status endpoints report the
// subsystem as unavailable, and actionable operations fail with an explicit
// error instead of panicking.
#[cfg(not(feature = "browser"))]
impl AppState {
    /// No-op without the browser feature; always reports zero sessions closed.
    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
        0
    }

    /// No-op without the browser feature; always reports zero sessions closed.
    pub async fn close_all_browser_sessions(&self) -> usize {
        0
    }

    /// Static status payload marking the browser subsystem disabled and the
    /// sidecar/browser binaries as not found (they are never probed here).
    pub async fn browser_status(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
    }

    /// Always fails: smoke-testing requires the `browser` feature.
    pub async fn browser_smoke_test(
        &self,
        _url: Option<String>,
    ) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Always fails: installing the sidecar requires the `browser` feature.
    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Minimal health summary marking the subsystem disabled.
    pub async fn browser_health_summary(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false })
    }
}
7586
7587pub mod automation;
7588pub use automation::*;
7589
7590#[cfg(test)]
7591mod tests;