Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
22use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
23use tandem_memory::db::MemoryDatabase;
24use tandem_providers::ChatMessage;
25use tandem_workflows::{
26    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
27    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
28    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
29};
30
31use crate::agent_teams::AgentTeamRuntime;
32use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
33use crate::automation_v2::types::*;
34use crate::bug_monitor::types::*;
35use crate::capability_resolver::CapabilityResolver;
36use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
37use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
38use crate::pack_manager::PackManager;
39use crate::preset_registry::PresetRegistry;
40use crate::routines::{errors::RoutineStoreError, types::*};
41use crate::runtime::{
42    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
43};
44use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
45use crate::util::{host::detect_host_runtime_context, time::now_ms};
46use crate::{
47    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
48    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
49    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
50    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
51    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
52    OptimizationPromotionDecisionKind,
53};
54
/// Shared server state.
///
/// The struct derives `Clone`; members wrapped in `Arc` are shared between
/// clones, so every clone observes the same in-memory stores. The `*_path`
/// fields are resolved once in `new_starting`; the `load_*` / `persist_*`
/// methods visible in this file read and write the shared-resource and routine
/// paths (the remaining paths are presumably used the same way by methods
/// outside this view).
#[derive(Clone)]
pub struct AppState {
    /// Engine runtime; empty until `mark_ready` stores it (set at most once).
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Startup status, phase, attempt id and last error.
    pub startup: Arc<RwLock<StartupState>>,
    /// Selects the label returned by `mode_label` ("in-process" vs "sidecar").
    pub in_process_mode: Arc<AtomicBool>,
    /// Optional API token; `None` until `set_api_token` stores one.
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    /// Resolved from `config::env::resolve_run_stale_ms()` at construction.
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// Shared key/value resources, keyed by resource key; persisted to
    /// `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    /// Routine definitions; persisted to `routines_path`.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    /// Routine history events; persisted to `routine_history_path`.
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    /// Routine run records; persisted to `routine_runs_path`.
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    /// Starts as `false`; NOTE(review): presumably flipped while the scheduler
    /// shuts down — confirm against the scheduler code.
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    /// Initialised from `config::env::resolve_bug_monitor_env_config()`.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// Resolved from `config::env::resolve_token_cost_per_1k_usd()`.
    pub token_cost_per_1k_usd: f64,
    // On-disk store locations, resolved via `config::paths::*` in `new_starting`.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    /// Web UI toggle; written by `configure_web_ui`.
    pub web_ui_enabled: Arc<AtomicBool>,
    /// Normalized web UI path prefix; starts as "/admin". Uses the blocking
    /// `std::sync::RwLock`, so its accessors are synchronous.
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    /// Base URL of this server; starts as "http://127.0.0.1:39731".
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    /// Channel listener tasks plus the last published per-channel status.
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host context detected at construction; `host_runtime_context()` returns
    /// this only while the runtime has not been set.
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Status of one messaging channel as built by
/// `AppState::restart_channel_listeners` and published in the
/// `channel.status.changed` event.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// True when the channel has a section in the effective channels config.
    pub enabled: bool,
    /// Set to true for every enabled channel once listeners have been started;
    /// it is not derived from an actual connection check in this file.
    pub connected: bool,
    /// Last error text; `restart_channel_listeners` always resets it to `None`.
    pub last_error: Option<String>,
    /// Session counter; `restart_channel_listeners` resets it to 0.
    pub active_sessions: u64,
    /// Free-form JSON metadata; `restart_channel_listeners` resets it to `{}`.
    pub meta: Value,
}
/// Typed view over the effective configuration JSON.
///
/// Every section is `#[serde(default)]`, so a missing section deserializes to
/// its default value instead of failing the whole parse.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    /// Channel sections; read by `restart_channel_listeners`.
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    /// Web UI settings; `enabled` and `path_prefix` are read by
    /// `restart_channel_listeners`.
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
151
/// Mutable channel state guarded by `AppState::channels_runtime`.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Running listener tasks; `None` when no listeners have been started.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Latest status per channel name ("telegram", "discord", "slack").
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
157
/// A single key / JSON value pair.
///
/// NOTE(review): not used in the visible part of this file; judging by the
/// name it carries one status-index change — confirm at the call sites.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    pub value: Value,
}
163
164impl AppState {
165    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
166        Self {
167            runtime: Arc::new(OnceLock::new()),
168            startup: Arc::new(RwLock::new(StartupState {
169                status: StartupStatus::Starting,
170                phase: "boot".to_string(),
171                started_at_ms: now_ms(),
172                attempt_id,
173                last_error: None,
174            })),
175            in_process_mode: Arc::new(AtomicBool::new(in_process)),
176            api_token: Arc::new(RwLock::new(None)),
177            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
178            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
179            run_registry: RunRegistry::new(),
180            run_stale_ms: config::env::resolve_run_stale_ms(),
181            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
182            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
183            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
184            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
185            shared_resources_path: config::paths::resolve_shared_resources_path(),
186            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
187            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
188            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
189            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
190            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
191            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
192                config::env::resolve_scheduler_max_concurrent_runs(),
193            ))),
194            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
195            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
196            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
197            workflow_planner_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
198            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
199            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
200            bug_monitor_config: Arc::new(
201                RwLock::new(config::env::resolve_bug_monitor_env_config()),
202            ),
203            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
204            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
205            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
206            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
207            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
208            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
209            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
210            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
211            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
212            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
213            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
214            routines_path: config::paths::resolve_routines_path(),
215            routine_history_path: config::paths::resolve_routine_history_path(),
216            routine_runs_path: config::paths::resolve_routine_runs_path(),
217            automations_v2_path: config::paths::resolve_automations_v2_path(),
218            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
219            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
220            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
221            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
222            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
223            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
224            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
225            external_actions_path: config::paths::resolve_external_actions_path(),
226            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
227            workflow_planner_sessions_path: config::paths::resolve_workflow_planner_sessions_path(),
228            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
229            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
230            web_ui_enabled: Arc::new(AtomicBool::new(false)),
231            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
232            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
233            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
234            host_runtime_context: detect_host_runtime_context(),
235            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
236            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
237            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
238            preset_registry: Arc::new(PresetRegistry::new(
239                PackManager::default_root(),
240                resolve_shared_paths()
241                    .map(|paths| paths.canonical_root)
242                    .unwrap_or_else(|_| {
243                        dirs::home_dir()
244                            .unwrap_or_else(|| PathBuf::from("."))
245                            .join(".tandem")
246                    }),
247            )),
248        }
249    }
250
251    pub fn is_ready(&self) -> bool {
252        self.runtime.get().is_some()
253    }
254
255    pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
256        for _ in 0..attempts {
257            if self.is_ready() {
258                return true;
259            }
260            let startup = self.startup_snapshot().await;
261            if matches!(startup.status, StartupStatus::Failed) {
262                return false;
263            }
264            tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
265        }
266        self.is_ready()
267    }
268
269    pub fn mode_label(&self) -> &'static str {
270        if self.in_process_mode.load(Ordering::Relaxed) {
271            "in-process"
272        } else {
273            "sidecar"
274        }
275    }
276
277    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
278        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
279        if let Ok(mut guard) = self.web_ui_prefix.write() {
280            *guard = config::webui::normalize_web_ui_prefix(&prefix);
281        }
282    }
283
284    pub fn web_ui_enabled(&self) -> bool {
285        self.web_ui_enabled.load(Ordering::Relaxed)
286    }
287
288    pub fn web_ui_prefix(&self) -> String {
289        self.web_ui_prefix
290            .read()
291            .map(|v| v.clone())
292            .unwrap_or_else(|_| "/admin".to_string())
293    }
294
295    pub fn set_server_base_url(&self, base_url: String) {
296        if let Ok(mut guard) = self.server_base_url.write() {
297            *guard = base_url;
298        }
299    }
300
301    pub fn server_base_url(&self) -> String {
302        self.server_base_url
303            .read()
304            .map(|v| v.clone())
305            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
306    }
307
308    pub async fn api_token(&self) -> Option<String> {
309        self.api_token.read().await.clone()
310    }
311
312    pub async fn set_api_token(&self, token: Option<String>) {
313        *self.api_token.write().await = token;
314    }
315
316    pub async fn startup_snapshot(&self) -> StartupSnapshot {
317        let state = self.startup.read().await.clone();
318        StartupSnapshot {
319            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
320            status: state.status,
321            phase: state.phase,
322            started_at_ms: state.started_at_ms,
323            attempt_id: state.attempt_id,
324            last_error: state.last_error,
325        }
326    }
327
328    pub fn host_runtime_context(&self) -> HostRuntimeContext {
329        self.runtime
330            .get()
331            .map(|runtime| runtime.host_runtime_context.clone())
332            .unwrap_or_else(|| self.host_runtime_context.clone())
333    }
334
335    pub async fn set_phase(&self, phase: impl Into<String>) {
336        let mut startup = self.startup.write().await;
337        startup.phase = phase.into();
338    }
339
340    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
341        self.runtime
342            .set(runtime)
343            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
344        #[cfg(feature = "browser")]
345        self.register_browser_tools().await?;
346        self.tools
347            .register_tool(
348                "pack_builder".to_string(),
349                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
350            )
351            .await;
352        self.tools
353            .register_tool(
354                "mcp_list".to_string(),
355                Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
356            )
357            .await;
358        self.engine_loop
359            .set_spawn_agent_hook(std::sync::Arc::new(
360                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
361            ))
362            .await;
363        self.engine_loop
364            .set_tool_policy_hook(std::sync::Arc::new(
365                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
366            ))
367            .await;
368        self.engine_loop
369            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
370                self.clone(),
371            )))
372            .await;
373        let _ = self.load_shared_resources().await;
374        self.load_routines().await?;
375        let _ = self.load_routine_history().await;
376        let _ = self.load_routine_runs().await;
377        self.load_automations_v2().await?;
378        let _ = self.load_automation_v2_runs().await;
379        let _ = self.load_optimization_campaigns().await;
380        let _ = self.load_optimization_experiments().await;
381        let _ = self.load_bug_monitor_config().await;
382        let _ = self.load_bug_monitor_drafts().await;
383        let _ = self.load_bug_monitor_incidents().await;
384        let _ = self.load_bug_monitor_posts().await;
385        let _ = self.load_external_actions().await;
386        let _ = self.load_workflow_planner_sessions().await;
387        let _ = self.load_workflow_runs().await;
388        let _ = self.load_workflow_hook_overrides().await;
389        let _ = self.reload_workflows().await;
390        let workspace_root = self.workspace_index.snapshot().await.root;
391        let _ = self
392            .agent_teams
393            .ensure_loaded_for_workspace(&workspace_root)
394            .await;
395        let mut startup = self.startup.write().await;
396        startup.status = StartupStatus::Ready;
397        startup.phase = "ready".to_string();
398        startup.last_error = None;
399        Ok(())
400    }
401
402    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
403        let mut startup = self.startup.write().await;
404        startup.status = StartupStatus::Failed;
405        startup.phase = phase.into();
406        startup.last_error = Some(error.into());
407    }
408
409    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
410        let runtime = self.channels_runtime.lock().await;
411        runtime.statuses.clone()
412    }
413
414    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
415        let effective = self.config.get_effective_value().await;
416        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
417        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
418
419        let mut runtime = self.channels_runtime.lock().await;
420        if let Some(listeners) = runtime.listeners.as_mut() {
421            listeners.abort_all();
422        }
423        runtime.listeners = None;
424        runtime.statuses.clear();
425
426        let mut status_map = std::collections::HashMap::new();
427        status_map.insert(
428            "telegram".to_string(),
429            ChannelStatus {
430                enabled: parsed.channels.telegram.is_some(),
431                connected: false,
432                last_error: None,
433                active_sessions: 0,
434                meta: serde_json::json!({}),
435            },
436        );
437        status_map.insert(
438            "discord".to_string(),
439            ChannelStatus {
440                enabled: parsed.channels.discord.is_some(),
441                connected: false,
442                last_error: None,
443                active_sessions: 0,
444                meta: serde_json::json!({}),
445            },
446        );
447        status_map.insert(
448            "slack".to_string(),
449            ChannelStatus {
450                enabled: parsed.channels.slack.is_some(),
451                connected: false,
452                last_error: None,
453                active_sessions: 0,
454                meta: serde_json::json!({}),
455            },
456        );
457
458        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
459            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
460            runtime.listeners = Some(listeners);
461            for status in status_map.values_mut() {
462                if status.enabled {
463                    status.connected = true;
464                }
465            }
466        }
467
468        runtime.statuses = status_map.clone();
469        drop(runtime);
470
471        self.event_bus.publish(EngineEvent::new(
472            "channel.status.changed",
473            serde_json::json!({ "channels": status_map }),
474        ));
475        Ok(())
476    }
477
478    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
479        if !self.shared_resources_path.exists() {
480            return Ok(());
481        }
482        let raw = fs::read_to_string(&self.shared_resources_path).await?;
483        let parsed =
484            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
485                .unwrap_or_default();
486        let mut guard = self.shared_resources.write().await;
487        *guard = parsed;
488        Ok(())
489    }
490
491    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
492        if let Some(parent) = self.shared_resources_path.parent() {
493            fs::create_dir_all(parent).await?;
494        }
495        let payload = {
496            let guard = self.shared_resources.read().await;
497            serde_json::to_string_pretty(&*guard)?
498        };
499        fs::write(&self.shared_resources_path, payload).await?;
500        Ok(())
501    }
502
503    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
504        self.shared_resources.read().await.get(key).cloned()
505    }
506
507    pub async fn list_shared_resources(
508        &self,
509        prefix: Option<&str>,
510        limit: usize,
511    ) -> Vec<SharedResourceRecord> {
512        let limit = limit.clamp(1, 500);
513        let mut rows = self
514            .shared_resources
515            .read()
516            .await
517            .values()
518            .filter(|record| {
519                if let Some(prefix) = prefix {
520                    record.key.starts_with(prefix)
521                } else {
522                    true
523                }
524            })
525            .cloned()
526            .collect::<Vec<_>>();
527        rows.sort_by(|a, b| a.key.cmp(&b.key));
528        rows.truncate(limit);
529        rows
530    }
531
532    pub async fn put_shared_resource(
533        &self,
534        key: String,
535        value: Value,
536        if_match_rev: Option<u64>,
537        updated_by: String,
538        ttl_ms: Option<u64>,
539    ) -> Result<SharedResourceRecord, ResourceStoreError> {
540        if !is_valid_resource_key(&key) {
541            return Err(ResourceStoreError::InvalidKey { key });
542        }
543
544        let now = now_ms();
545        let mut guard = self.shared_resources.write().await;
546        let existing = guard.get(&key).cloned();
547
548        if let Some(expected) = if_match_rev {
549            let current = existing.as_ref().map(|row| row.rev);
550            if current != Some(expected) {
551                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
552                    key,
553                    expected_rev: Some(expected),
554                    current_rev: current,
555                }));
556            }
557        }
558
559        let next_rev = existing
560            .as_ref()
561            .map(|row| row.rev.saturating_add(1))
562            .unwrap_or(1);
563
564        let record = SharedResourceRecord {
565            key: key.clone(),
566            value,
567            rev: next_rev,
568            updated_at_ms: now,
569            updated_by,
570            ttl_ms,
571        };
572
573        let previous = guard.insert(key.clone(), record.clone());
574        drop(guard);
575
576        if let Err(error) = self.persist_shared_resources().await {
577            let mut rollback = self.shared_resources.write().await;
578            if let Some(previous) = previous {
579                rollback.insert(key, previous);
580            } else {
581                rollback.remove(&key);
582            }
583            return Err(ResourceStoreError::PersistFailed {
584                message: error.to_string(),
585            });
586        }
587
588        Ok(record)
589    }
590
591    pub async fn delete_shared_resource(
592        &self,
593        key: &str,
594        if_match_rev: Option<u64>,
595    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
596        if !is_valid_resource_key(key) {
597            return Err(ResourceStoreError::InvalidKey {
598                key: key.to_string(),
599            });
600        }
601
602        let mut guard = self.shared_resources.write().await;
603        let current = guard.get(key).cloned();
604        if let Some(expected) = if_match_rev {
605            let current_rev = current.as_ref().map(|row| row.rev);
606            if current_rev != Some(expected) {
607                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
608                    key: key.to_string(),
609                    expected_rev: Some(expected),
610                    current_rev,
611                }));
612            }
613        }
614
615        let removed = guard.remove(key);
616        drop(guard);
617
618        if let Err(error) = self.persist_shared_resources().await {
619            if let Some(record) = removed.clone() {
620                self.shared_resources
621                    .write()
622                    .await
623                    .insert(record.key.clone(), record);
624            }
625            return Err(ResourceStoreError::PersistFailed {
626                message: error.to_string(),
627            });
628        }
629
630        Ok(removed)
631    }
632
633    pub async fn load_routines(&self) -> anyhow::Result<()> {
634        if !self.routines_path.exists() {
635            return Ok(());
636        }
637        let raw = fs::read_to_string(&self.routines_path).await?;
638        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
639            Ok(parsed) => {
640                let mut guard = self.routines.write().await;
641                *guard = parsed;
642                Ok(())
643            }
644            Err(primary_err) => {
645                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
646                if backup_path.exists() {
647                    let backup_raw = fs::read_to_string(&backup_path).await?;
648                    if let Ok(parsed_backup) = serde_json::from_str::<
649                        std::collections::HashMap<String, RoutineSpec>,
650                    >(&backup_raw)
651                    {
652                        let mut guard = self.routines.write().await;
653                        *guard = parsed_backup;
654                        return Ok(());
655                    }
656                }
657                Err(anyhow::anyhow!(
658                    "failed to parse routines store {}: {primary_err}",
659                    self.routines_path.display()
660                ))
661            }
662        }
663    }
664
665    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
666        if !self.routine_history_path.exists() {
667            return Ok(());
668        }
669        let raw = fs::read_to_string(&self.routine_history_path).await?;
670        let parsed = serde_json::from_str::<
671            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
672        >(&raw)
673        .unwrap_or_default();
674        let mut guard = self.routine_history.write().await;
675        *guard = parsed;
676        Ok(())
677    }
678
679    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
680        if !self.routine_runs_path.exists() {
681            return Ok(());
682        }
683        let raw = fs::read_to_string(&self.routine_runs_path).await?;
684        let parsed =
685            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
686                .unwrap_or_default();
687        let mut guard = self.routine_runs.write().await;
688        *guard = parsed;
689        Ok(())
690    }
691
692    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
693        if let Some(parent) = self.routines_path.parent() {
694            fs::create_dir_all(parent).await?;
695        }
696        let (payload, is_empty) = {
697            let guard = self.routines.read().await;
698            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
699        };
700        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
701            let existing_raw = fs::read_to_string(&self.routines_path)
702                .await
703                .unwrap_or_default();
704            let existing_has_rows = serde_json::from_str::<
705                std::collections::HashMap<String, RoutineSpec>,
706            >(&existing_raw)
707            .map(|rows| !rows.is_empty())
708            .unwrap_or(true);
709            if existing_has_rows {
710                return Err(anyhow::anyhow!(
711                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
712                    self.routines_path.display()
713                ));
714            }
715        }
716        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
717        if self.routines_path.exists() {
718            let _ = fs::copy(&self.routines_path, &backup_path).await;
719        }
720        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
721        fs::write(&tmp_path, payload).await?;
722        fs::rename(&tmp_path, &self.routines_path).await?;
723        Ok(())
724    }
725
726    pub async fn persist_routines(&self) -> anyhow::Result<()> {
727        self.persist_routines_inner(false).await
728    }
729
730    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
731        if let Some(parent) = self.routine_history_path.parent() {
732            fs::create_dir_all(parent).await?;
733        }
734        let payload = {
735            let guard = self.routine_history.read().await;
736            serde_json::to_string_pretty(&*guard)?
737        };
738        fs::write(&self.routine_history_path, payload).await?;
739        Ok(())
740    }
741
742    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
743        if let Some(parent) = self.routine_runs_path.parent() {
744            fs::create_dir_all(parent).await?;
745        }
746        let payload = {
747            let guard = self.routine_runs.read().await;
748            serde_json::to_string_pretty(&*guard)?
749        };
750        fs::write(&self.routine_runs_path, payload).await?;
751        Ok(())
752    }
753
754    pub async fn put_routine(
755        &self,
756        mut routine: RoutineSpec,
757    ) -> Result<RoutineSpec, RoutineStoreError> {
758        if routine.routine_id.trim().is_empty() {
759            return Err(RoutineStoreError::InvalidRoutineId {
760                routine_id: routine.routine_id,
761            });
762        }
763
764        routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
765        routine.output_targets = normalize_non_empty_list(routine.output_targets);
766
767        let now = now_ms();
768        let next_schedule_fire =
769            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
770                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
771                    detail: "invalid schedule or timezone".to_string(),
772                })?;
773        match routine.schedule {
774            RoutineSchedule::IntervalSeconds { seconds } => {
775                if seconds == 0 {
776                    return Err(RoutineStoreError::InvalidSchedule {
777                        detail: "interval_seconds must be > 0".to_string(),
778                    });
779                }
780                let _ = seconds;
781            }
782            RoutineSchedule::Cron { .. } => {}
783        }
784        if routine.next_fire_at_ms.is_none() {
785            routine.next_fire_at_ms = Some(next_schedule_fire);
786        }
787
788        let mut guard = self.routines.write().await;
789        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
790        drop(guard);
791
792        if let Err(error) = self.persist_routines().await {
793            let mut rollback = self.routines.write().await;
794            if let Some(previous) = previous {
795                rollback.insert(previous.routine_id.clone(), previous);
796            } else {
797                rollback.remove(&routine.routine_id);
798            }
799            return Err(RoutineStoreError::PersistFailed {
800                message: error.to_string(),
801            });
802        }
803
804        Ok(routine)
805    }
806
807    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
808        let mut rows = self
809            .routines
810            .read()
811            .await
812            .values()
813            .cloned()
814            .collect::<Vec<_>>();
815        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
816        rows
817    }
818
819    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
820        self.routines.read().await.get(routine_id).cloned()
821    }
822
823    pub async fn delete_routine(
824        &self,
825        routine_id: &str,
826    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
827        let mut guard = self.routines.write().await;
828        let removed = guard.remove(routine_id);
829        drop(guard);
830
831        let allow_empty_overwrite = self.routines.read().await.is_empty();
832        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
833            if let Some(removed) = removed.clone() {
834                self.routines
835                    .write()
836                    .await
837                    .insert(removed.routine_id.clone(), removed);
838            }
839            return Err(RoutineStoreError::PersistFailed {
840                message: error.to_string(),
841            });
842        }
843        Ok(removed)
844    }
845
846    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
847        let mut plans = Vec::new();
848        let mut guard = self.routines.write().await;
849        for routine in guard.values_mut() {
850            if routine.status != RoutineStatus::Active {
851                continue;
852            }
853            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
854                continue;
855            };
856            if now_ms < next_fire_at_ms {
857                continue;
858            }
859            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
860                now_ms,
861                next_fire_at_ms,
862                &routine.schedule,
863                &routine.timezone,
864                &routine.misfire_policy,
865            );
866            routine.next_fire_at_ms = Some(next_fire_at_ms);
867            if run_count == 0 {
868                continue;
869            }
870            plans.push(RoutineTriggerPlan {
871                routine_id: routine.routine_id.clone(),
872                run_count,
873                scheduled_at_ms: now_ms,
874                next_fire_at_ms,
875            });
876        }
877        drop(guard);
878        let _ = self.persist_routines().await;
879        plans
880    }
881
882    pub async fn mark_routine_fired(
883        &self,
884        routine_id: &str,
885        fired_at_ms: u64,
886    ) -> Option<RoutineSpec> {
887        let mut guard = self.routines.write().await;
888        let routine = guard.get_mut(routine_id)?;
889        routine.last_fired_at_ms = Some(fired_at_ms);
890        let updated = routine.clone();
891        drop(guard);
892        let _ = self.persist_routines().await;
893        Some(updated)
894    }
895
896    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
897        let mut history = self.routine_history.write().await;
898        history
899            .entry(event.routine_id.clone())
900            .or_default()
901            .push(event);
902        drop(history);
903        let _ = self.persist_routine_history().await;
904    }
905
906    pub async fn list_routine_history(
907        &self,
908        routine_id: &str,
909        limit: usize,
910    ) -> Vec<RoutineHistoryEvent> {
911        let limit = limit.clamp(1, 500);
912        let mut rows = self
913            .routine_history
914            .read()
915            .await
916            .get(routine_id)
917            .cloned()
918            .unwrap_or_default();
919        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
920        rows.truncate(limit);
921        rows
922    }
923
924    pub async fn create_routine_run(
925        &self,
926        routine: &RoutineSpec,
927        trigger_type: &str,
928        run_count: u32,
929        status: RoutineRunStatus,
930        detail: Option<String>,
931    ) -> RoutineRunRecord {
932        let now = now_ms();
933        let record = RoutineRunRecord {
934            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
935            routine_id: routine.routine_id.clone(),
936            trigger_type: trigger_type.to_string(),
937            run_count,
938            status,
939            created_at_ms: now,
940            updated_at_ms: now,
941            fired_at_ms: Some(now),
942            started_at_ms: None,
943            finished_at_ms: None,
944            requires_approval: routine.requires_approval,
945            approval_reason: None,
946            denial_reason: None,
947            paused_reason: None,
948            detail,
949            entrypoint: routine.entrypoint.clone(),
950            args: routine.args.clone(),
951            allowed_tools: routine.allowed_tools.clone(),
952            output_targets: routine.output_targets.clone(),
953            artifacts: Vec::new(),
954            active_session_ids: Vec::new(),
955            latest_session_id: None,
956            prompt_tokens: 0,
957            completion_tokens: 0,
958            total_tokens: 0,
959            estimated_cost_usd: 0.0,
960        };
961        self.routine_runs
962            .write()
963            .await
964            .insert(record.run_id.clone(), record.clone());
965        let _ = self.persist_routine_runs().await;
966        record
967    }
968
969    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
970        self.routine_runs.read().await.get(run_id).cloned()
971    }
972
973    pub async fn list_routine_runs(
974        &self,
975        routine_id: Option<&str>,
976        limit: usize,
977    ) -> Vec<RoutineRunRecord> {
978        let mut rows = self
979            .routine_runs
980            .read()
981            .await
982            .values()
983            .filter(|row| {
984                if let Some(id) = routine_id {
985                    row.routine_id == id
986                } else {
987                    true
988                }
989            })
990            .cloned()
991            .collect::<Vec<_>>();
992        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
993        rows.truncate(limit.clamp(1, 500));
994        rows
995    }
996
997    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
998        let mut guard = self.routine_runs.write().await;
999        let next_run_id = guard
1000            .values()
1001            .filter(|row| row.status == RoutineRunStatus::Queued)
1002            .min_by(|a, b| {
1003                a.created_at_ms
1004                    .cmp(&b.created_at_ms)
1005                    .then_with(|| a.run_id.cmp(&b.run_id))
1006            })
1007            .map(|row| row.run_id.clone())?;
1008        let now = now_ms();
1009        let row = guard.get_mut(&next_run_id)?;
1010        row.status = RoutineRunStatus::Running;
1011        row.updated_at_ms = now;
1012        row.started_at_ms = Some(now);
1013        let claimed = row.clone();
1014        drop(guard);
1015        let _ = self.persist_routine_runs().await;
1016        Some(claimed)
1017    }
1018
1019    pub async fn set_routine_session_policy(
1020        &self,
1021        session_id: String,
1022        run_id: String,
1023        routine_id: String,
1024        allowed_tools: Vec<String>,
1025    ) {
1026        let policy = RoutineSessionPolicy {
1027            session_id: session_id.clone(),
1028            run_id,
1029            routine_id,
1030            allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1031        };
1032        self.routine_session_policies
1033            .write()
1034            .await
1035            .insert(session_id, policy);
1036    }
1037
1038    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1039        self.routine_session_policies
1040            .read()
1041            .await
1042            .get(session_id)
1043            .cloned()
1044    }
1045
1046    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1047        self.routine_session_policies
1048            .write()
1049            .await
1050            .remove(session_id);
1051    }
1052
1053    pub async fn update_routine_run_status(
1054        &self,
1055        run_id: &str,
1056        status: RoutineRunStatus,
1057        reason: Option<String>,
1058    ) -> Option<RoutineRunRecord> {
1059        let mut guard = self.routine_runs.write().await;
1060        let row = guard.get_mut(run_id)?;
1061        row.status = status.clone();
1062        row.updated_at_ms = now_ms();
1063        match status {
1064            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1065            RoutineRunStatus::Running => {
1066                row.started_at_ms.get_or_insert_with(now_ms);
1067                if let Some(detail) = reason {
1068                    row.detail = Some(detail);
1069                }
1070            }
1071            RoutineRunStatus::Denied => row.denial_reason = reason,
1072            RoutineRunStatus::Paused => row.paused_reason = reason,
1073            RoutineRunStatus::Completed
1074            | RoutineRunStatus::Failed
1075            | RoutineRunStatus::Cancelled => {
1076                row.finished_at_ms = Some(now_ms());
1077                if let Some(detail) = reason {
1078                    row.detail = Some(detail);
1079                }
1080            }
1081            _ => {
1082                if let Some(detail) = reason {
1083                    row.detail = Some(detail);
1084                }
1085            }
1086        }
1087        let updated = row.clone();
1088        drop(guard);
1089        let _ = self.persist_routine_runs().await;
1090        Some(updated)
1091    }
1092
1093    pub async fn append_routine_run_artifact(
1094        &self,
1095        run_id: &str,
1096        artifact: RoutineRunArtifact,
1097    ) -> Option<RoutineRunRecord> {
1098        let mut guard = self.routine_runs.write().await;
1099        let row = guard.get_mut(run_id)?;
1100        row.updated_at_ms = now_ms();
1101        row.artifacts.push(artifact);
1102        let updated = row.clone();
1103        drop(guard);
1104        let _ = self.persist_routine_runs().await;
1105        Some(updated)
1106    }
1107
1108    pub async fn add_active_session_id(
1109        &self,
1110        run_id: &str,
1111        session_id: String,
1112    ) -> Option<RoutineRunRecord> {
1113        let mut guard = self.routine_runs.write().await;
1114        let row = guard.get_mut(run_id)?;
1115        if !row.active_session_ids.iter().any(|id| id == &session_id) {
1116            row.active_session_ids.push(session_id);
1117        }
1118        row.latest_session_id = row.active_session_ids.last().cloned();
1119        row.updated_at_ms = now_ms();
1120        let updated = row.clone();
1121        drop(guard);
1122        let _ = self.persist_routine_runs().await;
1123        Some(updated)
1124    }
1125
1126    pub async fn clear_active_session_id(
1127        &self,
1128        run_id: &str,
1129        session_id: &str,
1130    ) -> Option<RoutineRunRecord> {
1131        let mut guard = self.routine_runs.write().await;
1132        let row = guard.get_mut(run_id)?;
1133        row.active_session_ids.retain(|id| id != session_id);
1134        row.updated_at_ms = now_ms();
1135        let updated = row.clone();
1136        drop(guard);
1137        let _ = self.persist_routine_runs().await;
1138        Some(updated)
1139    }
1140
1141    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
1142        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
1143        let mut loaded_from_alternate = false;
1144        let mut migrated = false;
1145        let mut path_counts = Vec::new();
1146        let mut canonical_loaded = false;
1147        if self.automations_v2_path.exists() {
1148            let raw = fs::read_to_string(&self.automations_v2_path).await?;
1149            if raw.trim().is_empty() || raw.trim() == "{}" {
1150                path_counts.push((self.automations_v2_path.clone(), 0usize));
1151            } else {
1152                let parsed = parse_automation_v2_file(&raw);
1153                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
1154                canonical_loaded = !parsed.is_empty();
1155                merged = parsed;
1156            }
1157        } else {
1158            path_counts.push((self.automations_v2_path.clone(), 0usize));
1159        }
1160        if !canonical_loaded {
1161            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1162                if path == self.automations_v2_path {
1163                    continue;
1164                }
1165                if !path.exists() {
1166                    path_counts.push((path, 0usize));
1167                    continue;
1168                }
1169                let raw = fs::read_to_string(&path).await?;
1170                if raw.trim().is_empty() || raw.trim() == "{}" {
1171                    path_counts.push((path, 0usize));
1172                    continue;
1173                }
1174                let parsed = parse_automation_v2_file(&raw);
1175                path_counts.push((path.clone(), parsed.len()));
1176                if !parsed.is_empty() {
1177                    loaded_from_alternate = true;
1178                }
1179                for (automation_id, automation) in parsed {
1180                    match merged.get(&automation_id) {
1181                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
1182                        _ => {
1183                            merged.insert(automation_id, automation);
1184                        }
1185                    }
1186                }
1187            }
1188        } else {
1189            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1190                if path == self.automations_v2_path {
1191                    continue;
1192                }
1193                if !path.exists() {
1194                    path_counts.push((path, 0usize));
1195                    continue;
1196                }
1197                let raw = fs::read_to_string(&path).await?;
1198                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
1199                    0usize
1200                } else {
1201                    parse_automation_v2_file(&raw).len()
1202                };
1203                path_counts.push((path, count));
1204            }
1205        }
1206        let active_path = self.automations_v2_path.display().to_string();
1207        let path_count_summary = path_counts
1208            .iter()
1209            .map(|(path, count)| format!("{}={count}", path.display()))
1210            .collect::<Vec<_>>();
1211        tracing::info!(
1212            active_path,
1213            canonical_loaded,
1214            path_counts = ?path_count_summary,
1215            merged_count = merged.len(),
1216            "loaded automation v2 definitions"
1217        );
1218        for automation in merged.values_mut() {
1219            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
1220        }
1221        *self.automations_v2.write().await = merged;
1222        if loaded_from_alternate || migrated {
1223            let _ = self.persist_automations_v2().await;
1224        } else if canonical_loaded {
1225            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1226        }
1227        Ok(())
1228    }
1229
1230    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1231        let payload = {
1232            let guard = self.automations_v2.read().await;
1233            serde_json::to_string_pretty(&*guard)?
1234        };
1235        if let Some(parent) = self.automations_v2_path.parent() {
1236            fs::create_dir_all(parent).await?;
1237        }
1238        fs::write(&self.automations_v2_path, &payload).await?;
1239        let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1240        Ok(())
1241    }
1242
1243    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
1244        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
1245        let mut loaded_from_alternate = false;
1246        let mut path_counts = Vec::new();
1247        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
1248            if !path.exists() {
1249                path_counts.push((path, 0usize));
1250                continue;
1251            }
1252            let raw = fs::read_to_string(&path).await?;
1253            if raw.trim().is_empty() || raw.trim() == "{}" {
1254                path_counts.push((path, 0usize));
1255                continue;
1256            }
1257            let parsed = parse_automation_v2_runs_file(&raw);
1258            path_counts.push((path.clone(), parsed.len()));
1259            if path != self.automation_v2_runs_path {
1260                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
1261            }
1262            for (run_id, run) in parsed {
1263                match merged.get(&run_id) {
1264                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
1265                    _ => {
1266                        merged.insert(run_id, run);
1267                    }
1268                }
1269            }
1270        }
1271        let active_runs_path = self.automation_v2_runs_path.display().to_string();
1272        let run_path_count_summary = path_counts
1273            .iter()
1274            .map(|(path, count)| format!("{}={count}", path.display()))
1275            .collect::<Vec<_>>();
1276        tracing::info!(
1277            active_path = active_runs_path,
1278            path_counts = ?run_path_count_summary,
1279            merged_count = merged.len(),
1280            "loaded automation v2 runs"
1281        );
1282        *self.automation_v2_runs.write().await = merged;
1283        let recovered = self
1284            .recover_automation_definitions_from_run_snapshots()
1285            .await?;
1286        let automation_count = self.automations_v2.read().await.len();
1287        let run_count = self.automation_v2_runs.read().await.len();
1288        if automation_count == 0 && run_count > 0 {
1289            let active_automations_path = self.automations_v2_path.display().to_string();
1290            let active_runs_path = self.automation_v2_runs_path.display().to_string();
1291            tracing::warn!(
1292                active_automations_path,
1293                active_runs_path,
1294                run_count,
1295                "automation v2 definitions are empty while run history exists"
1296            );
1297        }
1298        if loaded_from_alternate || recovered > 0 {
1299            let _ = self.persist_automation_v2_runs().await;
1300        }
1301        Ok(())
1302    }
1303
1304    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1305        let payload = {
1306            let guard = self.automation_v2_runs.read().await;
1307            serde_json::to_string_pretty(&*guard)?
1308        };
1309        if let Some(parent) = self.automation_v2_runs_path.parent() {
1310            fs::create_dir_all(parent).await?;
1311        }
1312        fs::write(&self.automation_v2_runs_path, &payload).await?;
1313        Ok(())
1314    }
1315
1316    pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1317        if !self.optimization_campaigns_path.exists() {
1318            return Ok(());
1319        }
1320        let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1321        let parsed = parse_optimization_campaigns_file(&raw);
1322        *self.optimization_campaigns.write().await = parsed;
1323        Ok(())
1324    }
1325
1326    pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1327        let payload = {
1328            let guard = self.optimization_campaigns.read().await;
1329            serde_json::to_string_pretty(&*guard)?
1330        };
1331        if let Some(parent) = self.optimization_campaigns_path.parent() {
1332            fs::create_dir_all(parent).await?;
1333        }
1334        fs::write(&self.optimization_campaigns_path, payload).await?;
1335        Ok(())
1336    }
1337
1338    pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1339        if !self.optimization_experiments_path.exists() {
1340            return Ok(());
1341        }
1342        let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1343        let parsed = parse_optimization_experiments_file(&raw);
1344        *self.optimization_experiments.write().await = parsed;
1345        Ok(())
1346    }
1347
1348    pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1349        let payload = {
1350            let guard = self.optimization_experiments.read().await;
1351            serde_json::to_string_pretty(&*guard)?
1352        };
1353        if let Some(parent) = self.optimization_experiments_path.parent() {
1354            fs::create_dir_all(parent).await?;
1355        }
1356        fs::write(&self.optimization_experiments_path, payload).await?;
1357        Ok(())
1358    }
1359
1360    async fn verify_automation_v2_persisted(
1361        &self,
1362        automation_id: &str,
1363        expected_present: bool,
1364    ) -> anyhow::Result<()> {
1365        let active_raw = if self.automations_v2_path.exists() {
1366            fs::read_to_string(&self.automations_v2_path).await?
1367        } else {
1368            String::new()
1369        };
1370        let active_parsed = parse_automation_v2_file(&active_raw);
1371        let active_present = active_parsed.contains_key(automation_id);
1372        if active_present != expected_present {
1373            let active_path = self.automations_v2_path.display().to_string();
1374            tracing::error!(
1375                automation_id,
1376                expected_present,
1377                actual_present = active_present,
1378                count = active_parsed.len(),
1379                active_path,
1380                "automation v2 persistence verification failed"
1381            );
1382            anyhow::bail!(
1383                "automation v2 persistence verification failed for `{}`",
1384                automation_id
1385            );
1386        }
1387        let mut alternate_mismatches = Vec::new();
1388        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1389            if path == self.automations_v2_path {
1390                continue;
1391            }
1392            let raw = if path.exists() {
1393                fs::read_to_string(&path).await?
1394            } else {
1395                String::new()
1396            };
1397            let parsed = parse_automation_v2_file(&raw);
1398            let present = parsed.contains_key(automation_id);
1399            if present != expected_present {
1400                alternate_mismatches.push(format!(
1401                    "{} expected_present={} actual_present={} count={}",
1402                    path.display(),
1403                    expected_present,
1404                    present,
1405                    parsed.len()
1406                ));
1407            }
1408        }
1409        if !alternate_mismatches.is_empty() {
1410            let active_path = self.automations_v2_path.display().to_string();
1411            tracing::warn!(
1412                automation_id,
1413                expected_present,
1414                mismatches = ?alternate_mismatches,
1415                active_path,
1416                "automation v2 alternate persistence paths are stale"
1417            );
1418        }
1419        Ok(())
1420    }
1421
1422    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
1423        let runs = self
1424            .automation_v2_runs
1425            .read()
1426            .await
1427            .values()
1428            .cloned()
1429            .collect::<Vec<_>>();
1430        let mut guard = self.automations_v2.write().await;
1431        let mut recovered = 0usize;
1432        for run in runs {
1433            let Some(snapshot) = run.automation_snapshot.clone() else {
1434                continue;
1435            };
1436            let should_replace = match guard.get(&run.automation_id) {
1437                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
1438                None => true,
1439            };
1440            if should_replace {
1441                if !guard.contains_key(&run.automation_id) {
1442                    recovered += 1;
1443                }
1444                guard.insert(run.automation_id.clone(), snapshot);
1445            }
1446        }
1447        drop(guard);
1448        if recovered > 0 {
1449            let active_path = self.automations_v2_path.display().to_string();
1450            tracing::warn!(
1451                recovered,
1452                active_path,
1453                "recovered automation v2 definitions from run snapshots"
1454            );
1455            self.persist_automations_v2().await?;
1456        }
1457        Ok(recovered)
1458    }
1459
1460    pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1461        let path = if self.bug_monitor_config_path.exists() {
1462            self.bug_monitor_config_path.clone()
1463        } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1464            .exists()
1465        {
1466            config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1467        } else {
1468            return Ok(());
1469        };
1470        let raw = fs::read_to_string(path).await?;
1471        let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1472            .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1473        *self.bug_monitor_config.write().await = parsed;
1474        Ok(())
1475    }
1476
1477    pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1478        if let Some(parent) = self.bug_monitor_config_path.parent() {
1479            fs::create_dir_all(parent).await?;
1480        }
1481        let payload = {
1482            let guard = self.bug_monitor_config.read().await;
1483            serde_json::to_string_pretty(&*guard)?
1484        };
1485        fs::write(&self.bug_monitor_config_path, payload).await?;
1486        Ok(())
1487    }
1488
1489    pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1490        self.bug_monitor_config.read().await.clone()
1491    }
1492
1493    pub async fn put_bug_monitor_config(
1494        &self,
1495        mut config: BugMonitorConfig,
1496    ) -> anyhow::Result<BugMonitorConfig> {
1497        config.workspace_root = config
1498            .workspace_root
1499            .as_ref()
1500            .map(|v| v.trim().to_string())
1501            .filter(|v| !v.is_empty());
1502        if let Some(repo) = config.repo.as_ref() {
1503            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
1504                anyhow::bail!("repo must be in owner/repo format");
1505            }
1506        }
1507        if let Some(server) = config.mcp_server.as_ref() {
1508            let servers = self.mcp.list().await;
1509            if !servers.contains_key(server) {
1510                anyhow::bail!("unknown mcp server `{server}`");
1511            }
1512        }
1513        if let Some(model_policy) = config.model_policy.as_ref() {
1514            crate::http::routines_automations::validate_model_policy(model_policy)
1515                .map_err(anyhow::Error::msg)?;
1516        }
1517        config.updated_at_ms = now_ms();
1518        *self.bug_monitor_config.write().await = config.clone();
1519        self.persist_bug_monitor_config().await?;
1520        Ok(config)
1521    }
1522
1523    pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1524        let path = if self.bug_monitor_drafts_path.exists() {
1525            self.bug_monitor_drafts_path.clone()
1526        } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1527            .exists()
1528        {
1529            config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1530        } else {
1531            return Ok(());
1532        };
1533        let raw = fs::read_to_string(path).await?;
1534        let parsed =
1535            serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1536                .unwrap_or_default();
1537        *self.bug_monitor_drafts.write().await = parsed;
1538        Ok(())
1539    }
1540
1541    pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1542        if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1543            fs::create_dir_all(parent).await?;
1544        }
1545        let payload = {
1546            let guard = self.bug_monitor_drafts.read().await;
1547            serde_json::to_string_pretty(&*guard)?
1548        };
1549        fs::write(&self.bug_monitor_drafts_path, payload).await?;
1550        Ok(())
1551    }
1552
1553    pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1554        let path = if self.bug_monitor_incidents_path.exists() {
1555            self.bug_monitor_incidents_path.clone()
1556        } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1557            .exists()
1558        {
1559            config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1560        } else {
1561            return Ok(());
1562        };
1563        let raw = fs::read_to_string(path).await?;
1564        let parsed = serde_json::from_str::<
1565            std::collections::HashMap<String, BugMonitorIncidentRecord>,
1566        >(&raw)
1567        .unwrap_or_default();
1568        *self.bug_monitor_incidents.write().await = parsed;
1569        Ok(())
1570    }
1571
1572    pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1573        if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1574            fs::create_dir_all(parent).await?;
1575        }
1576        let payload = {
1577            let guard = self.bug_monitor_incidents.read().await;
1578            serde_json::to_string_pretty(&*guard)?
1579        };
1580        fs::write(&self.bug_monitor_incidents_path, payload).await?;
1581        Ok(())
1582    }
1583
1584    pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1585        let path = if self.bug_monitor_posts_path.exists() {
1586            self.bug_monitor_posts_path.clone()
1587        } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1588            .exists()
1589        {
1590            config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1591        } else {
1592            return Ok(());
1593        };
1594        let raw = fs::read_to_string(path).await?;
1595        let parsed =
1596            serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1597                .unwrap_or_default();
1598        *self.bug_monitor_posts.write().await = parsed;
1599        Ok(())
1600    }
1601
1602    pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1603        if let Some(parent) = self.bug_monitor_posts_path.parent() {
1604            fs::create_dir_all(parent).await?;
1605        }
1606        let payload = {
1607            let guard = self.bug_monitor_posts.read().await;
1608            serde_json::to_string_pretty(&*guard)?
1609        };
1610        fs::write(&self.bug_monitor_posts_path, payload).await?;
1611        Ok(())
1612    }
1613
1614    pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1615        if !self.external_actions_path.exists() {
1616            return Ok(());
1617        }
1618        let raw = fs::read_to_string(&self.external_actions_path).await?;
1619        let parsed =
1620            serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1621                .unwrap_or_default();
1622        *self.external_actions.write().await = parsed;
1623        Ok(())
1624    }
1625
1626    pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1627        if let Some(parent) = self.external_actions_path.parent() {
1628            fs::create_dir_all(parent).await?;
1629        }
1630        let payload = {
1631            let guard = self.external_actions.read().await;
1632            serde_json::to_string_pretty(&*guard)?
1633        };
1634        fs::write(&self.external_actions_path, payload).await?;
1635        Ok(())
1636    }
1637
1638    pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1639        let mut rows = self
1640            .bug_monitor_incidents
1641            .read()
1642            .await
1643            .values()
1644            .cloned()
1645            .collect::<Vec<_>>();
1646        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1647        rows.truncate(limit.clamp(1, 200));
1648        rows
1649    }
1650
1651    pub async fn get_bug_monitor_incident(
1652        &self,
1653        incident_id: &str,
1654    ) -> Option<BugMonitorIncidentRecord> {
1655        self.bug_monitor_incidents
1656            .read()
1657            .await
1658            .get(incident_id)
1659            .cloned()
1660    }
1661
1662    pub async fn put_bug_monitor_incident(
1663        &self,
1664        incident: BugMonitorIncidentRecord,
1665    ) -> anyhow::Result<BugMonitorIncidentRecord> {
1666        self.bug_monitor_incidents
1667            .write()
1668            .await
1669            .insert(incident.incident_id.clone(), incident.clone());
1670        self.persist_bug_monitor_incidents().await?;
1671        Ok(incident)
1672    }
1673
1674    pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1675        let mut rows = self
1676            .bug_monitor_posts
1677            .read()
1678            .await
1679            .values()
1680            .cloned()
1681            .collect::<Vec<_>>();
1682        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1683        rows.truncate(limit.clamp(1, 200));
1684        rows
1685    }
1686
1687    pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1688        self.bug_monitor_posts.read().await.get(post_id).cloned()
1689    }
1690
1691    pub async fn put_bug_monitor_post(
1692        &self,
1693        post: BugMonitorPostRecord,
1694    ) -> anyhow::Result<BugMonitorPostRecord> {
1695        self.bug_monitor_posts
1696            .write()
1697            .await
1698            .insert(post.post_id.clone(), post.clone());
1699        self.persist_bug_monitor_posts().await?;
1700        Ok(post)
1701    }
1702
1703    pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1704        let mut rows = self
1705            .external_actions
1706            .read()
1707            .await
1708            .values()
1709            .cloned()
1710            .collect::<Vec<_>>();
1711        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1712        rows.truncate(limit.clamp(1, 200));
1713        rows
1714    }
1715
1716    pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1717        self.external_actions.read().await.get(action_id).cloned()
1718    }
1719
1720    pub async fn get_external_action_by_idempotency_key(
1721        &self,
1722        idempotency_key: &str,
1723    ) -> Option<ExternalActionRecord> {
1724        let normalized = idempotency_key.trim();
1725        if normalized.is_empty() {
1726            return None;
1727        }
1728        self.external_actions
1729            .read()
1730            .await
1731            .values()
1732            .find(|action| {
1733                action
1734                    .idempotency_key
1735                    .as_deref()
1736                    .map(str::trim)
1737                    .filter(|value| !value.is_empty())
1738                    == Some(normalized)
1739            })
1740            .cloned()
1741    }
1742
1743    pub async fn put_external_action(
1744        &self,
1745        action: ExternalActionRecord,
1746    ) -> anyhow::Result<ExternalActionRecord> {
1747        self.external_actions
1748            .write()
1749            .await
1750            .insert(action.action_id.clone(), action.clone());
1751        self.persist_external_actions().await?;
1752        Ok(action)
1753    }
1754
1755    pub async fn record_external_action(
1756        &self,
1757        action: ExternalActionRecord,
1758    ) -> anyhow::Result<ExternalActionRecord> {
1759        let action = {
1760            let mut guard = self.external_actions.write().await;
1761            if let Some(idempotency_key) = action
1762                .idempotency_key
1763                .as_deref()
1764                .map(str::trim)
1765                .filter(|value| !value.is_empty())
1766            {
1767                if let Some(existing) = guard
1768                    .values()
1769                    .find(|existing| {
1770                        existing
1771                            .idempotency_key
1772                            .as_deref()
1773                            .map(str::trim)
1774                            .filter(|value| !value.is_empty())
1775                            == Some(idempotency_key)
1776                    })
1777                    .cloned()
1778                {
1779                    return Ok(existing);
1780                }
1781            }
1782            guard.insert(action.action_id.clone(), action.clone());
1783            action
1784        };
1785        self.persist_external_actions().await?;
1786        if let Some(run_id) = action.routine_run_id.as_deref() {
1787            let artifact = RoutineRunArtifact {
1788                artifact_id: format!("external-action-{}", action.action_id),
1789                uri: format!("external-action://{}", action.action_id),
1790                kind: "external_action_receipt".to_string(),
1791                label: Some(format!("external action receipt: {}", action.operation)),
1792                created_at_ms: action.updated_at_ms,
1793                metadata: Some(json!({
1794                    "actionID": action.action_id,
1795                    "operation": action.operation,
1796                    "status": action.status,
1797                    "sourceKind": action.source_kind,
1798                    "sourceID": action.source_id,
1799                    "capabilityID": action.capability_id,
1800                    "target": action.target,
1801                })),
1802            };
1803            let _ = self
1804                .append_routine_run_artifact(run_id, artifact.clone())
1805                .await;
1806            if let Some(runtime) = self.runtime.get() {
1807                runtime.event_bus.publish(EngineEvent::new(
1808                    "routine.run.artifact_added",
1809                    json!({
1810                        "runID": run_id,
1811                        "artifact": artifact,
1812                    }),
1813                ));
1814            }
1815        }
1816        if let Some(context_run_id) = action.context_run_id.as_deref() {
1817            let payload = serde_json::to_value(&action)?;
1818            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
1819                self,
1820                context_run_id,
1821                &format!("external-action-{}", action.action_id),
1822                "external_action_receipt",
1823                &format!("external-actions/{}.json", action.action_id),
1824                &payload,
1825            )
1826            .await
1827            {
1828                tracing::warn!(
1829                    "failed to append external action artifact {} to context run {}: {}",
1830                    action.action_id,
1831                    context_run_id,
1832                    error
1833                );
1834            }
1835        }
1836        Ok(action)
1837    }
1838
1839    pub async fn update_bug_monitor_runtime_status(
1840        &self,
1841        update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1842    ) -> BugMonitorRuntimeStatus {
1843        let mut guard = self.bug_monitor_runtime_status.write().await;
1844        update(&mut guard);
1845        guard.clone()
1846    }
1847
1848    pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1849        let mut rows = self
1850            .bug_monitor_drafts
1851            .read()
1852            .await
1853            .values()
1854            .cloned()
1855            .collect::<Vec<_>>();
1856        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1857        rows.truncate(limit.clamp(1, 200));
1858        rows
1859    }
1860
1861    pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1862        self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1863    }
1864
1865    pub async fn put_bug_monitor_draft(
1866        &self,
1867        draft: BugMonitorDraftRecord,
1868    ) -> anyhow::Result<BugMonitorDraftRecord> {
1869        self.bug_monitor_drafts
1870            .write()
1871            .await
1872            .insert(draft.draft_id.clone(), draft.clone());
1873        self.persist_bug_monitor_drafts().await?;
1874        Ok(draft)
1875    }
1876
1877    pub async fn submit_bug_monitor_draft(
1878        &self,
1879        mut submission: BugMonitorSubmission,
1880    ) -> anyhow::Result<BugMonitorDraftRecord> {
1881        fn normalize_optional(value: Option<String>) -> Option<String> {
1882            value
1883                .map(|v| v.trim().to_string())
1884                .filter(|v| !v.is_empty())
1885        }
1886
1887        fn compute_fingerprint(parts: &[&str]) -> String {
1888            use std::hash::{Hash, Hasher};
1889
1890            let mut hasher = std::collections::hash_map::DefaultHasher::new();
1891            for part in parts {
1892                part.hash(&mut hasher);
1893            }
1894            format!("{:016x}", hasher.finish())
1895        }
1896
1897        submission.repo = normalize_optional(submission.repo);
1898        submission.title = normalize_optional(submission.title);
1899        submission.detail = normalize_optional(submission.detail);
1900        submission.source = normalize_optional(submission.source);
1901        submission.run_id = normalize_optional(submission.run_id);
1902        submission.session_id = normalize_optional(submission.session_id);
1903        submission.correlation_id = normalize_optional(submission.correlation_id);
1904        submission.file_name = normalize_optional(submission.file_name);
1905        submission.process = normalize_optional(submission.process);
1906        submission.component = normalize_optional(submission.component);
1907        submission.event = normalize_optional(submission.event);
1908        submission.level = normalize_optional(submission.level);
1909        submission.fingerprint = normalize_optional(submission.fingerprint);
1910        submission.excerpt = submission
1911            .excerpt
1912            .into_iter()
1913            .map(|line| line.trim_end().to_string())
1914            .filter(|line| !line.is_empty())
1915            .take(50)
1916            .collect();
1917
1918        let config = self.bug_monitor_config().await;
1919        let repo = submission
1920            .repo
1921            .clone()
1922            .or(config.repo.clone())
1923            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
1924        if !is_valid_owner_repo_slug(&repo) {
1925            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
1926        }
1927
1928        let title = submission.title.clone().unwrap_or_else(|| {
1929            if let Some(event) = submission.event.as_ref() {
1930                format!("Failure detected in {event}")
1931            } else if let Some(component) = submission.component.as_ref() {
1932                format!("Failure detected in {component}")
1933            } else if let Some(process) = submission.process.as_ref() {
1934                format!("Failure detected in {process}")
1935            } else if let Some(source) = submission.source.as_ref() {
1936                format!("Failure report from {source}")
1937            } else {
1938                "Failure report".to_string()
1939            }
1940        });
1941
1942        let mut detail_lines = Vec::new();
1943        if let Some(source) = submission.source.as_ref() {
1944            detail_lines.push(format!("source: {source}"));
1945        }
1946        if let Some(file_name) = submission.file_name.as_ref() {
1947            detail_lines.push(format!("file: {file_name}"));
1948        }
1949        if let Some(level) = submission.level.as_ref() {
1950            detail_lines.push(format!("level: {level}"));
1951        }
1952        if let Some(process) = submission.process.as_ref() {
1953            detail_lines.push(format!("process: {process}"));
1954        }
1955        if let Some(component) = submission.component.as_ref() {
1956            detail_lines.push(format!("component: {component}"));
1957        }
1958        if let Some(event) = submission.event.as_ref() {
1959            detail_lines.push(format!("event: {event}"));
1960        }
1961        if let Some(run_id) = submission.run_id.as_ref() {
1962            detail_lines.push(format!("run_id: {run_id}"));
1963        }
1964        if let Some(session_id) = submission.session_id.as_ref() {
1965            detail_lines.push(format!("session_id: {session_id}"));
1966        }
1967        if let Some(correlation_id) = submission.correlation_id.as_ref() {
1968            detail_lines.push(format!("correlation_id: {correlation_id}"));
1969        }
1970        if let Some(detail) = submission.detail.as_ref() {
1971            detail_lines.push(String::new());
1972            detail_lines.push(detail.clone());
1973        }
1974        if !submission.excerpt.is_empty() {
1975            if !detail_lines.is_empty() {
1976                detail_lines.push(String::new());
1977            }
1978            detail_lines.push("excerpt:".to_string());
1979            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
1980        }
1981        let detail = if detail_lines.is_empty() {
1982            None
1983        } else {
1984            Some(detail_lines.join("\n"))
1985        };
1986
1987        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
1988            compute_fingerprint(&[
1989                repo.as_str(),
1990                title.as_str(),
1991                detail.as_deref().unwrap_or(""),
1992                submission.source.as_deref().unwrap_or(""),
1993                submission.run_id.as_deref().unwrap_or(""),
1994                submission.session_id.as_deref().unwrap_or(""),
1995                submission.correlation_id.as_deref().unwrap_or(""),
1996            ])
1997        });
1998
1999        let mut drafts = self.bug_monitor_drafts.write().await;
2000        if let Some(existing) = drafts
2001            .values()
2002            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
2003            .cloned()
2004        {
2005            return Ok(existing);
2006        }
2007
2008        let draft = BugMonitorDraftRecord {
2009            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
2010            fingerprint,
2011            repo,
2012            status: if config.require_approval_for_new_issues {
2013                "approval_required".to_string()
2014            } else {
2015                "draft_ready".to_string()
2016            },
2017            created_at_ms: now_ms(),
2018            triage_run_id: None,
2019            issue_number: None,
2020            title: Some(title),
2021            detail,
2022            github_status: None,
2023            github_issue_url: None,
2024            github_comment_url: None,
2025            github_posted_at_ms: None,
2026            matched_issue_number: None,
2027            matched_issue_state: None,
2028            evidence_digest: None,
2029            last_post_error: None,
2030        };
2031        drafts.insert(draft.draft_id.clone(), draft.clone());
2032        drop(drafts);
2033        self.persist_bug_monitor_drafts().await?;
2034        Ok(draft)
2035    }
2036
2037    pub async fn update_bug_monitor_draft_status(
2038        &self,
2039        draft_id: &str,
2040        next_status: &str,
2041        reason: Option<&str>,
2042    ) -> anyhow::Result<BugMonitorDraftRecord> {
2043        let normalized_status = next_status.trim().to_ascii_lowercase();
2044        if normalized_status != "draft_ready" && normalized_status != "denied" {
2045            anyhow::bail!("unsupported Bug Monitor draft status");
2046        }
2047
2048        let mut drafts = self.bug_monitor_drafts.write().await;
2049        let Some(draft) = drafts.get_mut(draft_id) else {
2050            anyhow::bail!("Bug Monitor draft not found");
2051        };
2052        if !draft.status.eq_ignore_ascii_case("approval_required") {
2053            anyhow::bail!("Bug Monitor draft is not waiting for approval");
2054        }
2055        draft.status = normalized_status.clone();
2056        if let Some(reason) = reason
2057            .map(|value| value.trim())
2058            .filter(|value| !value.is_empty())
2059        {
2060            let next_detail = if let Some(detail) = draft.detail.as_ref() {
2061                format!("{detail}\n\noperator_note: {reason}")
2062            } else {
2063                format!("operator_note: {reason}")
2064            };
2065            draft.detail = Some(next_detail);
2066        }
2067        let updated = draft.clone();
2068        drop(drafts);
2069        self.persist_bug_monitor_drafts().await?;
2070
2071        let event_name = if normalized_status == "draft_ready" {
2072            "bug_monitor.draft.approved"
2073        } else {
2074            "bug_monitor.draft.denied"
2075        };
2076        self.event_bus.publish(EngineEvent::new(
2077            event_name,
2078            serde_json::json!({
2079                "draft_id": updated.draft_id,
2080                "repo": updated.repo,
2081                "status": updated.status,
2082                "reason": reason,
2083            }),
2084        ));
2085        Ok(updated)
2086    }
2087
2088    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
2089        let required_capabilities = vec![
2090            "github.list_issues".to_string(),
2091            "github.get_issue".to_string(),
2092            "github.create_issue".to_string(),
2093            "github.comment_on_issue".to_string(),
2094        ];
2095        let config = self.bug_monitor_config().await;
2096        let drafts = self.bug_monitor_drafts.read().await;
2097        let incidents = self.bug_monitor_incidents.read().await;
2098        let posts = self.bug_monitor_posts.read().await;
2099        let total_incidents = incidents.len();
2100        let pending_incidents = incidents
2101            .values()
2102            .filter(|row| {
2103                matches!(
2104                    row.status.as_str(),
2105                    "queued"
2106                        | "draft_created"
2107                        | "triage_queued"
2108                        | "analysis_queued"
2109                        | "triage_pending"
2110                        | "issue_draft_pending"
2111                )
2112            })
2113            .count();
2114        let pending_drafts = drafts
2115            .values()
2116            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
2117            .count();
2118        let pending_posts = posts
2119            .values()
2120            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
2121            .count();
2122        let last_activity_at_ms = drafts
2123            .values()
2124            .map(|row| row.created_at_ms)
2125            .chain(posts.values().map(|row| row.updated_at_ms))
2126            .max();
2127        drop(drafts);
2128        drop(incidents);
2129        drop(posts);
2130        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
2131        runtime.paused = config.paused;
2132        runtime.total_incidents = total_incidents;
2133        runtime.pending_incidents = pending_incidents;
2134        runtime.pending_posts = pending_posts;
2135
2136        let mut status = BugMonitorStatus {
2137            config: config.clone(),
2138            runtime,
2139            pending_drafts,
2140            pending_posts,
2141            last_activity_at_ms,
2142            ..BugMonitorStatus::default()
2143        };
2144        let repo_valid = config
2145            .repo
2146            .as_ref()
2147            .map(|repo| is_valid_owner_repo_slug(repo))
2148            .unwrap_or(false);
2149        let servers = self.mcp.list().await;
2150        let selected_server = config
2151            .mcp_server
2152            .as_ref()
2153            .and_then(|name| servers.get(name))
2154            .cloned();
2155        let provider_catalog = self.providers.list().await;
2156        let selected_model = config
2157            .model_policy
2158            .as_ref()
2159            .and_then(|policy| policy.get("default_model"))
2160            .and_then(crate::app::routines::parse_model_spec);
2161        let selected_model_ready = selected_model
2162            .as_ref()
2163            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
2164            .unwrap_or(false);
2165        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
2166            self.mcp.server_tools(server_name).await
2167        } else {
2168            Vec::new()
2169        };
2170        let discovered_tools = self
2171            .capability_resolver
2172            .discover_from_runtime(selected_server_tools, Vec::new())
2173            .await;
2174        status.discovered_mcp_tools = discovered_tools
2175            .iter()
2176            .map(|row| row.tool_name.clone())
2177            .collect();
2178        let discovered_providers = discovered_tools
2179            .iter()
2180            .map(|row| row.provider.to_ascii_lowercase())
2181            .collect::<std::collections::HashSet<_>>();
2182        let provider_preference = match config.provider_preference {
2183            BugMonitorProviderPreference::OfficialGithub => {
2184                vec![
2185                    "mcp".to_string(),
2186                    "composio".to_string(),
2187                    "arcade".to_string(),
2188                ]
2189            }
2190            BugMonitorProviderPreference::Composio => {
2191                vec![
2192                    "composio".to_string(),
2193                    "mcp".to_string(),
2194                    "arcade".to_string(),
2195                ]
2196            }
2197            BugMonitorProviderPreference::Arcade => {
2198                vec![
2199                    "arcade".to_string(),
2200                    "mcp".to_string(),
2201                    "composio".to_string(),
2202                ]
2203            }
2204            BugMonitorProviderPreference::Auto => {
2205                vec![
2206                    "mcp".to_string(),
2207                    "composio".to_string(),
2208                    "arcade".to_string(),
2209                ]
2210            }
2211        };
2212        let capability_resolution = self
2213            .capability_resolver
2214            .resolve(
2215                crate::capability_resolver::CapabilityResolveInput {
2216                    workflow_id: Some("bug_monitor".to_string()),
2217                    required_capabilities: required_capabilities.clone(),
2218                    optional_capabilities: Vec::new(),
2219                    provider_preference,
2220                    available_tools: discovered_tools,
2221                },
2222                Vec::new(),
2223            )
2224            .await
2225            .ok();
2226        let bindings_file = self.capability_resolver.list_bindings().await.ok();
2227        if let Some(bindings) = bindings_file.as_ref() {
2228            status.binding_source_version = bindings.builtin_version.clone();
2229            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
2230            status.selected_server_binding_candidates = bindings
2231                .bindings
2232                .iter()
2233                .filter(|binding| required_capabilities.contains(&binding.capability_id))
2234                .filter(|binding| {
2235                    discovered_providers.is_empty()
2236                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
2237                })
2238                .map(|binding| {
2239                    let binding_key = format!(
2240                        "{}::{}",
2241                        binding.capability_id,
2242                        binding.tool_name.to_ascii_lowercase()
2243                    );
2244                    let matched = capability_resolution
2245                        .as_ref()
2246                        .map(|resolution| {
2247                            resolution.resolved.iter().any(|row| {
2248                                row.capability_id == binding.capability_id
2249                                    && format!(
2250                                        "{}::{}",
2251                                        row.capability_id,
2252                                        row.tool_name.to_ascii_lowercase()
2253                                    ) == binding_key
2254                            })
2255                        })
2256                        .unwrap_or(false);
2257                    BugMonitorBindingCandidate {
2258                        capability_id: binding.capability_id.clone(),
2259                        binding_tool_name: binding.tool_name.clone(),
2260                        aliases: binding.tool_name_aliases.clone(),
2261                        matched,
2262                    }
2263                })
2264                .collect();
2265            status.selected_server_binding_candidates.sort_by(|a, b| {
2266                a.capability_id
2267                    .cmp(&b.capability_id)
2268                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
2269            });
2270        }
2271        let capability_ready = |capability_id: &str| -> bool {
2272            capability_resolution
2273                .as_ref()
2274                .map(|resolved| {
2275                    resolved
2276                        .resolved
2277                        .iter()
2278                        .any(|row| row.capability_id == capability_id)
2279                })
2280                .unwrap_or(false)
2281        };
2282        if let Some(resolution) = capability_resolution.as_ref() {
2283            status.missing_required_capabilities = resolution.missing_required.clone();
2284            status.resolved_capabilities = resolution
2285                .resolved
2286                .iter()
2287                .map(|row| BugMonitorCapabilityMatch {
2288                    capability_id: row.capability_id.clone(),
2289                    provider: row.provider.clone(),
2290                    tool_name: row.tool_name.clone(),
2291                    binding_index: row.binding_index,
2292                })
2293                .collect();
2294        } else {
2295            status.missing_required_capabilities = required_capabilities.clone();
2296        }
2297        status.required_capabilities = BugMonitorCapabilityReadiness {
2298            github_list_issues: capability_ready("github.list_issues"),
2299            github_get_issue: capability_ready("github.get_issue"),
2300            github_create_issue: capability_ready("github.create_issue"),
2301            github_comment_on_issue: capability_ready("github.comment_on_issue"),
2302        };
2303        status.selected_model = selected_model;
2304        status.readiness = BugMonitorReadiness {
2305            config_valid: repo_valid
2306                && selected_server.is_some()
2307                && status.required_capabilities.github_list_issues
2308                && status.required_capabilities.github_get_issue
2309                && status.required_capabilities.github_create_issue
2310                && status.required_capabilities.github_comment_on_issue
2311                && selected_model_ready,
2312            repo_valid,
2313            mcp_server_present: selected_server.is_some(),
2314            mcp_connected: selected_server
2315                .as_ref()
2316                .map(|row| row.connected)
2317                .unwrap_or(false),
2318            github_read_ready: status.required_capabilities.github_list_issues
2319                && status.required_capabilities.github_get_issue,
2320            github_write_ready: status.required_capabilities.github_create_issue
2321                && status.required_capabilities.github_comment_on_issue,
2322            selected_model_ready,
2323            ingest_ready: config.enabled && !config.paused && repo_valid,
2324            publish_ready: config.enabled
2325                && !config.paused
2326                && repo_valid
2327                && selected_server
2328                    .as_ref()
2329                    .map(|row| row.connected)
2330                    .unwrap_or(false)
2331                && status.required_capabilities.github_list_issues
2332                && status.required_capabilities.github_get_issue
2333                && status.required_capabilities.github_create_issue
2334                && status.required_capabilities.github_comment_on_issue
2335                && selected_model_ready,
2336            runtime_ready: config.enabled
2337                && !config.paused
2338                && repo_valid
2339                && selected_server
2340                    .as_ref()
2341                    .map(|row| row.connected)
2342                    .unwrap_or(false)
2343                && status.required_capabilities.github_list_issues
2344                && status.required_capabilities.github_get_issue
2345                && status.required_capabilities.github_create_issue
2346                && status.required_capabilities.github_comment_on_issue
2347                && selected_model_ready,
2348        };
2349        if config.enabled {
2350            if config.paused {
2351                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
2352            } else if !repo_valid {
2353                status.last_error = Some("Target repo is missing or invalid.".to_string());
2354            } else if selected_server.is_none() {
2355                status.last_error = Some("Selected MCP server is missing.".to_string());
2356            } else if !status.readiness.mcp_connected {
2357                status.last_error = Some("Selected MCP server is disconnected.".to_string());
2358            } else if !selected_model_ready {
2359                status.last_error = Some(
2360                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
2361                        .to_string(),
2362                );
2363            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
2364                let missing = if status.missing_required_capabilities.is_empty() {
2365                    "unknown".to_string()
2366                } else {
2367                    status.missing_required_capabilities.join(", ")
2368                };
2369                status.last_error = Some(format!(
2370                    "Selected MCP server is missing required GitHub capabilities: {missing}"
2371                ));
2372            }
2373        }
2374        status.runtime.monitoring_active = status.readiness.ingest_ready;
2375        status
2376    }
2377
2378    pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2379        if !self.workflow_runs_path.exists() {
2380            return Ok(());
2381        }
2382        let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2383        let parsed =
2384            serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2385                .unwrap_or_default();
2386        *self.workflow_runs.write().await = parsed;
2387        Ok(())
2388    }
2389
2390    pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2391        if let Some(parent) = self.workflow_runs_path.parent() {
2392            fs::create_dir_all(parent).await?;
2393        }
2394        let payload = {
2395            let guard = self.workflow_runs.read().await;
2396            serde_json::to_string_pretty(&*guard)?
2397        };
2398        fs::write(&self.workflow_runs_path, payload).await?;
2399        Ok(())
2400    }
2401
2402    pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2403        if !self.workflow_hook_overrides_path.exists() {
2404            return Ok(());
2405        }
2406        let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2407        let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2408            .unwrap_or_default();
2409        *self.workflow_hook_overrides.write().await = parsed;
2410        Ok(())
2411    }
2412
2413    pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2414        if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2415            fs::create_dir_all(parent).await?;
2416        }
2417        let payload = {
2418            let guard = self.workflow_hook_overrides.read().await;
2419            serde_json::to_string_pretty(&*guard)?
2420        };
2421        fs::write(&self.workflow_hook_overrides_path, payload).await?;
2422        Ok(())
2423    }
2424
2425    pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
2426        let mut sources = Vec::new();
2427        sources.push(WorkflowLoadSource {
2428            root: config::paths::resolve_builtin_workflows_dir(),
2429            kind: WorkflowSourceKind::BuiltIn,
2430            pack_id: None,
2431        });
2432
2433        let workspace_root = self.workspace_index.snapshot().await.root;
2434        sources.push(WorkflowLoadSource {
2435            root: PathBuf::from(workspace_root).join(".tandem"),
2436            kind: WorkflowSourceKind::Workspace,
2437            pack_id: None,
2438        });
2439
2440        if let Ok(packs) = self.pack_manager.list().await {
2441            for pack in packs {
2442                sources.push(WorkflowLoadSource {
2443                    root: PathBuf::from(pack.install_path),
2444                    kind: WorkflowSourceKind::Pack,
2445                    pack_id: Some(pack.pack_id),
2446                });
2447            }
2448        }
2449
2450        let mut registry = load_workflow_registry(&sources)?;
2451        let overrides = self.workflow_hook_overrides.read().await.clone();
2452        for hook in &mut registry.hooks {
2453            if let Some(enabled) = overrides.get(&hook.binding_id) {
2454                hook.enabled = *enabled;
2455            }
2456        }
2457        for workflow in registry.workflows.values_mut() {
2458            workflow.hooks = registry
2459                .hooks
2460                .iter()
2461                .filter(|hook| hook.workflow_id == workflow.workflow_id)
2462                .cloned()
2463                .collect();
2464        }
2465        let messages = validate_workflow_registry(&registry);
2466        *self.workflows.write().await = registry;
2467        Ok(messages)
2468    }
2469
2470    pub async fn workflow_registry(&self) -> WorkflowRegistry {
2471        self.workflows.read().await.clone()
2472    }
2473
2474    pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2475        let mut rows = self
2476            .workflows
2477            .read()
2478            .await
2479            .workflows
2480            .values()
2481            .cloned()
2482            .collect::<Vec<_>>();
2483        rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2484        rows
2485    }
2486
2487    pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2488        self.workflows
2489            .read()
2490            .await
2491            .workflows
2492            .get(workflow_id)
2493            .cloned()
2494    }
2495
2496    pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2497        let mut rows = self
2498            .workflows
2499            .read()
2500            .await
2501            .hooks
2502            .iter()
2503            .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2504            .cloned()
2505            .collect::<Vec<_>>();
2506        rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2507        rows
2508    }
2509
2510    pub async fn set_workflow_hook_enabled(
2511        &self,
2512        binding_id: &str,
2513        enabled: bool,
2514    ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2515        self.workflow_hook_overrides
2516            .write()
2517            .await
2518            .insert(binding_id.to_string(), enabled);
2519        self.persist_workflow_hook_overrides().await?;
2520        let _ = self.reload_workflows().await?;
2521        Ok(self
2522            .workflows
2523            .read()
2524            .await
2525            .hooks
2526            .iter()
2527            .find(|hook| hook.binding_id == binding_id)
2528            .cloned())
2529    }
2530
2531    pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2532        self.workflow_runs
2533            .write()
2534            .await
2535            .insert(run.run_id.clone(), run);
2536        self.persist_workflow_runs().await
2537    }
2538
2539    pub async fn update_workflow_run(
2540        &self,
2541        run_id: &str,
2542        update: impl FnOnce(&mut WorkflowRunRecord),
2543    ) -> Option<WorkflowRunRecord> {
2544        let mut guard = self.workflow_runs.write().await;
2545        let row = guard.get_mut(run_id)?;
2546        update(row);
2547        row.updated_at_ms = now_ms();
2548        if matches!(
2549            row.status,
2550            WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
2551        ) {
2552            row.finished_at_ms.get_or_insert_with(now_ms);
2553        }
2554        let out = row.clone();
2555        drop(guard);
2556        let _ = self.persist_workflow_runs().await;
2557        Some(out)
2558    }
2559
2560    pub async fn list_workflow_runs(
2561        &self,
2562        workflow_id: Option<&str>,
2563        limit: usize,
2564    ) -> Vec<WorkflowRunRecord> {
2565        let mut rows = self
2566            .workflow_runs
2567            .read()
2568            .await
2569            .values()
2570            .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2571            .cloned()
2572            .collect::<Vec<_>>();
2573        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2574        rows.truncate(limit.clamp(1, 500));
2575        rows
2576    }
2577
2578    pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2579        self.workflow_runs.read().await.get(run_id).cloned()
2580    }
2581
2582    pub async fn put_automation_v2(
2583        &self,
2584        mut automation: AutomationV2Spec,
2585    ) -> anyhow::Result<AutomationV2Spec> {
2586        if automation.automation_id.trim().is_empty() {
2587            anyhow::bail!("automation_id is required");
2588        }
2589        for agent in &mut automation.agents {
2590            if agent.display_name.trim().is_empty() {
2591                agent.display_name = auto_generated_agent_name(&agent.agent_id);
2592            }
2593            agent.tool_policy.allowlist =
2594                config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
2595            agent.tool_policy.denylist =
2596                config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
2597            agent.mcp_policy.allowed_servers =
2598                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
2599            agent.mcp_policy.allowed_tools = agent
2600                .mcp_policy
2601                .allowed_tools
2602                .take()
2603                .map(normalize_allowed_tools);
2604        }
2605        let now = now_ms();
2606        if automation.created_at_ms == 0 {
2607            automation.created_at_ms = now;
2608        }
2609        automation.updated_at_ms = now;
2610        if automation.next_fire_at_ms.is_none() {
2611            automation.next_fire_at_ms =
2612                automation_schedule_next_fire_at_ms(&automation.schedule, now);
2613        }
2614        migrate_bundled_studio_research_split_automation(&mut automation);
2615        self.automations_v2
2616            .write()
2617            .await
2618            .insert(automation.automation_id.clone(), automation.clone());
2619        self.persist_automations_v2().await?;
2620        self.verify_automation_v2_persisted(&automation.automation_id, true)
2621            .await?;
2622        Ok(automation)
2623    }
2624
2625    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2626        self.automations_v2.read().await.get(automation_id).cloned()
2627    }
2628
2629    pub fn automation_v2_runtime_context(
2630        &self,
2631        run: &AutomationV2RunRecord,
2632    ) -> Option<AutomationRuntimeContextMaterialization> {
2633        run.runtime_context.clone().or_else(|| {
2634            run.automation_snapshot.as_ref().and_then(|automation| {
2635                automation
2636                    .runtime_context_materialization()
2637                    .or_else(|| automation.approved_plan_runtime_context_materialization())
2638            })
2639        })
2640    }
2641
2642    pub(crate) fn automation_v2_approved_plan_materialization(
2643        &self,
2644        run: &AutomationV2RunRecord,
2645    ) -> Option<tandem_plan_compiler::api::ApprovedPlanMaterialization> {
2646        run.automation_snapshot
2647            .as_ref()
2648            .and_then(AutomationV2Spec::approved_plan_materialization)
2649    }
2650
2651    pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2652        self.workflow_plans
2653            .write()
2654            .await
2655            .insert(plan.plan_id.clone(), plan);
2656    }
2657
2658    pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2659        self.workflow_plans.read().await.get(plan_id).cloned()
2660    }
2661
2662    pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2663        self.workflow_plan_drafts
2664            .write()
2665            .await
2666            .insert(draft.current_plan.plan_id.clone(), draft.clone());
2667        self.put_workflow_plan(draft.current_plan).await;
2668    }
2669
2670    pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2671        self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2672    }
2673
2674    pub async fn load_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2675        if !self.workflow_planner_sessions_path.exists() {
2676            return Ok(());
2677        }
2678        let raw = fs::read_to_string(&self.workflow_planner_sessions_path).await?;
2679        let parsed = serde_json::from_str::<
2680            std::collections::HashMap<
2681                String,
2682                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2683            >,
2684        >(&raw)
2685        .unwrap_or_default();
2686        self.replace_workflow_planner_sessions(parsed).await?;
2687        Ok(())
2688    }
2689
2690    pub async fn persist_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2691        if let Some(parent) = self.workflow_planner_sessions_path.parent() {
2692            fs::create_dir_all(parent).await?;
2693        }
2694        let payload = {
2695            let guard = self.workflow_planner_sessions.read().await;
2696            serde_json::to_string_pretty(&*guard)?
2697        };
2698        fs::write(&self.workflow_planner_sessions_path, payload).await?;
2699        Ok(())
2700    }
2701
2702    async fn replace_workflow_planner_sessions(
2703        &self,
2704        sessions: std::collections::HashMap<
2705            String,
2706            crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2707        >,
2708    ) -> anyhow::Result<()> {
2709        {
2710            let mut sessions_guard = self.workflow_planner_sessions.write().await;
2711            *sessions_guard = sessions.clone();
2712        }
2713        {
2714            let mut plans = self.workflow_plans.write().await;
2715            let mut drafts = self.workflow_plan_drafts.write().await;
2716            plans.clear();
2717            drafts.clear();
2718            for session in sessions.values() {
2719                if let Some(draft) = session.draft.as_ref() {
2720                    plans.insert(
2721                        draft.current_plan.plan_id.clone(),
2722                        draft.current_plan.clone(),
2723                    );
2724                    drafts.insert(draft.current_plan.plan_id.clone(), draft.clone());
2725                }
2726            }
2727        }
2728        Ok(())
2729    }
2730
2731    async fn sync_workflow_planner_session_cache(
2732        &self,
2733        session: &crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2734    ) {
2735        if let Some(draft) = session.draft.as_ref() {
2736            self.workflow_plans.write().await.insert(
2737                draft.current_plan.plan_id.clone(),
2738                draft.current_plan.clone(),
2739            );
2740            self.workflow_plan_drafts
2741                .write()
2742                .await
2743                .insert(draft.current_plan.plan_id.clone(), draft.clone());
2744        }
2745    }
2746
2747    pub async fn put_workflow_planner_session(
2748        &self,
2749        mut session: crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2750    ) -> anyhow::Result<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2751        if session.session_id.trim().is_empty() {
2752            anyhow::bail!("session_id is required");
2753        }
2754        if session.project_slug.trim().is_empty() {
2755            anyhow::bail!("project_slug is required");
2756        }
2757        let now = now_ms();
2758        if session.created_at_ms == 0 {
2759            session.created_at_ms = now;
2760        }
2761        session.updated_at_ms = now;
2762        {
2763            self.workflow_planner_sessions
2764                .write()
2765                .await
2766                .insert(session.session_id.clone(), session.clone());
2767        }
2768        self.sync_workflow_planner_session_cache(&session).await;
2769        self.persist_workflow_planner_sessions().await?;
2770        Ok(session)
2771    }
2772
2773    pub async fn get_workflow_planner_session(
2774        &self,
2775        session_id: &str,
2776    ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2777        self.workflow_planner_sessions
2778            .read()
2779            .await
2780            .get(session_id)
2781            .cloned()
2782    }
2783
2784    pub async fn list_workflow_planner_sessions(
2785        &self,
2786        project_slug: Option<&str>,
2787    ) -> Vec<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2788        let mut rows = self
2789            .workflow_planner_sessions
2790            .read()
2791            .await
2792            .values()
2793            .filter(|session| {
2794                project_slug
2795                    .map(|slug| session.project_slug == slug)
2796                    .unwrap_or(true)
2797            })
2798            .cloned()
2799            .collect::<Vec<_>>();
2800        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2801        rows
2802    }
2803
2804    pub async fn delete_workflow_planner_session(
2805        &self,
2806        session_id: &str,
2807    ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2808        let removed = self
2809            .workflow_planner_sessions
2810            .write()
2811            .await
2812            .remove(session_id);
2813        if let Some(session) = removed.as_ref() {
2814            if let Some(draft) = session.draft.as_ref() {
2815                self.workflow_plan_drafts
2816                    .write()
2817                    .await
2818                    .remove(&draft.current_plan.plan_id);
2819                self.workflow_plans
2820                    .write()
2821                    .await
2822                    .remove(&draft.current_plan.plan_id);
2823            }
2824        }
2825        let _ = self.persist_workflow_planner_sessions().await;
2826        removed
2827    }
2828
2829    pub async fn put_optimization_campaign(
2830        &self,
2831        mut campaign: OptimizationCampaignRecord,
2832    ) -> anyhow::Result<OptimizationCampaignRecord> {
2833        if campaign.optimization_id.trim().is_empty() {
2834            anyhow::bail!("optimization_id is required");
2835        }
2836        if campaign.source_workflow_id.trim().is_empty() {
2837            anyhow::bail!("source_workflow_id is required");
2838        }
2839        if campaign.name.trim().is_empty() {
2840            anyhow::bail!("name is required");
2841        }
2842        let now = now_ms();
2843        if campaign.created_at_ms == 0 {
2844            campaign.created_at_ms = now;
2845        }
2846        campaign.updated_at_ms = now;
2847        campaign.source_workflow_snapshot_hash =
2848            optimization_snapshot_hash(&campaign.source_workflow_snapshot);
2849        campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
2850        self.optimization_campaigns
2851            .write()
2852            .await
2853            .insert(campaign.optimization_id.clone(), campaign.clone());
2854        self.persist_optimization_campaigns().await?;
2855        Ok(campaign)
2856    }
2857
2858    pub async fn get_optimization_campaign(
2859        &self,
2860        optimization_id: &str,
2861    ) -> Option<OptimizationCampaignRecord> {
2862        self.optimization_campaigns
2863            .read()
2864            .await
2865            .get(optimization_id)
2866            .cloned()
2867    }
2868
2869    pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
2870        let mut rows = self
2871            .optimization_campaigns
2872            .read()
2873            .await
2874            .values()
2875            .cloned()
2876            .collect::<Vec<_>>();
2877        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2878        rows
2879    }
2880
2881    pub async fn put_optimization_experiment(
2882        &self,
2883        mut experiment: OptimizationExperimentRecord,
2884    ) -> anyhow::Result<OptimizationExperimentRecord> {
2885        if experiment.experiment_id.trim().is_empty() {
2886            anyhow::bail!("experiment_id is required");
2887        }
2888        if experiment.optimization_id.trim().is_empty() {
2889            anyhow::bail!("optimization_id is required");
2890        }
2891        let now = now_ms();
2892        if experiment.created_at_ms == 0 {
2893            experiment.created_at_ms = now;
2894        }
2895        experiment.updated_at_ms = now;
2896        experiment.candidate_snapshot_hash =
2897            optimization_snapshot_hash(&experiment.candidate_snapshot);
2898        self.optimization_experiments
2899            .write()
2900            .await
2901            .insert(experiment.experiment_id.clone(), experiment.clone());
2902        self.persist_optimization_experiments().await?;
2903        Ok(experiment)
2904    }
2905
2906    pub async fn get_optimization_experiment(
2907        &self,
2908        optimization_id: &str,
2909        experiment_id: &str,
2910    ) -> Option<OptimizationExperimentRecord> {
2911        self.optimization_experiments
2912            .read()
2913            .await
2914            .get(experiment_id)
2915            .filter(|row| row.optimization_id == optimization_id)
2916            .cloned()
2917    }
2918
2919    pub async fn list_optimization_experiments(
2920        &self,
2921        optimization_id: &str,
2922    ) -> Vec<OptimizationExperimentRecord> {
2923        let mut rows = self
2924            .optimization_experiments
2925            .read()
2926            .await
2927            .values()
2928            .filter(|row| row.optimization_id == optimization_id)
2929            .cloned()
2930            .collect::<Vec<_>>();
2931        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2932        rows
2933    }
2934
2935    pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
2936        self.optimization_experiments
2937            .read()
2938            .await
2939            .values()
2940            .filter(|row| row.optimization_id == optimization_id)
2941            .count()
2942    }
2943
2944    fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
2945        matches!(
2946            status,
2947            crate::AutomationRunStatus::Completed
2948                | crate::AutomationRunStatus::Blocked
2949                | crate::AutomationRunStatus::Failed
2950                | crate::AutomationRunStatus::Cancelled
2951        )
2952    }
2953
2954    fn optimization_consecutive_failure_count(
2955        experiments: &[OptimizationExperimentRecord],
2956    ) -> usize {
2957        let mut ordered = experiments.to_vec();
2958        ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
2959        ordered
2960            .iter()
2961            .rev()
2962            .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
2963            .count()
2964    }
2965
2966    fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
2967        match field {
2968            OptimizationMutableField::Objective => "objective",
2969            OptimizationMutableField::OutputContractSummaryGuidance => {
2970                "output_contract.summary_guidance"
2971            }
2972            OptimizationMutableField::TimeoutMs => "timeout_ms",
2973            OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
2974            OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
2975        }
2976    }
2977
2978    fn optimization_node_field_value(
2979        node: &crate::AutomationFlowNode,
2980        field: OptimizationMutableField,
2981    ) -> Result<Value, String> {
2982        match field {
2983            OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
2984            OptimizationMutableField::OutputContractSummaryGuidance => node
2985                .output_contract
2986                .as_ref()
2987                .and_then(|contract| contract.summary_guidance.clone())
2988                .map(Value::String)
2989                .ok_or_else(|| {
2990                    format!(
2991                        "node `{}` is missing output_contract.summary_guidance",
2992                        node.node_id
2993                    )
2994                }),
2995            OptimizationMutableField::TimeoutMs => node
2996                .timeout_ms
2997                .map(|value| json!(value))
2998                .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
2999            OptimizationMutableField::RetryPolicyMaxAttempts => node
3000                .retry_policy
3001                .as_ref()
3002                .and_then(Value::as_object)
3003                .and_then(|policy| policy.get("max_attempts"))
3004                .cloned()
3005                .ok_or_else(|| {
3006                    format!(
3007                        "node `{}` is missing retry_policy.max_attempts",
3008                        node.node_id
3009                    )
3010                }),
3011            OptimizationMutableField::RetryPolicyRetries => node
3012                .retry_policy
3013                .as_ref()
3014                .and_then(Value::as_object)
3015                .and_then(|policy| policy.get("retries"))
3016                .cloned()
3017                .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
3018        }
3019    }
3020
3021    fn set_optimization_node_field_value(
3022        node: &mut crate::AutomationFlowNode,
3023        field: OptimizationMutableField,
3024        value: &Value,
3025    ) -> Result<(), String> {
3026        match field {
3027            OptimizationMutableField::Objective => {
3028                node.objective = value
3029                    .as_str()
3030                    .ok_or_else(|| "objective apply value must be a string".to_string())?
3031                    .to_string();
3032            }
3033            OptimizationMutableField::OutputContractSummaryGuidance => {
3034                let guidance = value
3035                    .as_str()
3036                    .ok_or_else(|| {
3037                        "output_contract.summary_guidance apply value must be a string".to_string()
3038                    })?
3039                    .to_string();
3040                let contract = node.output_contract.as_mut().ok_or_else(|| {
3041                    format!(
3042                        "node `{}` is missing output_contract for apply",
3043                        node.node_id
3044                    )
3045                })?;
3046                contract.summary_guidance = Some(guidance);
3047            }
3048            OptimizationMutableField::TimeoutMs => {
3049                node.timeout_ms = Some(
3050                    value
3051                        .as_u64()
3052                        .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
3053                );
3054            }
3055            OptimizationMutableField::RetryPolicyMaxAttempts => {
3056                let next = value.as_i64().ok_or_else(|| {
3057                    "retry_policy.max_attempts apply value must be an integer".to_string()
3058                })?;
3059                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
3060                let object = policy.as_object_mut().ok_or_else(|| {
3061                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
3062                })?;
3063                object.insert("max_attempts".to_string(), json!(next));
3064            }
3065            OptimizationMutableField::RetryPolicyRetries => {
3066                let next = value.as_i64().ok_or_else(|| {
3067                    "retry_policy.retries apply value must be an integer".to_string()
3068                })?;
3069                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
3070                let object = policy.as_object_mut().ok_or_else(|| {
3071                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
3072                })?;
3073                object.insert("retries".to_string(), json!(next));
3074            }
3075        }
3076        Ok(())
3077    }
3078
3079    fn append_optimization_apply_metadata(
3080        metadata: Option<Value>,
3081        record: Value,
3082    ) -> Result<Option<Value>, String> {
3083        let mut root = match metadata {
3084            Some(Value::Object(map)) => map,
3085            Some(_) => return Err("automation metadata must be a JSON object".to_string()),
3086            None => serde_json::Map::new(),
3087        };
3088        let history = root
3089            .entry("optimization_apply_history".to_string())
3090            .or_insert_with(|| Value::Array(Vec::new()));
3091        let Some(entries) = history.as_array_mut() else {
3092            return Err("optimization_apply_history metadata must be an array".to_string());
3093        };
3094        entries.push(record.clone());
3095        root.insert("last_optimization_apply".to_string(), record);
3096        Ok(Some(Value::Object(root)))
3097    }
3098
3099    fn build_optimization_apply_patch(
3100        baseline: &crate::AutomationV2Spec,
3101        candidate: &crate::AutomationV2Spec,
3102        mutation: &crate::OptimizationValidatedMutation,
3103        approved_at_ms: u64,
3104    ) -> Result<Value, String> {
3105        let baseline_node = baseline
3106            .flow
3107            .nodes
3108            .iter()
3109            .find(|node| node.node_id == mutation.node_id)
3110            .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
3111        let candidate_node = candidate
3112            .flow
3113            .nodes
3114            .iter()
3115            .find(|node| node.node_id == mutation.node_id)
3116            .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
3117        let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
3118        let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
3119        Ok(json!({
3120            "node_id": mutation.node_id,
3121            "field": mutation.field,
3122            "field_path": Self::optimization_mutation_field_path(mutation.field),
3123            "expected_before": before,
3124            "apply_value": after,
3125            "approved_at_ms": approved_at_ms,
3126        }))
3127    }
3128
3129    pub async fn apply_optimization_winner(
3130        &self,
3131        optimization_id: &str,
3132        experiment_id: &str,
3133    ) -> Result<
3134        (
3135            OptimizationCampaignRecord,
3136            OptimizationExperimentRecord,
3137            crate::AutomationV2Spec,
3138        ),
3139        String,
3140    > {
3141        let campaign = self
3142            .get_optimization_campaign(optimization_id)
3143            .await
3144            .ok_or_else(|| "optimization not found".to_string())?;
3145        let mut experiment = self
3146            .get_optimization_experiment(optimization_id, experiment_id)
3147            .await
3148            .ok_or_else(|| "experiment not found".to_string())?;
3149        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
3150            return Err("only approved winner experiments may be applied".to_string());
3151        }
3152        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
3153            return Err(
3154                "only the latest approved winner may be applied to the live workflow".to_string(),
3155            );
3156        }
3157        let patch = experiment
3158            .metadata
3159            .as_ref()
3160            .and_then(|metadata| metadata.get("apply_patch"))
3161            .cloned()
3162            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
3163        let node_id = patch
3164            .get("node_id")
3165            .and_then(Value::as_str)
3166            .map(str::to_string)
3167            .filter(|value| !value.is_empty())
3168            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
3169        let field: OptimizationMutableField = serde_json::from_value(
3170            patch
3171                .get("field")
3172                .cloned()
3173                .ok_or_else(|| "apply_patch.field is required".to_string())?,
3174        )
3175        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
3176        let expected_before = patch
3177            .get("expected_before")
3178            .cloned()
3179            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
3180        let apply_value = patch
3181            .get("apply_value")
3182            .cloned()
3183            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
3184        let mut live = self
3185            .get_automation_v2(&campaign.source_workflow_id)
3186            .await
3187            .ok_or_else(|| "source workflow not found".to_string())?;
3188        let current_value = {
3189            let live_node = live
3190                .flow
3191                .nodes
3192                .iter()
3193                .find(|node| node.node_id == node_id)
3194                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
3195            Self::optimization_node_field_value(live_node, field)?
3196        };
3197        if current_value != expected_before {
3198            return Err(format!(
3199                "live workflow drift detected for node `{node_id}` {}",
3200                Self::optimization_mutation_field_path(field)
3201            ));
3202        }
3203        let live_node = live
3204            .flow
3205            .nodes
3206            .iter_mut()
3207            .find(|node| node.node_id == node_id)
3208            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
3209        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
3210        let applied_at_ms = now_ms();
3211        let apply_record = json!({
3212            "optimization_id": campaign.optimization_id,
3213            "experiment_id": experiment.experiment_id,
3214            "node_id": node_id,
3215            "field": field,
3216            "field_path": Self::optimization_mutation_field_path(field),
3217            "previous_value": expected_before,
3218            "new_value": apply_value,
3219            "applied_at_ms": applied_at_ms,
3220        });
3221        live.metadata =
3222            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
3223        let stored_live = self
3224            .put_automation_v2(live)
3225            .await
3226            .map_err(|error| error.to_string())?;
3227        let mut metadata = match experiment.metadata.take() {
3228            Some(Value::Object(map)) => map,
3229            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
3230            None => serde_json::Map::new(),
3231        };
3232        metadata.insert(
3233            "applied_to_live".to_string(),
3234            json!({
3235                "automation_id": stored_live.automation_id,
3236                "applied_at_ms": applied_at_ms,
3237                "field": field,
3238                "node_id": node_id,
3239            }),
3240        );
3241        experiment.metadata = Some(Value::Object(metadata));
3242        let stored_experiment = self
3243            .put_optimization_experiment(experiment)
3244            .await
3245            .map_err(|error| error.to_string())?;
3246        Ok((campaign, stored_experiment, stored_live))
3247    }
3248
3249    fn optimization_objective_hint(text: &str) -> String {
3250        let cleaned = text
3251            .lines()
3252            .map(str::trim)
3253            .filter(|line| !line.is_empty() && !line.starts_with('#'))
3254            .collect::<Vec<_>>()
3255            .join(" ");
3256        let hint = if cleaned.is_empty() {
3257            "Prioritize validator-complete output with explicit evidence."
3258        } else {
3259            cleaned.as_str()
3260        };
3261        let trimmed = hint.trim();
3262        let clipped = if trimmed.len() > 140 {
3263            trimmed[..140].trim_end()
3264        } else {
3265            trimmed
3266        };
3267        let mut sentence = clipped.trim_end_matches('.').to_string();
3268        if sentence.is_empty() {
3269            sentence = "Prioritize validator-complete output with explicit evidence".to_string();
3270        }
3271        sentence.push('.');
3272        sentence
3273    }
3274
3275    fn build_phase1_candidate_options(
3276        baseline: &crate::AutomationV2Spec,
3277        phase1: &crate::OptimizationPhase1Config,
3278    ) -> Vec<(
3279        crate::AutomationV2Spec,
3280        crate::OptimizationValidatedMutation,
3281    )> {
3282        let mut options = Vec::new();
3283        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
3284        for (index, node) in baseline.flow.nodes.iter().enumerate() {
3285            if phase1
3286                .mutation_policy
3287                .allowed_text_fields
3288                .contains(&OptimizationMutableField::Objective)
3289            {
3290                let addition = if node.objective.contains(&hint) {
3291                    "Prioritize validator-complete output with concrete evidence."
3292                } else {
3293                    &hint
3294                };
3295                let mut candidate = baseline.clone();
3296                candidate.flow.nodes[index].objective =
3297                    format!("{} {}", node.objective.trim(), addition.trim())
3298                        .trim()
3299                        .to_string();
3300                if let Ok(validated) =
3301                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3302                {
3303                    options.push((candidate, validated));
3304                }
3305            }
3306            if phase1
3307                .mutation_policy
3308                .allowed_text_fields
3309                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
3310            {
3311                if let Some(summary_guidance) = node
3312                    .output_contract
3313                    .as_ref()
3314                    .and_then(|contract| contract.summary_guidance.as_ref())
3315                {
3316                    let addition = if summary_guidance.contains("Cite concrete evidence") {
3317                        "Keep evidence explicit."
3318                    } else {
3319                        "Cite concrete evidence in the summary."
3320                    };
3321                    let mut candidate = baseline.clone();
3322                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
3323                        contract.summary_guidance = Some(
3324                            format!("{} {}", summary_guidance.trim(), addition)
3325                                .trim()
3326                                .to_string(),
3327                        );
3328                    }
3329                    if let Ok(validated) =
3330                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3331                    {
3332                        options.push((candidate, validated));
3333                    }
3334                }
3335            }
3336            if phase1
3337                .mutation_policy
3338                .allowed_knob_fields
3339                .contains(&OptimizationMutableField::TimeoutMs)
3340            {
3341                if let Some(timeout_ms) = node.timeout_ms {
3342                    let delta_by_percent = ((timeout_ms as f64)
3343                        * phase1.mutation_policy.timeout_delta_percent)
3344                        .round() as u64;
3345                    let delta = delta_by_percent
3346                        .min(phase1.mutation_policy.timeout_delta_ms)
3347                        .max(1);
3348                    let next = timeout_ms
3349                        .saturating_add(delta)
3350                        .min(phase1.mutation_policy.timeout_max_ms);
3351                    if next != timeout_ms {
3352                        let mut candidate = baseline.clone();
3353                        candidate.flow.nodes[index].timeout_ms = Some(next);
3354                        if let Ok(validated) =
3355                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3356                        {
3357                            options.push((candidate, validated));
3358                        }
3359                    }
3360                }
3361            }
3362            if phase1
3363                .mutation_policy
3364                .allowed_knob_fields
3365                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
3366            {
3367                let current = node
3368                    .retry_policy
3369                    .as_ref()
3370                    .and_then(Value::as_object)
3371                    .and_then(|row| row.get("max_attempts"))
3372                    .and_then(Value::as_i64);
3373                if let Some(before) = current {
3374                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
3375                    if next != before {
3376                        let mut candidate = baseline.clone();
3377                        let policy = candidate.flow.nodes[index]
3378                            .retry_policy
3379                            .get_or_insert_with(|| json!({}));
3380                        if let Some(object) = policy.as_object_mut() {
3381                            object.insert("max_attempts".to_string(), json!(next));
3382                        }
3383                        if let Ok(validated) =
3384                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3385                        {
3386                            options.push((candidate, validated));
3387                        }
3388                    }
3389                }
3390            }
3391            if phase1
3392                .mutation_policy
3393                .allowed_knob_fields
3394                .contains(&OptimizationMutableField::RetryPolicyRetries)
3395            {
3396                let current = node
3397                    .retry_policy
3398                    .as_ref()
3399                    .and_then(Value::as_object)
3400                    .and_then(|row| row.get("retries"))
3401                    .and_then(Value::as_i64);
3402                if let Some(before) = current {
3403                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
3404                    if next != before {
3405                        let mut candidate = baseline.clone();
3406                        let policy = candidate.flow.nodes[index]
3407                            .retry_policy
3408                            .get_or_insert_with(|| json!({}));
3409                        if let Some(object) = policy.as_object_mut() {
3410                            object.insert("retries".to_string(), json!(next));
3411                        }
3412                        if let Ok(validated) =
3413                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
3414                        {
3415                            options.push((candidate, validated));
3416                        }
3417                    }
3418                }
3419            }
3420        }
3421        options
3422    }
3423
3424    async fn maybe_queue_phase1_candidate_experiment(
3425        &self,
3426        campaign: &mut OptimizationCampaignRecord,
3427    ) -> Result<bool, String> {
3428        let Some(phase1) = campaign.phase1.as_ref() else {
3429            return Ok(false);
3430        };
3431        let experiment_count = self
3432            .count_optimization_experiments(&campaign.optimization_id)
3433            .await;
3434        if experiment_count >= phase1.budget.max_experiments as usize {
3435            campaign.status = OptimizationCampaignStatus::Completed;
3436            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
3437            campaign.updated_at_ms = now_ms();
3438            return Ok(true);
3439        }
3440        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
3441        {
3442            return Ok(false);
3443        }
3444        let existing = self
3445            .list_optimization_experiments(&campaign.optimization_id)
3446            .await;
3447        let active_eval_exists = existing.iter().any(|experiment| {
3448            matches!(experiment.status, OptimizationExperimentStatus::Draft)
3449                && experiment
3450                    .metadata
3451                    .as_ref()
3452                    .and_then(|metadata| metadata.get("eval_run_id"))
3453                    .and_then(Value::as_str)
3454                    .is_some()
3455        });
3456        if active_eval_exists {
3457            return Ok(false);
3458        }
3459        let existing_hashes = existing
3460            .iter()
3461            .map(|experiment| experiment.candidate_snapshot_hash.clone())
3462            .collect::<std::collections::HashSet<_>>();
3463        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
3464        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
3465            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
3466        }) else {
3467            campaign.status = OptimizationCampaignStatus::Completed;
3468            campaign.last_pause_reason = Some(
3469                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
3470            );
3471            campaign.updated_at_ms = now_ms();
3472            return Ok(true);
3473        };
3474        let eval_run = self
3475            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
3476            .await
3477            .map_err(|error| error.to_string())?;
3478        let now = now_ms();
3479        let experiment = OptimizationExperimentRecord {
3480            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
3481            optimization_id: campaign.optimization_id.clone(),
3482            status: OptimizationExperimentStatus::Draft,
3483            candidate_snapshot: candidate_snapshot.clone(),
3484            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
3485            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
3486            mutation_summary: Some(mutation.summary.clone()),
3487            metrics: None,
3488            phase1_metrics: None,
3489            promotion_recommendation: None,
3490            promotion_decision: None,
3491            created_at_ms: now,
3492            updated_at_ms: now,
3493            metadata: Some(json!({
3494                "generator": "phase1_deterministic_v1",
3495                "eval_run_id": eval_run.run_id,
3496                "mutation": mutation,
3497            })),
3498        };
3499        self.put_optimization_experiment(experiment)
3500            .await
3501            .map_err(|error| error.to_string())?;
3502        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
3503        campaign.updated_at_ms = now_ms();
3504        Ok(true)
3505    }
3506
3507    async fn reconcile_phase1_candidate_experiments(
3508        &self,
3509        campaign: &mut OptimizationCampaignRecord,
3510    ) -> Result<bool, String> {
3511        let Some(phase1) = campaign.phase1.as_ref() else {
3512            return Ok(false);
3513        };
3514        let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
3515            return Ok(false);
3516        };
3517        let experiments = self
3518            .list_optimization_experiments(&campaign.optimization_id)
3519            .await;
3520        let mut changed = false;
3521        for mut experiment in experiments {
3522            if experiment.status != OptimizationExperimentStatus::Draft {
3523                continue;
3524            }
3525            let Some(eval_run_id) = experiment
3526                .metadata
3527                .as_ref()
3528                .and_then(|metadata| metadata.get("eval_run_id"))
3529                .and_then(Value::as_str)
3530                .map(str::to_string)
3531            else {
3532                continue;
3533            };
3534            let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
3535                continue;
3536            };
3537            if !Self::automation_run_is_terminal(&run.status) {
3538                continue;
3539            }
3540            if run.status != crate::AutomationRunStatus::Completed {
3541                experiment.status = OptimizationExperimentStatus::Failed;
3542                let mut metadata = match experiment.metadata.take() {
3543                    Some(Value::Object(map)) => map,
3544                    Some(_) => serde_json::Map::new(),
3545                    None => serde_json::Map::new(),
3546                };
3547                metadata.insert(
3548                    "eval_failure".to_string(),
3549                    json!({
3550                        "run_id": run.run_id,
3551                        "status": run.status,
3552                    }),
3553                );
3554                experiment.metadata = Some(Value::Object(metadata));
3555                self.put_optimization_experiment(experiment)
3556                    .await
3557                    .map_err(|error| error.to_string())?;
3558                changed = true;
3559                continue;
3560            }
3561            if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
3562                experiment.status = OptimizationExperimentStatus::Failed;
3563                let mut metadata = match experiment.metadata.take() {
3564                    Some(Value::Object(map)) => map,
3565                    Some(_) => serde_json::Map::new(),
3566                    None => serde_json::Map::new(),
3567                };
3568                metadata.insert(
3569                    "eval_failure".to_string(),
3570                    json!({
3571                        "run_id": run.run_id,
3572                        "status": run.status,
3573                        "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
3574                    }),
3575                );
3576                experiment.metadata = Some(Value::Object(metadata));
3577                self.put_optimization_experiment(experiment)
3578                    .await
3579                    .map_err(|error| error.to_string())?;
3580                changed = true;
3581                continue;
3582            }
3583            let metrics =
3584                match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
3585                    Ok(metrics) => metrics,
3586                    Err(error) => {
3587                        experiment.status = OptimizationExperimentStatus::Failed;
3588                        let mut metadata = match experiment.metadata.take() {
3589                            Some(Value::Object(map)) => map,
3590                            Some(_) => serde_json::Map::new(),
3591                            None => serde_json::Map::new(),
3592                        };
3593                        metadata.insert(
3594                            "eval_failure".to_string(),
3595                            json!({
3596                                "run_id": run.run_id,
3597                                "status": run.status,
3598                                "reason": error,
3599                            }),
3600                        );
3601                        experiment.metadata = Some(Value::Object(metadata));
3602                        self.put_optimization_experiment(experiment)
3603                            .await
3604                            .map_err(|error| error.to_string())?;
3605                        changed = true;
3606                        continue;
3607                    }
3608                };
3609            let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
3610            experiment.phase1_metrics = Some(metrics.clone());
3611            experiment.metrics = Some(json!({
3612                "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
3613                "unmet_requirement_count": metrics.unmet_requirement_count,
3614                "blocked_node_rate": metrics.blocked_node_rate,
3615                "budget_within_limits": metrics.budget_within_limits,
3616            }));
3617            experiment.promotion_recommendation = Some(
3618                match decision.decision {
3619                    OptimizationPromotionDecisionKind::Promote => "promote",
3620                    OptimizationPromotionDecisionKind::Discard => "discard",
3621                    OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3622                        "needs_operator_review"
3623                    }
3624                }
3625                .to_string(),
3626            );
3627            experiment.promotion_decision = Some(decision.clone());
3628            match decision.decision {
3629                OptimizationPromotionDecisionKind::Promote
3630                | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3631                    experiment.status = OptimizationExperimentStatus::PromotionRecommended;
3632                    campaign.pending_promotion_experiment_id =
3633                        Some(experiment.experiment_id.clone());
3634                    campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
3635                    campaign.last_pause_reason = Some(decision.reason.clone());
3636                }
3637                OptimizationPromotionDecisionKind::Discard => {
3638                    experiment.status = OptimizationExperimentStatus::Discarded;
3639                    if campaign.status == OptimizationCampaignStatus::Running {
3640                        campaign.last_pause_reason = Some(decision.reason.clone());
3641                    }
3642                }
3643            }
3644            self.put_optimization_experiment(experiment)
3645                .await
3646                .map_err(|error| error.to_string())?;
3647            changed = true;
3648        }
3649        let refreshed = self
3650            .list_optimization_experiments(&campaign.optimization_id)
3651            .await;
3652        let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
3653        if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
3654            && phase1.budget.max_consecutive_failures > 0
3655        {
3656            campaign.status = OptimizationCampaignStatus::Failed;
3657            campaign.last_pause_reason = Some(format!(
3658                "phase 1 candidate evaluations reached {} consecutive failures",
3659                consecutive_failures
3660            ));
3661            changed = true;
3662        }
3663        Ok(changed)
3664    }
3665
3666    async fn reconcile_pending_baseline_replays(
3667        &self,
3668        campaign: &mut OptimizationCampaignRecord,
3669    ) -> Result<bool, String> {
3670        let Some(phase1) = campaign.phase1.as_ref() else {
3671            return Ok(false);
3672        };
3673        let mut changed = false;
3674        let mut remaining = Vec::new();
3675        for run_id in campaign.pending_baseline_run_ids.clone() {
3676            let Some(run) = self.get_automation_v2_run(&run_id).await else {
3677                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3678                campaign.last_pause_reason = Some(format!(
3679                    "baseline replay run `{run_id}` was not found during optimization reconciliation"
3680                ));
3681                changed = true;
3682                continue;
3683            };
3684            if !Self::automation_run_is_terminal(&run.status) {
3685                remaining.push(run_id);
3686                continue;
3687            }
3688            if run.status != crate::AutomationRunStatus::Completed {
3689                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3690                campaign.last_pause_reason = Some(format!(
3691                    "baseline replay run `{}` finished with status `{:?}`",
3692                    run.run_id, run.status
3693                ));
3694                changed = true;
3695                continue;
3696            }
3697            if run.automation_id != campaign.source_workflow_id {
3698                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3699                campaign.last_pause_reason = Some(
3700                    "baseline replay run must belong to the optimization source workflow"
3701                        .to_string(),
3702                );
3703                changed = true;
3704                continue;
3705            }
3706            let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
3707                "baseline replay run must include an automation snapshot".to_string()
3708            })?;
3709            if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
3710                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3711                campaign.last_pause_reason = Some(
3712                    "baseline replay run does not match the current campaign baseline snapshot"
3713                        .to_string(),
3714                );
3715                changed = true;
3716                continue;
3717            }
3718            let metrics =
3719                derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
3720            let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
3721            campaign
3722                .baseline_replays
3723                .push(OptimizationBaselineReplayRecord {
3724                    replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
3725                    automation_run_id: Some(run.run_id.clone()),
3726                    phase1_metrics: metrics,
3727                    validator_case_outcomes,
3728                    experiment_count_at_recording: self
3729                        .count_optimization_experiments(&campaign.optimization_id)
3730                        .await as u64,
3731                    recorded_at_ms: now_ms(),
3732                });
3733            changed = true;
3734        }
3735        if remaining != campaign.pending_baseline_run_ids {
3736            campaign.pending_baseline_run_ids = remaining;
3737            changed = true;
3738        }
3739        Ok(changed)
3740    }
3741
3742    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
3743        let campaigns = self.list_optimization_campaigns().await;
3744        let mut updated = 0usize;
3745        for campaign in campaigns {
3746            let Some(mut latest) = self
3747                .get_optimization_campaign(&campaign.optimization_id)
3748                .await
3749            else {
3750                continue;
3751            };
3752            let Some(phase1) = latest.phase1.clone() else {
3753                continue;
3754            };
3755            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
3756            changed |= self
3757                .reconcile_phase1_candidate_experiments(&mut latest)
3758                .await?;
3759            let experiment_count = self
3760                .count_optimization_experiments(&latest.optimization_id)
3761                .await;
3762            if latest.pending_baseline_run_ids.is_empty() {
3763                if phase1_baseline_replay_due(
3764                    &latest.baseline_replays,
3765                    latest.pending_baseline_run_ids.len(),
3766                    &phase1,
3767                    experiment_count,
3768                    now_ms(),
3769                ) {
3770                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
3771                        latest.status = OptimizationCampaignStatus::Draft;
3772                        changed = true;
3773                    }
3774                } else if latest.baseline_replays.len()
3775                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
3776                {
3777                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
3778                        Ok(metrics) => {
3779                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
3780                                latest.baseline_metrics = Some(metrics);
3781                                changed = true;
3782                            }
3783                            if matches!(
3784                                latest.status,
3785                                OptimizationCampaignStatus::Draft
3786                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
3787                            ) || (latest.status == OptimizationCampaignStatus::Running
3788                                && latest.last_pause_reason.is_some())
3789                            {
3790                                latest.status = OptimizationCampaignStatus::Running;
3791                                latest.last_pause_reason = None;
3792                                changed = true;
3793                            }
3794                        }
3795                        Err(error) => {
3796                            if matches!(
3797                                latest.status,
3798                                OptimizationCampaignStatus::Draft
3799                                    | OptimizationCampaignStatus::Running
3800                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
3801                            ) && (latest.status
3802                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
3803                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
3804                            {
3805                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3806                                latest.last_pause_reason = Some(error);
3807                                changed = true;
3808                            }
3809                        }
3810                    }
3811                }
3812            } else if latest.last_pause_reason.as_deref()
3813                != Some("waiting for phase 1 baseline replay completion")
3814            {
3815                latest.last_pause_reason =
3816                    Some("waiting for phase 1 baseline replay completion".to_string());
3817                changed = true;
3818            }
3819            if latest.status == OptimizationCampaignStatus::Running
3820                && latest.pending_baseline_run_ids.is_empty()
3821            {
3822                changed |= self
3823                    .maybe_queue_phase1_candidate_experiment(&mut latest)
3824                    .await?;
3825            }
3826            if changed {
3827                self.put_optimization_campaign(latest)
3828                    .await
3829                    .map_err(|error| error.to_string())?;
3830                updated = updated.saturating_add(1);
3831            }
3832        }
3833        Ok(updated)
3834    }
3835
3836    async fn maybe_queue_phase1_baseline_replay(
3837        &self,
3838        campaign: &mut OptimizationCampaignRecord,
3839    ) -> Result<bool, String> {
3840        let Some(phase1) = campaign.phase1.as_ref() else {
3841            return Ok(false);
3842        };
3843        if !campaign.pending_baseline_run_ids.is_empty() {
3844            campaign.last_pause_reason =
3845                Some("waiting for phase 1 baseline replay completion".into());
3846            campaign.updated_at_ms = now_ms();
3847            return Ok(true);
3848        }
3849        let experiment_count = self
3850            .count_optimization_experiments(&campaign.optimization_id)
3851            .await;
3852        if !phase1_baseline_replay_due(
3853            &campaign.baseline_replays,
3854            campaign.pending_baseline_run_ids.len(),
3855            phase1,
3856            experiment_count,
3857            now_ms(),
3858        ) {
3859            return Ok(false);
3860        }
3861        let replay_run = self
3862            .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
3863            .await
3864            .map_err(|error| error.to_string())?;
3865        if !campaign
3866            .pending_baseline_run_ids
3867            .iter()
3868            .any(|value| value == &replay_run.run_id)
3869        {
3870            campaign
3871                .pending_baseline_run_ids
3872                .push(replay_run.run_id.clone());
3873        }
3874        campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
3875        campaign.updated_at_ms = now_ms();
3876        Ok(true)
3877    }
3878
3879    async fn maybe_queue_initial_phase1_baseline_replay(
3880        &self,
3881        campaign: &mut OptimizationCampaignRecord,
3882    ) -> Result<bool, String> {
3883        let Some(phase1) = campaign.phase1.as_ref() else {
3884            return Ok(false);
3885        };
3886        let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
3887        if campaign.baseline_replays.len() >= required_runs {
3888            return Ok(false);
3889        }
3890        self.maybe_queue_phase1_baseline_replay(campaign).await
3891    }
3892
    /// Applies an operator action to an optimization campaign and persists the
    /// resulting campaign record.
    ///
    /// `action` is trimmed and lower-cased before matching. Supported values:
    ///
    /// - `start`: for phase 1 campaigns, queue the initial baseline replay
    ///   (status `Draft`) or, if none needs queuing, establish the baseline
    ///   (status `Running`, or `PausedEvaluatorUnstable` on failure). Campaigns
    ///   without phase 1 go straight to `Running`.
    /// - `pause`: set `PausedManual`, storing the trimmed non-empty `reason`.
    /// - `resume`: queue the initial baseline replay if needed (`Draft`),
    ///   otherwise set `Running`.
    /// - `queue_baseline_replay`: create a replay run and track it as pending.
    /// - `record_baseline_replay`: requires `run_id`; validates the run and
    ///   appends a baseline replay record derived from it.
    /// - `approve_winner`: requires `experiment_id`; promotes the experiment's
    ///   candidate snapshot to the campaign baseline.
    /// - `reject_winner`: requires `experiment_id`; marks the experiment as
    ///   rejected and returns the campaign to `Draft`.
    ///
    /// # Errors
    ///
    /// Returns a string error when the campaign, run, or experiment cannot be
    /// found, a required argument is missing or blank, validation fails, the
    /// promotion decision is not `Promote`, the action is unsupported, or
    /// persistence fails.
    pub async fn apply_optimization_action(
        &self,
        optimization_id: &str,
        action: &str,
        experiment_id: Option<&str>,
        run_id: Option<&str>,
        reason: Option<&str>,
    ) -> Result<OptimizationCampaignRecord, String> {
        let normalized = action.trim().to_ascii_lowercase();
        let mut campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        match normalized.as_str() {
            "start" => {
                if campaign.phase1.is_some() {
                    // A queued replay keeps the campaign in Draft until it is reconciled.
                    if self
                        .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                        .await?
                    {
                        campaign.status = OptimizationCampaignStatus::Draft;
                    } else {
                        let phase1 = campaign
                            .phase1
                            .as_ref()
                            .ok_or_else(|| "phase 1 config is required".to_string())?;
                        // Nothing was queued, so try to establish the baseline right away.
                        match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
                            Ok(metrics) => {
                                campaign.baseline_metrics = Some(metrics);
                                campaign.status = OptimizationCampaignStatus::Running;
                                campaign.last_pause_reason = None;
                            }
                            Err(error) => {
                                campaign.status =
                                    OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                campaign.last_pause_reason = Some(error);
                            }
                        }
                    }
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "pause" => {
                campaign.status = OptimizationCampaignStatus::PausedManual;
                // Blank or whitespace-only reasons are stored as `None`.
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
            }
            "resume" => {
                // NOTE(review): unlike "start", this branch does not call
                // `establish_phase1_baseline` before switching to Running —
                // confirm that is intended.
                if self
                    .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                    .await?
                {
                    campaign.status = OptimizationCampaignStatus::Draft;
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "queue_baseline_replay" => {
                // Unconditionally creates a replay run from the current baseline snapshot.
                let replay_run = self
                    .create_automation_v2_run(
                        &campaign.baseline_snapshot,
                        "optimization_baseline_replay",
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                if !campaign
                    .pending_baseline_run_ids
                    .iter()
                    .any(|value| value == &replay_run.run_id)
                {
                    campaign
                        .pending_baseline_run_ids
                        .push(replay_run.run_id.clone());
                }
                campaign.updated_at_ms = now_ms();
            }
            "record_baseline_replay" => {
                let run_id = run_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
                let phase1 = campaign
                    .phase1
                    .as_ref()
                    .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
                let run = self
                    .get_automation_v2_run(run_id)
                    .await
                    .ok_or_else(|| "automation run not found".to_string())?;
                // The run must come from the campaign's source workflow...
                if run.automation_id != campaign.source_workflow_id {
                    return Err(
                        "baseline replay run must belong to the optimization source workflow"
                            .to_string(),
                    );
                }
                // ...and must have executed the current baseline snapshot.
                let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                    "baseline replay run must include an automation snapshot".to_string()
                })?;
                if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                    return Err(
                        "baseline replay run does not match the current campaign baseline snapshot"
                            .to_string(),
                    );
                }
                let metrics =
                    derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
                let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
                // NOTE(review): no check here prevents recording the same run
                // twice — confirm whether duplicates should be rejected.
                campaign
                    .baseline_replays
                    .push(OptimizationBaselineReplayRecord {
                        replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                        automation_run_id: Some(run.run_id.clone()),
                        phase1_metrics: metrics,
                        validator_case_outcomes,
                        experiment_count_at_recording: self
                            .count_optimization_experiments(&campaign.optimization_id)
                            .await as u64,
                        recorded_at_ms: now_ms(),
                    });
                // The run is no longer pending once it has been recorded.
                campaign
                    .pending_baseline_run_ids
                    .retain(|value| value != run_id);
                campaign.updated_at_ms = now_ms();
            }
            "approve_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                // Refuse experiments that were evaluated against a different baseline.
                if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                    return Err(
                        "experiment baseline_snapshot_hash does not match current campaign baseline"
                            .to_string(),
                    );
                }
                if let Some(phase1) = campaign.phase1.as_ref() {
                    let validated = validate_phase1_candidate_mutation(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        phase1,
                    )?;
                    if experiment.mutation_summary.is_none() {
                        experiment.mutation_summary = Some(validated.summary.clone());
                    }
                    let approved_at_ms = now_ms();
                    let apply_patch = Self::build_optimization_apply_patch(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        &validated,
                        approved_at_ms,
                    )?;
                    // Merge the patch into existing metadata; non-object metadata is rejected.
                    let mut metadata = match experiment.metadata.take() {
                        Some(Value::Object(map)) => map,
                        Some(_) => {
                            return Err("experiment metadata must be a JSON object".to_string());
                        }
                        None => serde_json::Map::new(),
                    };
                    metadata.insert("apply_patch".to_string(), apply_patch);
                    experiment.metadata = Some(Value::Object(metadata));
                    // The promotion gate only runs when baseline metrics exist.
                    if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
                        // Prefer typed metrics; fall back to parsing the raw `metrics` value.
                        let candidate_metrics = experiment
                            .phase1_metrics
                            .clone()
                            .or_else(|| {
                                experiment
                                    .metrics
                                    .as_ref()
                                    .and_then(|metrics| parse_phase1_metrics(metrics).ok())
                            })
                            .ok_or_else(|| {
                                "phase 1 candidate is missing promotion metrics".to_string()
                            })?;
                        let decision =
                            evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
                        experiment.promotion_recommendation = Some(
                            match decision.decision {
                                OptimizationPromotionDecisionKind::Promote => "promote",
                                OptimizationPromotionDecisionKind::Discard => "discard",
                                OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                                    "needs_operator_review"
                                }
                            }
                            .to_string(),
                        );
                        experiment.promotion_decision = Some(decision.clone());
                        if decision.decision != OptimizationPromotionDecisionKind::Promote {
                            // Persist the recorded decision on the experiment, then
                            // fail the action; the campaign itself is not written.
                            let _ = self
                                .put_optimization_experiment(experiment)
                                .await
                                .map_err(|e| e.to_string())?;
                            return Err(decision.reason);
                        }
                        campaign.baseline_metrics = Some(candidate_metrics);
                    }
                }
                // Promote the candidate to the new baseline and reset replay state.
                campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
                campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
                campaign.baseline_replays.clear();
                campaign.pending_baseline_run_ids.clear();
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = None;
                experiment.status = OptimizationExperimentStatus::PromotionApproved;
                // The experiment is written before the campaign (see the end of this function).
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            "reject_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                // The optional rejection reason is kept as the campaign's last pause reason.
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
                experiment.status = OptimizationExperimentStatus::PromotionRejected;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            _ => return Err("unsupported optimization action".to_string()),
        }
        // Every successful action ends by persisting the mutated campaign.
        self.put_optimization_campaign(campaign)
            .await
            .map_err(|e| e.to_string())
    }
4138
4139    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
4140        let mut rows = self
4141            .automations_v2
4142            .read()
4143            .await
4144            .values()
4145            .cloned()
4146            .collect::<Vec<_>>();
4147        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
4148        rows
4149    }
4150
4151    pub async fn delete_automation_v2(
4152        &self,
4153        automation_id: &str,
4154    ) -> anyhow::Result<Option<AutomationV2Spec>> {
4155        let removed = self.automations_v2.write().await.remove(automation_id);
4156        let removed_run_count = {
4157            let mut runs = self.automation_v2_runs.write().await;
4158            let before = runs.len();
4159            runs.retain(|_, run| run.automation_id != automation_id);
4160            before.saturating_sub(runs.len())
4161        };
4162        self.persist_automations_v2().await?;
4163        if removed_run_count > 0 {
4164            self.persist_automation_v2_runs().await?;
4165        }
4166        self.verify_automation_v2_persisted(automation_id, false)
4167            .await?;
4168        Ok(removed)
4169    }
4170
4171    pub async fn create_automation_v2_run(
4172        &self,
4173        automation: &AutomationV2Spec,
4174        trigger_type: &str,
4175    ) -> anyhow::Result<AutomationV2RunRecord> {
4176        let now = now_ms();
4177        let pending_nodes = automation
4178            .flow
4179            .nodes
4180            .iter()
4181            .map(|n| n.node_id.clone())
4182            .collect::<Vec<_>>();
4183        let run = AutomationV2RunRecord {
4184            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
4185            automation_id: automation.automation_id.clone(),
4186            trigger_type: trigger_type.to_string(),
4187            status: AutomationRunStatus::Queued,
4188            created_at_ms: now,
4189            updated_at_ms: now,
4190            started_at_ms: None,
4191            finished_at_ms: None,
4192            active_session_ids: Vec::new(),
4193            latest_session_id: None,
4194            active_instance_ids: Vec::new(),
4195            checkpoint: AutomationRunCheckpoint {
4196                completed_nodes: Vec::new(),
4197                pending_nodes,
4198                node_outputs: std::collections::HashMap::new(),
4199                node_attempts: std::collections::HashMap::new(),
4200                blocked_nodes: Vec::new(),
4201                awaiting_gate: None,
4202                gate_history: Vec::new(),
4203                lifecycle_history: Vec::new(),
4204                last_failure: None,
4205            },
4206            runtime_context: automation
4207                .runtime_context_materialization()
4208                .or_else(|| automation.approved_plan_runtime_context_materialization()),
4209            automation_snapshot: Some(automation.clone()),
4210            pause_reason: None,
4211            resume_reason: None,
4212            detail: None,
4213            stop_kind: None,
4214            stop_reason: None,
4215            prompt_tokens: 0,
4216            completion_tokens: 0,
4217            total_tokens: 0,
4218            estimated_cost_usd: 0.0,
4219            scheduler: None,
4220        };
4221        self.automation_v2_runs
4222            .write()
4223            .await
4224            .insert(run.run_id.clone(), run.clone());
4225        self.persist_automation_v2_runs().await?;
4226        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
4227            .await
4228            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
4229        Ok(run)
4230    }
4231
4232    pub async fn create_automation_v2_dry_run(
4233        &self,
4234        automation: &AutomationV2Spec,
4235        trigger_type: &str,
4236    ) -> anyhow::Result<AutomationV2RunRecord> {
4237        let now = now_ms();
4238        let run = AutomationV2RunRecord {
4239            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
4240            automation_id: automation.automation_id.clone(),
4241            trigger_type: format!("{trigger_type}_dry_run"),
4242            status: AutomationRunStatus::Completed,
4243            created_at_ms: now,
4244            updated_at_ms: now,
4245            started_at_ms: Some(now),
4246            finished_at_ms: Some(now),
4247            active_session_ids: Vec::new(),
4248            latest_session_id: None,
4249            active_instance_ids: Vec::new(),
4250            checkpoint: AutomationRunCheckpoint {
4251                completed_nodes: Vec::new(),
4252                pending_nodes: Vec::new(),
4253                node_outputs: std::collections::HashMap::new(),
4254                node_attempts: std::collections::HashMap::new(),
4255                blocked_nodes: Vec::new(),
4256                awaiting_gate: None,
4257                gate_history: Vec::new(),
4258                lifecycle_history: Vec::new(),
4259                last_failure: None,
4260            },
4261            runtime_context: automation
4262                .runtime_context_materialization()
4263                .or_else(|| automation.approved_plan_runtime_context_materialization()),
4264            automation_snapshot: Some(automation.clone()),
4265            pause_reason: None,
4266            resume_reason: None,
4267            detail: Some("dry_run".to_string()),
4268            stop_kind: None,
4269            stop_reason: None,
4270            prompt_tokens: 0,
4271            completion_tokens: 0,
4272            total_tokens: 0,
4273            estimated_cost_usd: 0.0,
4274            scheduler: None,
4275        };
4276        self.automation_v2_runs
4277            .write()
4278            .await
4279            .insert(run.run_id.clone(), run.clone());
4280        self.persist_automation_v2_runs().await?;
4281        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
4282            .await
4283            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
4284        Ok(run)
4285    }
4286
4287    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
4288        self.automation_v2_runs.read().await.get(run_id).cloned()
4289    }
4290
4291    pub async fn list_automation_v2_runs(
4292        &self,
4293        automation_id: Option<&str>,
4294        limit: usize,
4295    ) -> Vec<AutomationV2RunRecord> {
4296        let mut rows = self
4297            .automation_v2_runs
4298            .read()
4299            .await
4300            .values()
4301            .filter(|row| {
4302                if let Some(id) = automation_id {
4303                    row.automation_id == id
4304                } else {
4305                    true
4306                }
4307            })
4308            .cloned()
4309            .collect::<Vec<_>>();
4310        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
4311        rows.truncate(limit.clamp(1, 500));
4312        rows
4313    }
4314
4315    async fn automation_v2_run_workspace_root(
4316        &self,
4317        run: &AutomationV2RunRecord,
4318    ) -> Option<String> {
4319        if let Some(root) = run
4320            .automation_snapshot
4321            .as_ref()
4322            .and_then(|automation| automation.workspace_root.as_ref())
4323            .map(|value| value.trim())
4324            .filter(|value| !value.is_empty())
4325        {
4326            return Some(root.to_string());
4327        }
4328        self.get_automation_v2(&run.automation_id)
4329            .await
4330            .and_then(|automation| automation.workspace_root)
4331            .map(|value| value.trim().to_string())
4332            .filter(|value| !value.is_empty())
4333    }
4334
4335    async fn sync_automation_scheduler_for_run_transition(
4336        &self,
4337        previous_status: AutomationRunStatus,
4338        run: &AutomationV2RunRecord,
4339    ) {
4340        let had_capacity = automation_status_uses_scheduler_capacity(&previous_status);
4341        let has_capacity = automation_status_uses_scheduler_capacity(&run.status);
4342        let had_lock = automation_status_holds_workspace_lock(&previous_status);
4343        let has_lock = automation_status_holds_workspace_lock(&run.status);
4344        let workspace_root = self.automation_v2_run_workspace_root(run).await;
4345        let mut scheduler = self.automation_scheduler.write().await;
4346
4347        if (had_capacity || had_lock) && !has_capacity && !has_lock {
4348            scheduler.release_run(&run.run_id);
4349            return;
4350        }
4351        if had_capacity && !has_capacity {
4352            scheduler.release_capacity(&run.run_id);
4353        }
4354        if had_lock && !has_lock {
4355            scheduler.release_workspace(&run.run_id);
4356        }
4357        if !had_lock && has_lock {
4358            if has_capacity {
4359                scheduler.admit_run(&run.run_id, workspace_root.as_deref());
4360            } else {
4361                scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
4362            }
4363            return;
4364        }
4365        if !had_capacity && has_capacity {
4366            scheduler.admit_run(&run.run_id, workspace_root.as_deref());
4367        }
4368    }
4369
4370    pub async fn recover_in_flight_runs(&self) -> usize {
4371        let runs = self
4372            .automation_v2_runs
4373            .read()
4374            .await
4375            .values()
4376            .cloned()
4377            .collect::<Vec<_>>();
4378        let mut recovered = 0usize;
4379        for run in runs {
4380            match run.status {
4381                AutomationRunStatus::Running => {
4382                    let detail = "automation run interrupted by server restart".to_string();
4383                    if self
4384                        .update_automation_v2_run(&run.run_id, |row| {
4385                            row.status = AutomationRunStatus::Failed;
4386                            row.detail = Some(detail.clone());
4387                            row.stop_kind = Some(AutomationStopKind::ServerRestart);
4388                            row.stop_reason = Some(detail.clone());
4389                            automation::record_automation_lifecycle_event(
4390                                row,
4391                                "run_failed_server_restart",
4392                                Some(detail.clone()),
4393                                Some(AutomationStopKind::ServerRestart),
4394                            );
4395                        })
4396                        .await
4397                        .is_some()
4398                    {
4399                        recovered += 1;
4400                    }
4401                }
4402                AutomationRunStatus::Paused
4403                | AutomationRunStatus::Pausing
4404                | AutomationRunStatus::AwaitingApproval => {
4405                    let workspace_root = self.automation_v2_run_workspace_root(&run).await;
4406                    let mut scheduler = self.automation_scheduler.write().await;
4407                    scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
4408                    for (node_id, output) in &run.checkpoint.node_outputs {
4409                        if let Some((path, content_digest)) =
4410                            automation::node_output::automation_output_validated_artifact(output)
4411                        {
4412                            scheduler.preexisting_registry.register_validated(
4413                                &run.run_id,
4414                                node_id,
4415                                automation::scheduler::ValidatedArtifact {
4416                                    path,
4417                                    content_digest,
4418                                },
4419                            );
4420                        }
4421                    }
4422                }
4423                _ => {}
4424            }
4425        }
4426        recovered
4427    }
4428
4429    pub fn is_automation_scheduler_stopping(&self) -> bool {
4430        self.automation_scheduler_stopping.load(Ordering::Relaxed)
4431    }
4432
4433    pub fn set_automation_scheduler_stopping(&self, stopping: bool) {
4434        self.automation_scheduler_stopping
4435            .store(stopping, Ordering::Relaxed);
4436    }
4437
4438    pub async fn fail_running_automation_runs_for_shutdown(&self) -> usize {
4439        let run_ids = self
4440            .automation_v2_runs
4441            .read()
4442            .await
4443            .values()
4444            .filter(|run| matches!(run.status, AutomationRunStatus::Running))
4445            .map(|run| run.run_id.clone())
4446            .collect::<Vec<_>>();
4447        let mut failed = 0usize;
4448        for run_id in run_ids {
4449            let detail = "automation run stopped during server shutdown".to_string();
4450            if self
4451                .update_automation_v2_run(&run_id, |row| {
4452                    row.status = AutomationRunStatus::Failed;
4453                    row.detail = Some(detail.clone());
4454                    row.stop_kind = Some(AutomationStopKind::Shutdown);
4455                    row.stop_reason = Some(detail.clone());
4456                    automation::record_automation_lifecycle_event(
4457                        row,
4458                        "run_failed_shutdown",
4459                        Some(detail.clone()),
4460                        Some(AutomationStopKind::Shutdown),
4461                    );
4462                })
4463                .await
4464                .is_some()
4465            {
4466                failed += 1;
4467            }
4468        }
4469        failed
4470    }
4471
4472    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
4473        let run_id = self
4474            .automation_v2_runs
4475            .read()
4476            .await
4477            .values()
4478            .filter(|row| row.status == AutomationRunStatus::Queued)
4479            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
4480            .map(|row| row.run_id.clone())?;
4481        self.claim_specific_automation_v2_run(&run_id).await
4482    }
4483    pub async fn claim_specific_automation_v2_run(
4484        &self,
4485        run_id: &str,
4486    ) -> Option<AutomationV2RunRecord> {
4487        let mut guard = self.automation_v2_runs.write().await;
4488        let run = guard.get_mut(run_id)?;
4489        if run.status != AutomationRunStatus::Queued {
4490            return None;
4491        }
4492        let previous_status = run.status.clone();
4493        let now = now_ms();
4494        if run.runtime_context.is_none() {
4495            run.runtime_context = run.automation_snapshot.as_ref().and_then(|automation| {
4496                automation
4497                    .runtime_context_materialization()
4498                    .or_else(|| automation.approved_plan_runtime_context_materialization())
4499            });
4500        }
4501        if run.runtime_context.is_none() {
4502            let previous_status = run.status.clone();
4503            let now = now_ms();
4504            run.status = AutomationRunStatus::Failed;
4505            run.updated_at_ms = now;
4506            run.finished_at_ms.get_or_insert(now);
4507            run.scheduler = None;
4508            run.detail = Some("runtime context partition missing for automation run".to_string());
4509            let claimed = run.clone();
4510            drop(guard);
4511            self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
4512                .await;
4513            let _ = self.persist_automation_v2_runs().await;
4514            return None;
4515        }
4516        run.status = AutomationRunStatus::Running;
4517        run.updated_at_ms = now;
4518        run.started_at_ms.get_or_insert(now);
4519        run.scheduler = None;
4520        let claimed = run.clone();
4521        drop(guard);
4522        self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
4523            .await;
4524        let _ = self.persist_automation_v2_runs().await;
4525        Some(claimed)
4526    }
4527    pub async fn update_automation_v2_run(
4528        &self,
4529        run_id: &str,
4530        update: impl FnOnce(&mut AutomationV2RunRecord),
4531    ) -> Option<AutomationV2RunRecord> {
4532        let mut guard = self.automation_v2_runs.write().await;
4533        let run = guard.get_mut(run_id)?;
4534        let previous_status = run.status.clone();
4535        update(run);
4536        if run.status != AutomationRunStatus::Queued {
4537            run.scheduler = None;
4538        }
4539        run.updated_at_ms = now_ms();
4540        if matches!(
4541            run.status,
4542            AutomationRunStatus::Completed
4543                | AutomationRunStatus::Blocked
4544                | AutomationRunStatus::Failed
4545                | AutomationRunStatus::Cancelled
4546        ) {
4547            run.finished_at_ms.get_or_insert_with(now_ms);
4548        }
4549        let out = run.clone();
4550        drop(guard);
4551        self.sync_automation_scheduler_for_run_transition(previous_status, &out)
4552            .await;
4553        let _ = self.persist_automation_v2_runs().await;
4554        Some(out)
4555    }
4556
4557    pub async fn set_automation_v2_run_scheduler_metadata(
4558        &self,
4559        run_id: &str,
4560        meta: automation::SchedulerMetadata,
4561    ) -> Option<AutomationV2RunRecord> {
4562        self.update_automation_v2_run(run_id, |row| {
4563            row.scheduler = Some(meta);
4564        })
4565        .await
4566    }
4567
4568    pub async fn clear_automation_v2_run_scheduler_metadata(
4569        &self,
4570        run_id: &str,
4571    ) -> Option<AutomationV2RunRecord> {
4572        self.update_automation_v2_run(run_id, |row| {
4573            row.scheduler = None;
4574        })
4575        .await
4576    }
4577
4578    pub async fn add_automation_v2_session(
4579        &self,
4580        run_id: &str,
4581        session_id: &str,
4582    ) -> Option<AutomationV2RunRecord> {
4583        let updated = self
4584            .update_automation_v2_run(run_id, |row| {
4585                if !row.active_session_ids.iter().any(|id| id == session_id) {
4586                    row.active_session_ids.push(session_id.to_string());
4587                }
4588                row.latest_session_id = Some(session_id.to_string());
4589            })
4590            .await;
4591        self.automation_v2_session_runs
4592            .write()
4593            .await
4594            .insert(session_id.to_string(), run_id.to_string());
4595        updated
4596    }
4597
4598    pub async fn clear_automation_v2_session(
4599        &self,
4600        run_id: &str,
4601        session_id: &str,
4602    ) -> Option<AutomationV2RunRecord> {
4603        self.automation_v2_session_runs
4604            .write()
4605            .await
4606            .remove(session_id);
4607        self.update_automation_v2_run(run_id, |row| {
4608            row.active_session_ids.retain(|id| id != session_id);
4609        })
4610        .await
4611    }
4612
4613    pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
4614        let mut guard = self.automation_v2_session_runs.write().await;
4615        for session_id in session_ids {
4616            guard.remove(session_id);
4617        }
4618    }
4619
4620    pub async fn add_automation_v2_instance(
4621        &self,
4622        run_id: &str,
4623        instance_id: &str,
4624    ) -> Option<AutomationV2RunRecord> {
4625        self.update_automation_v2_run(run_id, |row| {
4626            if !row.active_instance_ids.iter().any(|id| id == instance_id) {
4627                row.active_instance_ids.push(instance_id.to_string());
4628            }
4629        })
4630        .await
4631    }
4632
4633    pub async fn clear_automation_v2_instance(
4634        &self,
4635        run_id: &str,
4636        instance_id: &str,
4637    ) -> Option<AutomationV2RunRecord> {
4638        self.update_automation_v2_run(run_id, |row| {
4639            row.active_instance_ids.retain(|id| id != instance_id);
4640        })
4641        .await
4642    }
4643
4644    pub async fn apply_provider_usage_to_runs(
4645        &self,
4646        session_id: &str,
4647        prompt_tokens: u64,
4648        completion_tokens: u64,
4649        total_tokens: u64,
4650    ) {
4651        if let Some(policy) = self.routine_session_policy(session_id).await {
4652            let rate = self.token_cost_per_1k_usd.max(0.0);
4653            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
4654            let mut guard = self.routine_runs.write().await;
4655            if let Some(run) = guard.get_mut(&policy.run_id) {
4656                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
4657                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
4658                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
4659                run.estimated_cost_usd += delta_cost;
4660                run.updated_at_ms = now_ms();
4661            }
4662            drop(guard);
4663            let _ = self.persist_routine_runs().await;
4664        }
4665
4666        let maybe_v2_run_id = self
4667            .automation_v2_session_runs
4668            .read()
4669            .await
4670            .get(session_id)
4671            .cloned();
4672        if let Some(run_id) = maybe_v2_run_id {
4673            let rate = self.token_cost_per_1k_usd.max(0.0);
4674            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
4675            let mut guard = self.automation_v2_runs.write().await;
4676            if let Some(run) = guard.get_mut(&run_id) {
4677                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
4678                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
4679                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
4680                run.estimated_cost_usd += delta_cost;
4681                run.updated_at_ms = now_ms();
4682            }
4683            drop(guard);
4684            let _ = self.persist_automation_v2_runs().await;
4685        }
4686    }
4687
4688    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
4689        let mut fired = Vec::new();
4690        let mut guard = self.automations_v2.write().await;
4691        for automation in guard.values_mut() {
4692            if automation.status != AutomationV2Status::Active {
4693                continue;
4694            }
4695            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
4696                automation.next_fire_at_ms =
4697                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
4698                continue;
4699            };
4700            if now_ms < next_fire_at_ms {
4701                continue;
4702            }
4703            let run_count =
4704                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
4705            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
4706            automation.next_fire_at_ms = next;
4707            automation.last_fired_at_ms = Some(now_ms);
4708            for _ in 0..run_count {
4709                fired.push(automation.automation_id.clone());
4710            }
4711        }
4712        drop(guard);
4713        let _ = self.persist_automations_v2().await;
4714        fired
4715    }
4716}
4717
4718async fn build_channels_config(
4719    state: &AppState,
4720    channels: &ChannelsConfigFile,
4721) -> Option<ChannelsConfig> {
4722    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
4723        return None;
4724    }
4725    Some(ChannelsConfig {
4726        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
4727            bot_token: cfg.bot_token,
4728            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4729            mention_only: cfg.mention_only,
4730            style_profile: cfg.style_profile,
4731            security_profile: cfg.security_profile,
4732        }),
4733        discord: channels.discord.clone().map(|cfg| DiscordConfig {
4734            bot_token: cfg.bot_token,
4735            guild_id: cfg.guild_id,
4736            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4737            mention_only: cfg.mention_only,
4738            security_profile: cfg.security_profile,
4739        }),
4740        slack: channels.slack.clone().map(|cfg| SlackConfig {
4741            bot_token: cfg.bot_token,
4742            channel_id: cfg.channel_id,
4743            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4744            mention_only: cfg.mention_only,
4745            security_profile: cfg.security_profile,
4746        }),
4747        server_base_url: state.server_base_url(),
4748        api_token: state.api_token().await.unwrap_or_default(),
4749        tool_policy: channels.tool_policy.clone(),
4750    })
4751}
4752
4753// channel config normalization moved to crate::config::channels
4754
/// Checks whether `value` has the shape `owner/repo`.
///
/// Surrounding whitespace is ignored. The slug must contain exactly one `/`,
/// and both the owner and the repository part must be non-blank.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    // No slash at all (including empty input) can never be a valid slug.
    let Some((owner, repo)) = value.trim().split_once('/') else {
        return false;
    };
    // A second slash would mean more than two path segments.
    !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
}
4769
4770fn legacy_automations_v2_path() -> Option<PathBuf> {
4771    config::paths::resolve_legacy_root_file_path("automations_v2.json")
4772        .filter(|path| path != &config::paths::resolve_automations_v2_path())
4773}
4774
4775fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4776    let mut candidates = vec![active_path.clone()];
4777    if let Some(legacy_path) = legacy_automations_v2_path() {
4778        if !candidates.contains(&legacy_path) {
4779            candidates.push(legacy_path);
4780        }
4781    }
4782    let default_path = config::paths::default_state_dir().join("automations_v2.json");
4783    if !candidates.contains(&default_path) {
4784        candidates.push(default_path);
4785    }
4786    candidates
4787}
4788
4789async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
4790    let Some(legacy_path) = legacy_automations_v2_path() else {
4791        return Ok(());
4792    };
4793    if legacy_path == *active_path || !legacy_path.exists() {
4794        return Ok(());
4795    }
4796    fs::remove_file(&legacy_path).await?;
4797    tracing::info!(
4798        active_path = active_path.display().to_string(),
4799        removed_path = legacy_path.display().to_string(),
4800        "removed stale legacy automation v2 file after canonical persistence"
4801    );
4802    Ok(())
4803}
4804
4805fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
4806    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
4807        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
4808}
4809
4810fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4811    let mut candidates = vec![active_path.clone()];
4812    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
4813        if !candidates.contains(&legacy_path) {
4814            candidates.push(legacy_path);
4815        }
4816    }
4817    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
4818    if !candidates.contains(&default_path) {
4819        candidates.push(default_path);
4820    }
4821    candidates
4822}
4823
4824fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4825    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4826        .unwrap_or_default()
4827}
4828
4829fn parse_automation_v2_runs_file(
4830    raw: &str,
4831) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4832    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4833        .unwrap_or_default()
4834}
4835
4836fn parse_optimization_campaigns_file(
4837    raw: &str,
4838) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
4839    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
4840        .unwrap_or_default()
4841}
4842
4843fn parse_optimization_experiments_file(
4844    raw: &str,
4845) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
4846    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
4847        .unwrap_or_default()
4848}
4849
4850fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4851    match schedule {
4852        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4853        RoutineSchedule::Cron { .. } => None,
4854    }
4855}
4856
4857fn parse_timezone(timezone: &str) -> Option<Tz> {
4858    timezone.trim().parse::<Tz>().ok()
4859}
4860
4861fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
4862    let tz = parse_timezone(timezone)?;
4863    let schedule = Schedule::from_str(expression).ok()?;
4864    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
4865    let local_from = from_dt.with_timezone(&tz);
4866    let next = schedule.after(&local_from).next()?;
4867    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
4868}
4869
4870fn compute_next_schedule_fire_at_ms(
4871    schedule: &RoutineSchedule,
4872    timezone: &str,
4873    from_ms: u64,
4874) -> Option<u64> {
4875    let _ = parse_timezone(timezone)?;
4876    match schedule {
4877        RoutineSchedule::IntervalSeconds { seconds } => {
4878            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4879        }
4880        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4881    }
4882}
4883
4884fn compute_misfire_plan_for_schedule(
4885    now_ms: u64,
4886    next_fire_at_ms: u64,
4887    schedule: &RoutineSchedule,
4888    timezone: &str,
4889    policy: &RoutineMisfirePolicy,
4890) -> (u32, u64) {
4891    match schedule {
4892        RoutineSchedule::IntervalSeconds { .. } => {
4893            let Some(interval_ms) = routine_interval_ms(schedule) else {
4894                return (0, next_fire_at_ms);
4895            };
4896            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
4897        }
4898        RoutineSchedule::Cron { expression } => {
4899            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
4900                .unwrap_or_else(|| now_ms.saturating_add(60_000));
4901            match policy {
4902                RoutineMisfirePolicy::Skip => (0, aligned_next),
4903                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4904                RoutineMisfirePolicy::CatchUp { max_runs } => {
4905                    let mut count = 0u32;
4906                    let mut cursor = next_fire_at_ms;
4907                    while cursor <= now_ms && count < *max_runs {
4908                        count = count.saturating_add(1);
4909                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
4910                            break;
4911                        };
4912                        if next <= cursor {
4913                            break;
4914                        }
4915                        cursor = next;
4916                    }
4917                    (count, aligned_next)
4918                }
4919            }
4920        }
4921    }
4922}
4923
4924fn compute_misfire_plan(
4925    now_ms: u64,
4926    next_fire_at_ms: u64,
4927    interval_ms: u64,
4928    policy: &RoutineMisfirePolicy,
4929) -> (u32, u64) {
4930    if now_ms < next_fire_at_ms || interval_ms == 0 {
4931        return (0, next_fire_at_ms);
4932    }
4933    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4934    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4935    match policy {
4936        RoutineMisfirePolicy::Skip => (0, aligned_next),
4937        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4938        RoutineMisfirePolicy::CatchUp { max_runs } => {
4939            let count = missed.min(u64::from(*max_runs)) as u32;
4940            (count, aligned_next)
4941        }
4942    }
4943}
4944
4945fn auto_generated_agent_name(agent_id: &str) -> String {
4946    let names = [
4947        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4948    ];
4949    let digest = Sha256::digest(agent_id.as_bytes());
4950    let idx = usize::from(digest[0]) % names.len();
4951    format!("{}-{:02x}", names[idx], digest[1])
4952}
4953
4954fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4955    match schedule.schedule_type {
4956        AutomationV2ScheduleType::Manual => None,
4957        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4958            seconds: schedule.interval_seconds.unwrap_or(60),
4959        }),
4960        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4961            expression: schedule.cron_expression.clone().unwrap_or_default(),
4962        }),
4963    }
4964}
4965
4966fn automation_schedule_next_fire_at_ms(
4967    schedule: &AutomationV2Schedule,
4968    from_ms: u64,
4969) -> Option<u64> {
4970    let routine_schedule = schedule_from_automation_v2(schedule)?;
4971    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4972}
4973
4974fn automation_schedule_due_count(
4975    schedule: &AutomationV2Schedule,
4976    now_ms: u64,
4977    next_fire_at_ms: u64,
4978) -> u32 {
4979    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
4980        return 0;
4981    };
4982    let (count, _) = compute_misfire_plan_for_schedule(
4983        now_ms,
4984        next_fire_at_ms,
4985        &routine_schedule,
4986        &schedule.timezone,
4987        &schedule.misfire_policy,
4988    );
4989    count.max(1)
4990}
4991
/// Outcome of evaluating whether a routine may execute.
///
/// Produced by `evaluate_routine_execution_policy`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run without further checks.
    Allowed,
    /// The routine may only run after approval; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` explains why.
    Blocked { reason: String },
}
4998
4999pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
5000    let entrypoint = routine.entrypoint.to_ascii_lowercase();
5001    if entrypoint.starts_with("connector.")
5002        || entrypoint.starts_with("integration.")
5003        || entrypoint.contains("external")
5004    {
5005        return true;
5006    }
5007    routine
5008        .args
5009        .get("uses_external_integrations")
5010        .and_then(|v| v.as_bool())
5011        .unwrap_or(false)
5012        || routine
5013            .args
5014            .get("connector_id")
5015            .and_then(|v| v.as_str())
5016            .is_some()
5017}
5018
5019pub fn evaluate_routine_execution_policy(
5020    routine: &RoutineSpec,
5021    trigger_type: &str,
5022) -> RoutineExecutionDecision {
5023    if !routine_uses_external_integrations(routine) {
5024        return RoutineExecutionDecision::Allowed;
5025    }
5026    if !routine.external_integrations_allowed {
5027        return RoutineExecutionDecision::Blocked {
5028            reason: "external integrations are disabled by policy".to_string(),
5029        };
5030    }
5031    if routine.requires_approval {
5032        return RoutineExecutionDecision::RequiresApproval {
5033            reason: format!(
5034                "manual approval required before external side effects ({})",
5035                trigger_type
5036            ),
5037        };
5038    }
5039    RoutineExecutionDecision::Allowed
5040}
5041
/// Validates a shared-resource key.
///
/// After trimming surrounding whitespace, a key is valid when it is exactly
/// `swarm.active_tasks`, or when it starts with one of `run/`, `mission/`,
/// `project/` or `team/` and contains no `//`. Blank keys are invalid.
fn is_valid_resource_key(key: &str) -> bool {
    const SCOPED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    match key.trim() {
        "" => false,
        "swarm.active_tasks" => true,
        scoped => {
            let has_known_prefix = SCOPED_PREFIXES
                .iter()
                .any(|prefix| scoped.starts_with(*prefix));
            has_known_prefix && !scoped.contains("//")
        }
    }
}
5059
5060impl Deref for AppState {
5061    type Target = RuntimeState;
5062
5063    fn deref(&self) -> &Self::Target {
5064        self.runtime
5065            .get()
5066            .expect("runtime accessed before startup completion")
5067    }
5068}
5069
/// Server-side helper for building prompt context (memory and docs blocks).
///
/// NOTE(review): presumably this is the type that implements
/// `PromptContextHook`; the trait impl is not visible here — confirm.
#[derive(Clone)]
struct ServerPromptContextHook {
    /// Application state handle owned by the hook.
    state: AppState,
}
5074
5075impl ServerPromptContextHook {
    /// Creates a hook that owns the given application state handle.
    fn new(state: AppState) -> Self {
        Self { state }
    }
5079
5080    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
5081        let paths = resolve_shared_paths().ok()?;
5082        MemoryDatabase::new(&paths.memory_db_path).await.ok()
5083    }
5084
5085    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
5086        let paths = resolve_shared_paths().ok()?;
5087        tandem_memory::MemoryManager::new(&paths.memory_db_path)
5088            .await
5089            .ok()
5090    }
5091
5092    fn hash_query(input: &str) -> String {
5093        let mut hasher = Sha256::new();
5094        hasher.update(input.as_bytes());
5095        format!("{:x}", hasher.finalize())
5096    }
5097
5098    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
5099        let mut out = vec!["<memory_context>".to_string()];
5100        let mut used = 0usize;
5101        for hit in hits {
5102            let text = hit
5103                .record
5104                .content
5105                .split_whitespace()
5106                .take(60)
5107                .collect::<Vec<_>>()
5108                .join(" ");
5109            let line = format!(
5110                "- [{:.3}] {} (source={}, run={})",
5111                hit.score, text, hit.record.source_type, hit.record.run_id
5112            );
5113            used = used.saturating_add(line.len());
5114            if used > 2200 {
5115                break;
5116            }
5117            out.push(line);
5118        }
5119        out.push("</memory_context>".to_string());
5120        out.join("\n")
5121    }
5122
5123    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
5124        chunk
5125            .metadata
5126            .as_ref()
5127            .and_then(|meta| meta.get("source_url"))
5128            .and_then(Value::as_str)
5129            .map(str::trim)
5130            .filter(|v| !v.is_empty())
5131            .map(ToString::to_string)
5132    }
5133
5134    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
5135        if let Some(path) = chunk
5136            .metadata
5137            .as_ref()
5138            .and_then(|meta| meta.get("relative_path"))
5139            .and_then(Value::as_str)
5140            .map(str::trim)
5141            .filter(|v| !v.is_empty())
5142        {
5143            return path.to_string();
5144        }
5145        chunk
5146            .source
5147            .strip_prefix("guide_docs:")
5148            .unwrap_or(chunk.source.as_str())
5149            .to_string()
5150    }
5151
5152    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
5153        let mut out = vec!["<docs_context>".to_string()];
5154        let mut used = 0usize;
5155        for hit in hits {
5156            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
5157            let path = Self::extract_docs_relative_path(&hit.chunk);
5158            let text = hit
5159                .chunk
5160                .content
5161                .split_whitespace()
5162                .take(70)
5163                .collect::<Vec<_>>()
5164                .join(" ");
5165            let line = format!(
5166                "- [{:.3}] {} (doc_path={}, source_url={})",
5167                hit.similarity, text, path, url
5168            );
5169            used = used.saturating_add(line.len());
5170            if used > 2800 {
5171                break;
5172            }
5173            out.push(line);
5174        }
5175        out.push("</docs_context>".to_string());
5176        out.join("\n")
5177    }
5178
5179    async fn search_embedded_docs(
5180        &self,
5181        query: &str,
5182        limit: usize,
5183    ) -> Vec<tandem_memory::types::MemorySearchResult> {
5184        let Some(manager) = self.open_memory_manager().await else {
5185            return Vec::new();
5186        };
5187        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
5188        manager
5189            .search(
5190                query,
5191                Some(MemoryTier::Global),
5192                None,
5193                None,
5194                Some(search_limit),
5195            )
5196            .await
5197            .unwrap_or_default()
5198            .into_iter()
5199            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
5200            .take(limit)
5201            .collect()
5202    }
5203
5204    fn should_skip_memory_injection(query: &str) -> bool {
5205        let trimmed = query.trim();
5206        if trimmed.is_empty() {
5207            return true;
5208        }
5209        let lower = trimmed.to_ascii_lowercase();
5210        let social = [
5211            "hi",
5212            "hello",
5213            "hey",
5214            "thanks",
5215            "thank you",
5216            "ok",
5217            "okay",
5218            "cool",
5219            "nice",
5220            "yo",
5221            "good morning",
5222            "good afternoon",
5223            "good evening",
5224        ];
5225        lower.len() <= 32 && social.contains(&lower.as_str())
5226    }
5227
/// Maps a personality preset name to its default-style instruction sentence.
///
/// Recognised names are `concise`, `friendly`, `mentor` and `critical`
/// (exact, case-sensitive match). Any other value yields the balanced style.
fn personality_preset_text(preset: &str) -> &'static str {
    const BALANCED: &str =
        "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance.";
    const STYLES: [(&str, &str); 4] = [
        (
            "concise",
            "Default style: concise and high-signal. Prefer short direct responses unless detail is requested.",
        ),
        (
            "friendly",
            "Default style: friendly and supportive while staying technically rigorous and concrete.",
        ),
        (
            "mentor",
            "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial.",
        ),
        (
            "critical",
            "Default style: critical and risk-first. Surface failure modes and assumptions early.",
        ),
    ];
    STYLES
        .iter()
        .find(|(name, _)| *name == preset)
        .map_or(BALANCED, |(_, text)| *text)
}
5247
5248    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
5249        let allow_agent_override = agent_name
5250            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
5251            .unwrap_or(false);
5252        let legacy_bot_name = config
5253            .get("bot_name")
5254            .and_then(Value::as_str)
5255            .map(str::trim)
5256            .filter(|v| !v.is_empty());
5257        let bot_name = config
5258            .get("identity")
5259            .and_then(|identity| identity.get("bot"))
5260            .and_then(|bot| bot.get("canonical_name"))
5261            .and_then(Value::as_str)
5262            .map(str::trim)
5263            .filter(|v| !v.is_empty())
5264            .or(legacy_bot_name)
5265            .unwrap_or("Tandem");
5266
5267        let default_profile = config
5268            .get("identity")
5269            .and_then(|identity| identity.get("personality"))
5270            .and_then(|personality| personality.get("default"));
5271        let default_preset = default_profile
5272            .and_then(|profile| profile.get("preset"))
5273            .and_then(Value::as_str)
5274            .map(str::trim)
5275            .filter(|v| !v.is_empty())
5276            .unwrap_or("balanced");
5277        let default_custom = default_profile
5278            .and_then(|profile| profile.get("custom_instructions"))
5279            .and_then(Value::as_str)
5280            .map(str::trim)
5281            .filter(|v| !v.is_empty())
5282            .map(ToString::to_string);
5283        let legacy_persona = config
5284            .get("persona")
5285            .and_then(Value::as_str)
5286            .map(str::trim)
5287            .filter(|v| !v.is_empty())
5288            .map(ToString::to_string);
5289
5290        let per_agent_profile = if allow_agent_override {
5291            agent_name.and_then(|name| {
5292                config
5293                    .get("identity")
5294                    .and_then(|identity| identity.get("personality"))
5295                    .and_then(|personality| personality.get("per_agent"))
5296                    .and_then(|per_agent| per_agent.get(name))
5297            })
5298        } else {
5299            None
5300        };
5301        let preset = per_agent_profile
5302            .and_then(|profile| profile.get("preset"))
5303            .and_then(Value::as_str)
5304            .map(str::trim)
5305            .filter(|v| !v.is_empty())
5306            .unwrap_or(default_preset);
5307        let custom = per_agent_profile
5308            .and_then(|profile| profile.get("custom_instructions"))
5309            .and_then(Value::as_str)
5310            .map(str::trim)
5311            .filter(|v| !v.is_empty())
5312            .map(ToString::to_string)
5313            .or(default_custom)
5314            .or(legacy_persona);
5315
5316        let mut lines = vec![
5317            format!("You are {bot_name}, an AI assistant."),
5318            Self::personality_preset_text(preset).to_string(),
5319        ];
5320        if let Some(custom) = custom {
5321            lines.push(format!("Additional personality instructions: {custom}"));
5322        }
5323        Some(lines.join("\n"))
5324    }
5325
/// Renders the `<memory_scope>` prompt block for a session.
///
/// Always emits the session id and three fixed guidance lines. The project id
/// and workspace root lines are included only when the corresponding argument
/// is present and non-blank after trimming. Lines are joined with `\n` and the
/// result has no trailing newline.
fn build_memory_scope_block(
    session_id: &str,
    project_id: Option<&str>,
    workspace_root: Option<&str>,
) -> String {
    /// Trimmed value, or `None` when missing or whitespace-only.
    fn non_blank(value: Option<&str>) -> Option<&str> {
        value.map(str::trim).filter(|text| !text.is_empty())
    }

    const TRAILER: [&str; 4] = [
        "- default_memory_search_behavior: search current session, then current project/workspace, then global memory",
        "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope",
        "- when memory is sparse or stale, inspect the workspace with glob, grep, and read",
        "</memory_scope>",
    ];

    let mut block: Vec<String> = Vec::with_capacity(8);
    block.push("<memory_scope>".to_string());
    block.push(format!("- current_session_id: {}", session_id));
    block.extend(non_blank(project_id).map(|id| format!("- current_project_id: {}", id)));
    block.extend(non_blank(workspace_root).map(|root| format!("- workspace_root: {}", root)));
    block.extend(TRAILER.iter().map(|line| (*line).to_string()));
    block.join("\n")
}
5359}
5360
impl PromptContextHook for ServerPromptContextHook {
    /// Appends server-side context to the provider message list for a run.
    ///
    /// In order, this hook:
    /// 1. Returns `messages` untouched if the app state is not ready or no run
    ///    is registered for `ctx.session_id`.
    /// 2. Appends an identity/personality system message.
    /// 3. Appends a `<memory_scope>` system message when the session is found
    ///    in storage.
    /// 4. Takes the content of the most recent `user` message as the query and
    ///    stops if it is blank or a bare social phrase.
    /// 5. Searches embedded guide docs; if any match, appends them, publishes
    ///    `memory.docs.context.injected`, and returns without searching global
    ///    memory.
    /// 6. Otherwise searches global memory for the run's client id (or
    ///    `"default"`), publishes `memory.search.performed`, and, if there are
    ///    hits, appends them and publishes `memory.context.injected`.
    ///
    /// Every path visible here resolves to `Ok`; failures to open the memory
    /// stores or to search simply skip the corresponding augmentation.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // The returned future must be `'static`, so it owns a clone of the hook.
        let this = self.clone();
        Box::pin(async move {
            // Startup can invoke prompt plumbing before RuntimeState is installed.
            // Never panic from context hooks; fail-open and continue without augmentation.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // Runs are looked up by session id; without one there is nothing to augment.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            // Identity/personality block, resolved for the run's agent profile.
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            // Memory scope block; only emitted when the session record exists.
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The retrieval query is the content of the last message with role "user".
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            // Greetings and similar small talk do not trigger any memory lookup.
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded guide docs take precedence: when they match, global
            // memory is not searched for this call.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        // Whitespace-separated word count, used as a rough size estimate.
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            // Time the global memory search so its latency can be reported.
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Published for every search, including ones with no hits. The event
            // carries the output of `hash_query`, not the raw query text.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    // Whitespace-separated word count, used as a rough size estimate.
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
5491
5492fn extract_event_session_id(properties: &Value) -> Option<String> {
5493    properties
5494        .get("sessionID")
5495        .or_else(|| properties.get("sessionId"))
5496        .or_else(|| properties.get("id"))
5497        .or_else(|| {
5498            properties
5499                .get("part")
5500                .and_then(|part| part.get("sessionID"))
5501        })
5502        .or_else(|| {
5503            properties
5504                .get("part")
5505                .and_then(|part| part.get("sessionId"))
5506        })
5507        .and_then(|v| v.as_str())
5508        .map(|s| s.to_string())
5509}
5510
5511fn extract_event_run_id(properties: &Value) -> Option<String> {
5512    properties
5513        .get("runID")
5514        .or_else(|| properties.get("run_id"))
5515        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
5516        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
5517        .and_then(|v| v.as_str())
5518        .map(|s| s.to_string())
5519}
5520
5521pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
5522    let part = properties.get("part")?;
5523    let part_type = part
5524        .get("type")
5525        .and_then(|v| v.as_str())
5526        .unwrap_or_default()
5527        .to_ascii_lowercase();
5528    if part_type != "tool"
5529        && part_type != "tool-invocation"
5530        && part_type != "tool-result"
5531        && part_type != "tool_invocation"
5532        && part_type != "tool_result"
5533    {
5534        return None;
5535    }
5536    let part_state = part
5537        .get("state")
5538        .and_then(|v| v.as_str())
5539        .unwrap_or_default()
5540        .to_ascii_lowercase();
5541    let has_result = part.get("result").is_some_and(|value| !value.is_null());
5542    let has_error = part
5543        .get("error")
5544        .and_then(|v| v.as_str())
5545        .is_some_and(|value| !value.trim().is_empty());
5546    // Skip transient "running" deltas to avoid persistence storms from streamed
5547    // tool-argument chunks; keep actionable/final updates.
5548    if part_state == "running" && !has_result && !has_error {
5549        return None;
5550    }
5551    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
5552    let message_id = part
5553        .get("messageID")
5554        .or_else(|| part.get("message_id"))
5555        .and_then(|v| v.as_str())?
5556        .to_string();
5557    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
5558    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
5559        if let Some(preview) = properties
5560            .get("toolCallDelta")
5561            .and_then(|delta| delta.get("parsedArgsPreview"))
5562            .cloned()
5563        {
5564            let preview_nonempty = !preview.is_null()
5565                && !preview.as_object().is_some_and(|value| value.is_empty())
5566                && !preview
5567                    .as_str()
5568                    .map(|value| value.trim().is_empty())
5569                    .unwrap_or(false);
5570            if preview_nonempty {
5571                args = preview;
5572            }
5573        }
5574        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
5575            if let Some(raw_preview) = properties
5576                .get("toolCallDelta")
5577                .and_then(|delta| delta.get("rawArgsPreview"))
5578                .and_then(|value| value.as_str())
5579                .map(str::trim)
5580                .filter(|value| !value.is_empty())
5581            {
5582                args = Value::String(raw_preview.to_string());
5583            }
5584        }
5585    }
5586    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
5587    {
5588        tracing::info!(
5589            message_id = %message_id,
5590            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
5591            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
5592            has_result = part.get("result").is_some(),
5593            has_error = part.get("error").is_some(),
5594            "persistable write tool part still has empty args"
5595        );
5596    }
5597    let result = part.get("result").cloned().filter(|value| !value.is_null());
5598    let error = part
5599        .get("error")
5600        .and_then(|v| v.as_str())
5601        .map(|value| value.to_string());
5602    Some((
5603        message_id,
5604        MessagePart::ToolInvocation {
5605            tool,
5606            args,
5607            result,
5608            error,
5609        },
5610    ))
5611}
5612
5613pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
5614    let session_id = extract_event_session_id(&event.properties)?;
5615    let run_id = extract_event_run_id(&event.properties);
5616    let key = format!("run/{session_id}/status");
5617
5618    let mut base = serde_json::Map::new();
5619    base.insert("sessionID".to_string(), Value::String(session_id));
5620    if let Some(run_id) = run_id {
5621        base.insert("runID".to_string(), Value::String(run_id));
5622    }
5623
5624    match event.event_type.as_str() {
5625        "session.run.started" => {
5626            base.insert("state".to_string(), Value::String("running".to_string()));
5627            base.insert("phase".to_string(), Value::String("run".to_string()));
5628            base.insert(
5629                "eventType".to_string(),
5630                Value::String("session.run.started".to_string()),
5631            );
5632            Some(StatusIndexUpdate {
5633                key,
5634                value: Value::Object(base),
5635            })
5636        }
5637        "session.run.finished" => {
5638            base.insert("state".to_string(), Value::String("finished".to_string()));
5639            base.insert("phase".to_string(), Value::String("run".to_string()));
5640            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
5641                base.insert("result".to_string(), Value::String(status.to_string()));
5642            }
5643            base.insert(
5644                "eventType".to_string(),
5645                Value::String("session.run.finished".to_string()),
5646            );
5647            Some(StatusIndexUpdate {
5648                key,
5649                value: Value::Object(base),
5650            })
5651        }
5652        "message.part.updated" => {
5653            let part = event.properties.get("part")?;
5654            let part_type = part.get("type").and_then(|v| v.as_str())?;
5655            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
5656            let (phase, tool_active) = match (part_type, part_state) {
5657                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
5658                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
5659                _ => return None,
5660            };
5661            base.insert("state".to_string(), Value::String("running".to_string()));
5662            base.insert("phase".to_string(), Value::String(phase.to_string()));
5663            base.insert("toolActive".to_string(), Value::Bool(tool_active));
5664            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
5665                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
5666            }
5667            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
5668                base.insert(
5669                    "toolState".to_string(),
5670                    Value::String(tool_state.to_string()),
5671                );
5672            }
5673            if let Some(tool_error) = part
5674                .get("error")
5675                .and_then(|v| v.as_str())
5676                .map(str::trim)
5677                .filter(|value| !value.is_empty())
5678            {
5679                base.insert(
5680                    "toolError".to_string(),
5681                    Value::String(tool_error.to_string()),
5682                );
5683            }
5684            if let Some(tool_call_id) = part
5685                .get("id")
5686                .and_then(|v| v.as_str())
5687                .map(str::trim)
5688                .filter(|value| !value.is_empty())
5689            {
5690                base.insert(
5691                    "toolCallID".to_string(),
5692                    Value::String(tool_call_id.to_string()),
5693                );
5694            }
5695            if let Some(args_preview) = part
5696                .get("args")
5697                .filter(|value| {
5698                    !value.is_null()
5699                        && !value.as_object().is_some_and(|map| map.is_empty())
5700                        && !value
5701                            .as_str()
5702                            .map(|text| text.trim().is_empty())
5703                            .unwrap_or(false)
5704                })
5705                .map(|value| truncate_text(&value.to_string(), 500))
5706            {
5707                base.insert(
5708                    "toolArgsPreview".to_string(),
5709                    Value::String(args_preview.to_string()),
5710                );
5711            }
5712            base.insert(
5713                "eventType".to_string(),
5714                Value::String("message.part.updated".to_string()),
5715            );
5716            Some(StatusIndexUpdate {
5717                key,
5718                value: Value::Object(base),
5719            })
5720        }
5721        _ => None,
5722    }
5723}
5724
/// Thin entry point that forwards to
/// `crate::app::tasks::run_session_part_persister` and awaits it.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
5728
/// Thin entry point that forwards to `crate::app::tasks::run_status_indexer`
/// and awaits it.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
5732
/// Thin entry point that forwards to
/// `crate::app::tasks::run_agent_team_supervisor` and awaits it.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
5736
/// Thin entry point that forwards to `crate::app::tasks::run_bug_monitor`
/// and awaits it.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
5740
/// Thin entry point that forwards to `crate::app::tasks::run_usage_aggregator`
/// and awaits it.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
5744
/// Thin entry point that forwards to
/// `crate::app::tasks::run_optimization_scheduler` and awaits it.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
5748
/// Turns a failure event into a bug-monitor incident and drives it through
/// draft creation, triage and GitHub publication.
///
/// Steps, in order:
/// 1. Build a submission from the event and look up matching failure patterns.
/// 2. Find an existing incident with the same fingerprint and bump its
///    counters, or create a new one with status `queued`; persist it.
/// 3. If failure-pattern matches were found, mark the incident
///    `duplicate_suppressed`, publish
///    `bug_monitor.incident.duplicate_suppressed`, and return.
/// 4. Submit a draft. On failure, mark the incident `draft_failed`, publish
///    `bug_monitor.incident.detected` with the error detail, and return `Ok`.
/// 5. Ensure a triage run exists for the draft, then attempt to publish the
///    draft via `bug_monitor_github::publish_draft` in `Auto` mode. A publish
///    failure is recorded on both the incident and the draft but does not fail
///    this function.
/// 6. Persist the final incident and publish `bug_monitor.incident.detected`.
///
/// # Errors
///
/// Returns an error if the submission cannot be built, if the submission has
/// no fingerprint, or if persisting the incident fails.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // NOTE(review): the trailing `3` is presumably a cap on returned matches;
    // confirm against `bug_monitor_failure_pattern_matches`.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // Prefer the workspace root from the monitor config; otherwise use the
    // root of the current workspace index snapshot.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Incidents are matched by fingerprint. The read guard is a temporary of
    // this statement, so it is released before the write calls below.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump counters and timestamps, and backfill the
        // excerpt only if the stored one is empty.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    // Persist right away so the incident is stored even if a later step fails.
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: record the matches and stop before creating a draft.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            // Draft creation failed: record the (truncated) error on the
            // incident and still report the incident as detected.
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // A triage failure is non-fatal: the incident stays at `draft_created`
    // with the error recorded.
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    if let Some(draft_id) = incident.draft_id.clone() {
        // Re-read the stored draft, falling back to the one returned by
        // `submit_bug_monitor_draft` when the lookup yields nothing.
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                // The incident status becomes the publish outcome's action string.
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                // Best-effort bookkeeping: errors from these two calls are
                // deliberately ignored.
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
5950
5951pub fn sha256_hex(parts: &[&str]) -> String {
5952    let mut hasher = Sha256::new();
5953    for part in parts {
5954        hasher.update(part.as_bytes());
5955        hasher.update([0u8]);
5956    }
5957    format!("{:x}", hasher.finalize())
5958}
5959
5960fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
5961    matches!(status, AutomationRunStatus::Running)
5962}
5963
5964fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
5965    matches!(
5966        status,
5967        AutomationRunStatus::Running
5968            | AutomationRunStatus::Pausing
5969            | AutomationRunStatus::Paused
5970            | AutomationRunStatus::AwaitingApproval
5971    )
5972}
5973
/// Thin entry point that forwards to `crate::app::tasks::run_routine_scheduler`
/// and awaits it.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
5977
/// Thin entry point that forwards to `crate::app::tasks::run_routine_executor`
/// and awaits it.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
5981
/// Returns the prompt string for a routine run by forwarding to
/// `crate::app::routines::build_routine_prompt`.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
5985
/// Truncates `input` to at most `max_len` bytes and appends `...<truncated>`.
///
/// Inputs that already fit within `max_len` bytes are returned unchanged. The
/// cut never splits a UTF-8 character: if `max_len` falls inside a multi-byte
/// character, the cut moves back to the start of that character, so the kept
/// prefix may be slightly shorter than `max_len`. The marker is not counted
/// against `max_len`.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    const MARKER: &str = "...<truncated>";

    if input.len() <= max_len {
        return input.to_string();
    }
    // Slicing at an arbitrary byte offset panics when it lands in the middle of
    // a multi-byte character, so back up to the nearest boundary. Offset 0 is
    // always a boundary, which guarantees termination.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = String::with_capacity(cut + MARKER.len());
    out.push_str(&input[..cut]);
    out.push_str(MARKER);
    out
}
5994
/// Thin entry point that forwards to
/// `crate::app::routines::append_configured_output_artifacts` and awaits it.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
5998
5999pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6000    let provider_id = config
6001        .get("default_provider")
6002        .and_then(|v| v.as_str())
6003        .map(str::trim)
6004        .filter(|v| !v.is_empty())?;
6005    let model_id = config
6006        .get("providers")
6007        .and_then(|v| v.get(provider_id))
6008        .and_then(|v| v.get("default_model"))
6009        .and_then(|v| v.as_str())
6010        .map(str::trim)
6011        .filter(|v| !v.is_empty())?;
6012    Some(ModelSpec {
6013        provider_id: provider_id.to_string(),
6014        model_id: model_id.to_string(),
6015    })
6016}
6017
6018pub async fn resolve_routine_model_spec_for_run(
6019    state: &AppState,
6020    run: &RoutineRunRecord,
6021) -> (Option<ModelSpec>, String) {
6022    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
6023}
6024
/// Trims every entry of `raw`, drops entries that are empty after trimming,
/// and removes duplicates while keeping the first occurrence of each value
/// in its original relative order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut already_kept = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        // `insert` returns false for values seen earlier, filtering repeats.
        .filter(|entry| already_kept.insert(entry.clone()))
        .collect()
}
6039
6040#[cfg(not(feature = "browser"))]
6041impl AppState {
6042    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
6043        0
6044    }
6045
6046    pub async fn close_all_browser_sessions(&self) -> usize {
6047        0
6048    }
6049
6050    pub async fn browser_status(&self) -> serde_json::Value {
6051        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
6052    }
6053
6054    pub async fn browser_smoke_test(
6055        &self,
6056        _url: Option<String>,
6057    ) -> anyhow::Result<serde_json::Value> {
6058        anyhow::bail!("browser feature disabled")
6059    }
6060
6061    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
6062        anyhow::bail!("browser feature disabled")
6063    }
6064
6065    pub async fn browser_health_summary(&self) -> serde_json::Value {
6066        serde_json::json!({ "enabled": false })
6067    }
6068}
6069
6070pub mod automation;
6071pub use automation::*;
6072
6073#[cfg(test)]
6074mod tests;