Skip to main content

tandem_server/app/state/
mod.rs

1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec, TenantContext};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
22use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
23use tandem_memory::db::MemoryDatabase;
24use tandem_providers::ChatMessage;
25use tandem_workflows::{
26    load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
27    WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
28    WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
29};
30
31use crate::agent_teams::AgentTeamRuntime;
32use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
33use crate::automation_v2::types::*;
34use crate::bug_monitor::types::*;
35use crate::capability_resolver::CapabilityResolver;
36use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
37use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
38use crate::pack_manager::PackManager;
39use crate::preset_registry::PresetRegistry;
40use crate::routines::{errors::RoutineStoreError, types::*};
41use crate::runtime::{
42    lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
43};
44use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
45use crate::util::{host::detect_host_runtime_context, time::now_ms};
46use crate::{
47    derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
48    establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
49    parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
50    OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
51    OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
52    OptimizationPromotionDecisionKind,
53};
54
/// Top-level shared server state. `Clone` is cheap: every mutable field is
/// behind an `Arc`, so clones share the same underlying stores.
///
/// Lifecycle: constructed via `new_starting` (everything empty, status
/// `Starting`), then `mark_ready` installs the `RuntimeState` into `runtime`
/// and loads the persisted stores from the `*_path` files below.
#[derive(Clone)]
pub struct AppState {
    // Set exactly once by `mark_ready`; empty until startup completes.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    // Startup phase/status/attempt bookkeeping (see `startup_snapshot`).
    pub startup: Arc<RwLock<StartupState>>,
    // true => "in-process" mode, false => "sidecar" (see `mode_label`).
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    // Engine/worktree/run tracking.
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    // Staleness threshold for runs, in milliseconds (env-resolved).
    pub run_stale_ms: u64,
    // Governed-memory store plus its append-style audit trail.
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub memory_audit_path: PathBuf,
    pub protected_audit_path: PathBuf,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    // Revisioned shared key/value resources, persisted to `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // Routine specs, their per-routine history, and run records.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    // Automations (v2) and their scheduler.
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    // Workflow planning artifacts.
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub workflow_planner_sessions: Arc<
        RwLock<
            std::collections::HashMap<
                String,
                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
            >,
        >,
    >,
    pub(crate) context_packs: Arc<
        RwLock<std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>>,
    >,
    // Optimization campaigns/experiments (phase-1 promotion machinery).
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    // Bug-monitor subsystem state.
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    // Workflow registry/runs and hook/dispatch bookkeeping.
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub automation_v2_session_mcp_servers:
        Arc<RwLock<std::collections::HashMap<String, Vec<String>>>>,
    pub token_cost_per_1k_usd: f64,
    // On-disk locations for the persisted stores above (resolved from config).
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_planner_sessions_path: PathBuf,
    pub context_packs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI toggles; std (not tokio) RwLock since access is sync and brief.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    // Chat-channel listener tasks + statuses (see `restart_channel_listeners`).
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    // Detected at construction; superseded by the runtime's copy once ready.
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Point-in-time status of one chat channel ("telegram" / "discord" /
/// "slack"), stored in `ChannelRuntime::statuses` and published on the
/// `channel.status.changed` event.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// Channel is present in the effective configuration.
    pub enabled: bool,
    /// Listeners were started for this channel.
    pub connected: bool,
    /// Most recent error for this channel, if any.
    pub last_error: Option<String>,
    /// Active session count (reset to 0 on listener restart).
    pub active_sessions: u64,
    /// Free-form channel metadata as a JSON value (`{}` when none).
    pub meta: Value,
}
/// The slice of the effective app configuration this module consumes.
/// Deserialized from `self.config.get_effective_value()`; every section
/// falls back to its `Default` when missing so a partial config still parses.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
159
/// Mutable runtime state for the chat-channel listeners, guarded by the
/// `tokio::sync::Mutex` in `AppState::channels_runtime`.
#[derive(Default)]
pub struct ChannelRuntime {
    /// Spawned listener tasks; `None` while no listeners are running.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    /// Per-channel status keyed by channel name ("telegram", ...).
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
165
/// A single key/value entry destined for a status index.
/// NOTE(review): no consumers are visible in this chunk — confirm the exact
/// semantics of `key`/`value` against the rest of the file.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    pub value: Value,
}
171
172impl AppState {
    /// Builds an `AppState` in the `Starting` phase: all in-memory stores
    /// empty, all persistence paths resolved from config/env, and the runtime
    /// slot unset (it is installed later by `mark_ready`).
    ///
    /// `attempt_id` tags this startup attempt; `in_process` selects the
    /// "in-process" vs "sidecar" mode (see `mode_label`).
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        Self {
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            // All stores start empty; `mark_ready` repopulates them from disk.
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: config::env::resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            memory_audit_path: config::paths::resolve_memory_audit_path(),
            protected_audit_path: config::paths::resolve_protected_audit_path(),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: config::paths::resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Scheduler concurrency limit comes from the environment.
            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
                config::env::resolve_scheduler_max_concurrent_runs(),
            ))),
            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_planner_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            context_packs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Seeded from env here; `mark_ready` later loads the persisted config.
            bug_monitor_config: Arc::new(
                RwLock::new(config::env::resolve_bug_monitor_env_config()),
            ),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_mcp_servers: Arc::new(RwLock::new(
                std::collections::HashMap::new(),
            )),
            // Persistence locations, resolved once at construction.
            routines_path: config::paths::resolve_routines_path(),
            routine_history_path: config::paths::resolve_routine_history_path(),
            routine_runs_path: config::paths::resolve_routine_runs_path(),
            automations_v2_path: config::paths::resolve_automations_v2_path(),
            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
            external_actions_path: config::paths::resolve_external_actions_path(),
            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
            workflow_planner_sessions_path: config::paths::resolve_workflow_planner_sessions_path(),
            context_packs_path: config::paths::resolve_context_packs_path(),
            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
            // Web UI is off until `configure_web_ui` is called.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            // Preset registry falls back to ~/.tandem (or ./.tandem) when the
            // shared-paths resolution fails.
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
265
266    pub fn is_ready(&self) -> bool {
267        self.runtime.get().is_some()
268    }
269
270    pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
271        for _ in 0..attempts {
272            if self.is_ready() {
273                return true;
274            }
275            let startup = self.startup_snapshot().await;
276            if matches!(startup.status, StartupStatus::Failed) {
277                return false;
278            }
279            tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
280        }
281        self.is_ready()
282    }
283
284    pub fn mode_label(&self) -> &'static str {
285        if self.in_process_mode.load(Ordering::Relaxed) {
286            "in-process"
287        } else {
288            "sidecar"
289        }
290    }
291
292    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
293        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
294        if let Ok(mut guard) = self.web_ui_prefix.write() {
295            *guard = config::webui::normalize_web_ui_prefix(&prefix);
296        }
297    }
298
    /// Whether the web UI is currently enabled (set via `configure_web_ui`).
    pub fn web_ui_enabled(&self) -> bool {
        self.web_ui_enabled.load(Ordering::Relaxed)
    }
302
303    pub fn web_ui_prefix(&self) -> String {
304        self.web_ui_prefix
305            .read()
306            .map(|v| v.clone())
307            .unwrap_or_else(|_| "/admin".to_string())
308    }
309
310    pub fn set_server_base_url(&self, base_url: String) {
311        if let Ok(mut guard) = self.server_base_url.write() {
312            *guard = base_url;
313        }
314    }
315
316    pub fn server_base_url(&self) -> String {
317        self.server_base_url
318            .read()
319            .map(|v| v.clone())
320            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
321    }
322
    /// Returns a clone of the configured API token, if any.
    pub async fn api_token(&self) -> Option<String> {
        self.api_token.read().await.clone()
    }
326
    /// Sets or clears (`None`) the API token.
    pub async fn set_api_token(&self, token: Option<String>) {
        *self.api_token.write().await = token;
    }
330
331    pub async fn startup_snapshot(&self) -> StartupSnapshot {
332        let state = self.startup.read().await.clone();
333        StartupSnapshot {
334            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
335            status: state.status,
336            phase: state.phase,
337            started_at_ms: state.started_at_ms,
338            attempt_id: state.attempt_id,
339            last_error: state.last_error,
340        }
341    }
342
343    pub fn host_runtime_context(&self) -> HostRuntimeContext {
344        self.runtime
345            .get()
346            .map(|runtime| runtime.host_runtime_context.clone())
347            .unwrap_or_else(|| self.host_runtime_context.clone())
348    }
349
350    pub async fn set_phase(&self, phase: impl Into<String>) {
351        let mut startup = self.startup.write().await;
352        startup.phase = phase.into();
353    }
354
    /// Installs the fully-initialized [`RuntimeState`], registers server-side
    /// tools and engine hooks, loads every persisted store from disk, and
    /// flips startup status to `Ready`.
    ///
    /// # Errors
    /// Fails if the runtime slot was already set, if browser-tool
    /// registration fails (with the `browser` feature), or if loading
    /// routines / automations fails. All other stores load best-effort
    /// (`let _ = ...`), so a corrupt optional store does not block startup.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        #[cfg(feature = "browser")]
        self.register_browser_tools().await?;
        // NOTE(review): `self.tools`, `self.engine_loop`, `self.workspace_index`
        // and `self.event_bus` are not fields of `AppState` — presumably
        // reached through a `Deref` to `RuntimeState` (note the
        // `use std::ops::Deref` at the top); confirm in the full file. That
        // would also explain why these calls only happen after `set` above.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list".to_string(),
                Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
            )
            .await;
        // Engine hooks: agent spawning, tool policy, and prompt context.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Load persisted stores. Only routines and automations are fatal on
        // failure; the rest are best-effort.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_optimization_campaigns().await;
        let _ = self.load_optimization_experiments().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_external_actions().await;
        let _ = self.load_workflow_planner_sessions().await;
        let _ = self.load_context_packs().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Everything loaded — announce readiness.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
417
418    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
419        let mut startup = self.startup.write().await;
420        startup.status = StartupStatus::Failed;
421        startup.phase = phase.into();
422        startup.last_error = Some(error.into());
423    }
424
425    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
426        let runtime = self.channels_runtime.lock().await;
427        runtime.statuses.clone()
428    }
429
430    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
431        let effective = self.config.get_effective_value().await;
432        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
433        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
434
435        let mut runtime = self.channels_runtime.lock().await;
436        if let Some(listeners) = runtime.listeners.as_mut() {
437            listeners.abort_all();
438        }
439        runtime.listeners = None;
440        runtime.statuses.clear();
441
442        let mut status_map = std::collections::HashMap::new();
443        status_map.insert(
444            "telegram".to_string(),
445            ChannelStatus {
446                enabled: parsed.channels.telegram.is_some(),
447                connected: false,
448                last_error: None,
449                active_sessions: 0,
450                meta: serde_json::json!({}),
451            },
452        );
453        status_map.insert(
454            "discord".to_string(),
455            ChannelStatus {
456                enabled: parsed.channels.discord.is_some(),
457                connected: false,
458                last_error: None,
459                active_sessions: 0,
460                meta: serde_json::json!({}),
461            },
462        );
463        status_map.insert(
464            "slack".to_string(),
465            ChannelStatus {
466                enabled: parsed.channels.slack.is_some(),
467                connected: false,
468                last_error: None,
469                active_sessions: 0,
470                meta: serde_json::json!({}),
471            },
472        );
473
474        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
475            let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
476            runtime.listeners = Some(listeners);
477            for status in status_map.values_mut() {
478                if status.enabled {
479                    status.connected = true;
480                }
481            }
482        }
483
484        runtime.statuses = status_map.clone();
485        drop(runtime);
486
487        self.event_bus.publish(EngineEvent::new(
488            "channel.status.changed",
489            serde_json::json!({ "channels": status_map }),
490        ));
491        Ok(())
492    }
493
494    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
495        if !self.shared_resources_path.exists() {
496            return Ok(());
497        }
498        let raw = fs::read_to_string(&self.shared_resources_path).await?;
499        let parsed =
500            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
501                .unwrap_or_default();
502        let mut guard = self.shared_resources.write().await;
503        *guard = parsed;
504        Ok(())
505    }
506
507    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
508        if let Some(parent) = self.shared_resources_path.parent() {
509            fs::create_dir_all(parent).await?;
510        }
511        let payload = {
512            let guard = self.shared_resources.read().await;
513            serde_json::to_string_pretty(&*guard)?
514        };
515        fs::write(&self.shared_resources_path, payload).await?;
516        Ok(())
517    }
518
    /// Looks up a shared resource by exact key.
    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
        self.shared_resources.read().await.get(key).cloned()
    }
522
523    pub async fn list_shared_resources(
524        &self,
525        prefix: Option<&str>,
526        limit: usize,
527    ) -> Vec<SharedResourceRecord> {
528        let limit = limit.clamp(1, 500);
529        let mut rows = self
530            .shared_resources
531            .read()
532            .await
533            .values()
534            .filter(|record| {
535                if let Some(prefix) = prefix {
536                    record.key.starts_with(prefix)
537                } else {
538                    true
539                }
540            })
541            .cloned()
542            .collect::<Vec<_>>();
543        rows.sort_by(|a, b| a.key.cmp(&b.key));
544        rows.truncate(limit);
545        rows
546    }
547
    /// Inserts or updates the shared resource at `key` with optimistic
    /// concurrency (`if_match_rev`) and write-through persistence.
    ///
    /// Returns the stored record with its new revision. On persistence
    /// failure the in-memory change is rolled back before the error is
    /// returned, so memory and disk stay consistent.
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        // Optimistic-concurrency check: reject when the caller's expected
        // revision does not match the store (including when the key is absent).
        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        // New keys start at rev 1; existing keys bump (saturating).
        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        // Keep the displaced value for rollback, then release the lock before
        // the (slow) persistence I/O.
        let previous = guard.insert(key.clone(), record.clone());
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Roll back the in-memory mutation.
            // NOTE(review): the lock was released above, so a concurrent write
            // to the same key in the gap would be clobbered here — rollback is
            // best-effort; confirm this trade-off is intended.
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
606
    /// Deletes the shared resource at `key` with optional optimistic
    /// concurrency (`if_match_rev`), persisting the change.
    ///
    /// Returns the removed record, or `None` if the key was absent. On
    /// persistence failure the removed record is re-inserted before the
    /// error is returned.
    pub async fn delete_shared_resource(
        &self,
        key: &str,
        if_match_rev: Option<u64>,
    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
        if !is_valid_resource_key(key) {
            return Err(ResourceStoreError::InvalidKey {
                key: key.to_string(),
            });
        }

        let mut guard = self.shared_resources.write().await;
        let current = guard.get(key).cloned();
        // Revision check mirrors `put_shared_resource`: a missing key never
        // matches an expected revision.
        if let Some(expected) = if_match_rev {
            let current_rev = current.as_ref().map(|row| row.rev);
            if current_rev != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key: key.to_string(),
                    expected_rev: Some(expected),
                    current_rev,
                }));
            }
        }

        // Remove, then release the lock before persistence I/O.
        let removed = guard.remove(key);
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Best-effort rollback: restore the removed record (lock was
            // released, so a concurrent writer may have intervened).
            if let Some(record) = removed.clone() {
                self.shared_resources
                    .write()
                    .await
                    .insert(record.key.clone(), record);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(removed)
    }
648
649    pub async fn load_routines(&self) -> anyhow::Result<()> {
650        if !self.routines_path.exists() {
651            return Ok(());
652        }
653        let raw = fs::read_to_string(&self.routines_path).await?;
654        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
655            Ok(parsed) => {
656                let mut guard = self.routines.write().await;
657                *guard = parsed;
658                Ok(())
659            }
660            Err(primary_err) => {
661                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
662                if backup_path.exists() {
663                    let backup_raw = fs::read_to_string(&backup_path).await?;
664                    if let Ok(parsed_backup) = serde_json::from_str::<
665                        std::collections::HashMap<String, RoutineSpec>,
666                    >(&backup_raw)
667                    {
668                        let mut guard = self.routines.write().await;
669                        *guard = parsed_backup;
670                        return Ok(());
671                    }
672                }
673                Err(anyhow::anyhow!(
674                    "failed to parse routines store {}: {primary_err}",
675                    self.routines_path.display()
676                ))
677            }
678        }
679    }
680
681    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
682        if !self.routine_history_path.exists() {
683            return Ok(());
684        }
685        let raw = fs::read_to_string(&self.routine_history_path).await?;
686        let parsed = serde_json::from_str::<
687            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
688        >(&raw)
689        .unwrap_or_default();
690        let mut guard = self.routine_history.write().await;
691        *guard = parsed;
692        Ok(())
693    }
694
695    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
696        if !self.routine_runs_path.exists() {
697            return Ok(());
698        }
699        let raw = fs::read_to_string(&self.routine_runs_path).await?;
700        let parsed =
701            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
702                .unwrap_or_default();
703        let mut guard = self.routine_runs.write().await;
704        *guard = parsed;
705        Ok(())
706    }
707
    /// Writes the in-memory routines map to `routines_path` atomically
    /// (write a tmp sibling, then rename over the target), keeping a sibling
    /// backup copy of the previous file.
    ///
    /// When `allow_empty_overwrite` is false, refuses to replace a non-empty
    /// on-disk store with an empty in-memory map — a guard against wiping
    /// data when state was never loaded.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        // Serialize under a short-lived read lock.
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            // An unreadable/unparseable existing file is treated as "has rows"
            // (`unwrap_or(true)`) — err on the side of NOT overwriting.
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        // Best-effort backup of the current file (errors ignored) — this is
        // the `.bak` that `load_routines` falls back to.
        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        // Atomic replace: full write to tmp, then rename.
        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
741
    /// Persist routines with the empty-overwrite guard enabled — the default,
    /// safe path; see `persist_routines_inner` for the guard semantics.
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
745
746    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
747        if let Some(parent) = self.routine_history_path.parent() {
748            fs::create_dir_all(parent).await?;
749        }
750        let payload = {
751            let guard = self.routine_history.read().await;
752            serde_json::to_string_pretty(&*guard)?
753        };
754        fs::write(&self.routine_history_path, payload).await?;
755        Ok(())
756    }
757
758    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
759        if let Some(parent) = self.routine_runs_path.parent() {
760            fs::create_dir_all(parent).await?;
761        }
762        let payload = {
763            let guard = self.routine_runs.read().await;
764            serde_json::to_string_pretty(&*guard)?
765        };
766        fs::write(&self.routine_runs_path, payload).await?;
767        Ok(())
768    }
769
770    pub async fn put_routine(
771        &self,
772        mut routine: RoutineSpec,
773    ) -> Result<RoutineSpec, RoutineStoreError> {
774        if routine.routine_id.trim().is_empty() {
775            return Err(RoutineStoreError::InvalidRoutineId {
776                routine_id: routine.routine_id,
777            });
778        }
779
780        routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
781        routine.output_targets = normalize_non_empty_list(routine.output_targets);
782
783        let now = now_ms();
784        let next_schedule_fire =
785            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
786                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
787                    detail: "invalid schedule or timezone".to_string(),
788                })?;
789        match routine.schedule {
790            RoutineSchedule::IntervalSeconds { seconds } => {
791                if seconds == 0 {
792                    return Err(RoutineStoreError::InvalidSchedule {
793                        detail: "interval_seconds must be > 0".to_string(),
794                    });
795                }
796                let _ = seconds;
797            }
798            RoutineSchedule::Cron { .. } => {}
799        }
800        if routine.next_fire_at_ms.is_none() {
801            routine.next_fire_at_ms = Some(next_schedule_fire);
802        }
803
804        let mut guard = self.routines.write().await;
805        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
806        drop(guard);
807
808        if let Err(error) = self.persist_routines().await {
809            let mut rollback = self.routines.write().await;
810            if let Some(previous) = previous {
811                rollback.insert(previous.routine_id.clone(), previous);
812            } else {
813                rollback.remove(&routine.routine_id);
814            }
815            return Err(RoutineStoreError::PersistFailed {
816                message: error.to_string(),
817            });
818        }
819
820        Ok(routine)
821    }
822
823    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
824        let mut rows = self
825            .routines
826            .read()
827            .await
828            .values()
829            .cloned()
830            .collect::<Vec<_>>();
831        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
832        rows
833    }
834
835    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
836        self.routines.read().await.get(routine_id).cloned()
837    }
838
839    pub async fn delete_routine(
840        &self,
841        routine_id: &str,
842    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
843        let mut guard = self.routines.write().await;
844        let removed = guard.remove(routine_id);
845        drop(guard);
846
847        let allow_empty_overwrite = self.routines.read().await.is_empty();
848        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
849            if let Some(removed) = removed.clone() {
850                self.routines
851                    .write()
852                    .await
853                    .insert(removed.routine_id.clone(), removed);
854            }
855            return Err(RoutineStoreError::PersistFailed {
856                message: error.to_string(),
857            });
858        }
859        Ok(removed)
860    }
861
    /// Advance schedules for all active routines whose fire time has passed,
    /// returning one trigger plan per routine that should actually run.
    ///
    /// Routines that are not `Active`, have no `next_fire_at_ms`, or are not
    /// yet due are skipped. The misfire policy decides how many catch-up runs
    /// to emit; the next fire time is advanced even when that count is zero.
    /// The advanced fire times are persisted best-effort at the end.
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            if routine.status != RoutineStatus::Active {
                continue;
            }
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // The policy decides how many runs to schedule and where the next
            // fire time lands; the shadowed `next_fire_at_ms` is the new one.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        // Best-effort persist of the advanced fire times.
        let _ = self.persist_routines().await;
        plans
    }
897
898    pub async fn mark_routine_fired(
899        &self,
900        routine_id: &str,
901        fired_at_ms: u64,
902    ) -> Option<RoutineSpec> {
903        let mut guard = self.routines.write().await;
904        let routine = guard.get_mut(routine_id)?;
905        routine.last_fired_at_ms = Some(fired_at_ms);
906        let updated = routine.clone();
907        drop(guard);
908        let _ = self.persist_routines().await;
909        Some(updated)
910    }
911
912    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
913        let mut history = self.routine_history.write().await;
914        history
915            .entry(event.routine_id.clone())
916            .or_default()
917            .push(event);
918        drop(history);
919        let _ = self.persist_routine_history().await;
920    }
921
922    pub async fn list_routine_history(
923        &self,
924        routine_id: &str,
925        limit: usize,
926    ) -> Vec<RoutineHistoryEvent> {
927        let limit = limit.clamp(1, 500);
928        let mut rows = self
929            .routine_history
930            .read()
931            .await
932            .get(routine_id)
933            .cloned()
934            .unwrap_or_default();
935        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
936        rows.truncate(limit);
937        rows
938    }
939
    /// Create and register a new run record for a routine, snapshotting the
    /// routine's current entrypoint/args/tool configuration, then persist the
    /// run store best-effort.
    ///
    /// The record starts with zeroed token/cost counters and no sessions or
    /// artifacts; `fired_at_ms` is stamped with the creation time while
    /// `started_at_ms`/`finished_at_ms` stay unset until the run progresses.
    pub async fn create_routine_run(
        &self,
        routine: &RoutineSpec,
        trigger_type: &str,
        run_count: u32,
        status: RoutineRunStatus,
        detail: Option<String>,
    ) -> RoutineRunRecord {
        let now = now_ms();
        let record = RoutineRunRecord {
            // Unique id so concurrent runs of the same routine never collide.
            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
            routine_id: routine.routine_id.clone(),
            trigger_type: trigger_type.to_string(),
            run_count,
            status,
            created_at_ms: now,
            updated_at_ms: now,
            fired_at_ms: Some(now),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: routine.requires_approval,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail,
            // Snapshot the routine's execution config at fire time so later
            // edits to the routine do not retroactively change this run.
            entrypoint: routine.entrypoint.clone(),
            args: routine.args.clone(),
            allowed_tools: routine.allowed_tools.clone(),
            output_targets: routine.output_targets.clone(),
            artifacts: Vec::new(),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };
        self.routine_runs
            .write()
            .await
            .insert(record.run_id.clone(), record.clone());
        // Best-effort persistence; the record is already visible in memory.
        let _ = self.persist_routine_runs().await;
        record
    }
984
985    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
986        self.routine_runs.read().await.get(run_id).cloned()
987    }
988
989    pub async fn list_routine_runs(
990        &self,
991        routine_id: Option<&str>,
992        limit: usize,
993    ) -> Vec<RoutineRunRecord> {
994        let mut rows = self
995            .routine_runs
996            .read()
997            .await
998            .values()
999            .filter(|row| {
1000                if let Some(id) = routine_id {
1001                    row.routine_id == id
1002                } else {
1003                    true
1004                }
1005            })
1006            .cloned()
1007            .collect::<Vec<_>>();
1008        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1009        rows.truncate(limit.clamp(1, 500));
1010        rows
1011    }
1012
    /// Atomically claim the oldest queued run — FIFO by creation time, with
    /// run id as a deterministic tie-break — flip it to `Running`, and stamp
    /// its start time. Returns `None` when nothing is queued.
    ///
    /// The selection and the status flip happen under one write lock, so two
    /// concurrent claimers cannot pick the same run.
    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let next_run_id = guard
            .values()
            .filter(|row| row.status == RoutineRunStatus::Queued)
            .min_by(|a, b| {
                a.created_at_ms
                    .cmp(&b.created_at_ms)
                    .then_with(|| a.run_id.cmp(&b.run_id))
            })
            .map(|row| row.run_id.clone())?;
        let now = now_ms();
        let row = guard.get_mut(&next_run_id)?;
        row.status = RoutineRunStatus::Running;
        row.updated_at_ms = now;
        row.started_at_ms = Some(now);
        let claimed = row.clone();
        drop(guard);
        // Best-effort persistence; the claim is already visible in memory.
        let _ = self.persist_routine_runs().await;
        Some(claimed)
    }
1034
1035    pub async fn set_routine_session_policy(
1036        &self,
1037        session_id: String,
1038        run_id: String,
1039        routine_id: String,
1040        allowed_tools: Vec<String>,
1041    ) {
1042        let policy = RoutineSessionPolicy {
1043            session_id: session_id.clone(),
1044            run_id,
1045            routine_id,
1046            allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1047        };
1048        self.routine_session_policies
1049            .write()
1050            .await
1051            .insert(session_id, policy);
1052    }
1053
1054    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1055        self.routine_session_policies
1056            .read()
1057            .await
1058            .get(session_id)
1059            .cloned()
1060    }
1061
1062    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1063        self.routine_session_policies
1064            .write()
1065            .await
1066            .remove(session_id);
1067    }
1068
    /// Update a run's status and route the optional `reason` into the
    /// status-appropriate field: approval, denial, or pause reason, or the
    /// free-form `detail`. Terminal states also stamp `finished_at_ms`.
    /// Persists best-effort; returns `None` for an unknown run id.
    pub async fn update_routine_run_status(
        &self,
        run_id: &str,
        status: RoutineRunStatus,
        reason: Option<String>,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.status = status.clone();
        row.updated_at_ms = now_ms();
        match status {
            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
            RoutineRunStatus::Running => {
                // Keep the original start time if the run was already running.
                row.started_at_ms.get_or_insert_with(now_ms);
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            RoutineRunStatus::Denied => row.denial_reason = reason,
            RoutineRunStatus::Paused => row.paused_reason = reason,
            RoutineRunStatus::Completed
            | RoutineRunStatus::Failed
            | RoutineRunStatus::Cancelled => {
                // Terminal states: record when the run finished.
                row.finished_at_ms = Some(now_ms());
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            // Any other status (e.g. queued) just records the reason as detail.
            _ => {
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
        }
        let updated = row.clone();
        drop(guard);
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
1108
1109    pub async fn append_routine_run_artifact(
1110        &self,
1111        run_id: &str,
1112        artifact: RoutineRunArtifact,
1113    ) -> Option<RoutineRunRecord> {
1114        let mut guard = self.routine_runs.write().await;
1115        let row = guard.get_mut(run_id)?;
1116        row.updated_at_ms = now_ms();
1117        row.artifacts.push(artifact);
1118        let updated = row.clone();
1119        drop(guard);
1120        let _ = self.persist_routine_runs().await;
1121        Some(updated)
1122    }
1123
1124    pub async fn add_active_session_id(
1125        &self,
1126        run_id: &str,
1127        session_id: String,
1128    ) -> Option<RoutineRunRecord> {
1129        let mut guard = self.routine_runs.write().await;
1130        let row = guard.get_mut(run_id)?;
1131        if !row.active_session_ids.iter().any(|id| id == &session_id) {
1132            row.active_session_ids.push(session_id);
1133        }
1134        row.latest_session_id = row.active_session_ids.last().cloned();
1135        row.updated_at_ms = now_ms();
1136        let updated = row.clone();
1137        drop(guard);
1138        let _ = self.persist_routine_runs().await;
1139        Some(updated)
1140    }
1141
1142    pub async fn clear_active_session_id(
1143        &self,
1144        run_id: &str,
1145        session_id: &str,
1146    ) -> Option<RoutineRunRecord> {
1147        let mut guard = self.routine_runs.write().await;
1148        let row = guard.get_mut(run_id)?;
1149        row.active_session_ids.retain(|id| id != session_id);
1150        row.updated_at_ms = now_ms();
1151        let updated = row.clone();
1152        drop(guard);
1153        let _ = self.persist_routine_runs().await;
1154        Some(updated)
1155    }
1156
    /// Load automation v2 definitions, preferring the canonical path and only
    /// merging alternates in when the canonical file is missing or empty.
    ///
    /// When alternates win by `updated_at_ms`, or a bundled-automation
    /// migration fires, the merged result is written back to the canonical
    /// path; otherwise stale legacy copies are cleaned up best-effort.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        let mut migrated = false;
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        // Try the canonical file first; empty or "{}" content counts as absent.
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file yielded nothing: merge definitions from alternate
            // paths, keeping the newest copy (by `updated_at_ms`) per id.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    match merged.get(&automation_id) {
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical data won; still count alternates for the log summary.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        // Apply the bundled studio-research split migration to each definition.
        for automation in merged.values_mut() {
            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
        }
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate || migrated {
            // Canonicalize: write merged/migrated data back to the active path.
            let _ = self.persist_automations_v2().await;
        } else if canonical_loaded {
            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
        }
        Ok(())
    }
1245
1246    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1247        let payload = {
1248            let guard = self.automations_v2.read().await;
1249            serde_json::to_string_pretty(&*guard)?
1250        };
1251        if let Some(parent) = self.automations_v2_path.parent() {
1252            fs::create_dir_all(parent).await?;
1253        }
1254        fs::write(&self.automations_v2_path, &payload).await?;
1255        let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1256        Ok(())
1257    }
1258
    /// Load automation v2 run records from the canonical path and all
    /// alternates, keeping the newest copy (by `updated_at_ms`) per run id,
    /// then recover missing automation definitions from run snapshots.
    ///
    /// Writes the merged runs back to the canonical path when anything came
    /// from an alternate or when definitions were recovered.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // Empty or "{}" content counts as an absent file.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            // Last-writer-wins per run id, decided by `updated_at_ms`.
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Rebuild any automation definitions that only survive as snapshots
        // embedded in run records.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            // Suspicious state worth surfacing: history without definitions.
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
1319
1320    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1321        let payload = {
1322            let guard = self.automation_v2_runs.read().await;
1323            serde_json::to_string_pretty(&*guard)?
1324        };
1325        if let Some(parent) = self.automation_v2_runs_path.parent() {
1326            fs::create_dir_all(parent).await?;
1327        }
1328        fs::write(&self.automation_v2_runs_path, &payload).await?;
1329        Ok(())
1330    }
1331
1332    pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1333        if !self.optimization_campaigns_path.exists() {
1334            return Ok(());
1335        }
1336        let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1337        let parsed = parse_optimization_campaigns_file(&raw);
1338        *self.optimization_campaigns.write().await = parsed;
1339        Ok(())
1340    }
1341
1342    pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1343        let payload = {
1344            let guard = self.optimization_campaigns.read().await;
1345            serde_json::to_string_pretty(&*guard)?
1346        };
1347        if let Some(parent) = self.optimization_campaigns_path.parent() {
1348            fs::create_dir_all(parent).await?;
1349        }
1350        fs::write(&self.optimization_campaigns_path, payload).await?;
1351        Ok(())
1352    }
1353
1354    pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1355        if !self.optimization_experiments_path.exists() {
1356            return Ok(());
1357        }
1358        let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1359        let parsed = parse_optimization_experiments_file(&raw);
1360        *self.optimization_experiments.write().await = parsed;
1361        Ok(())
1362    }
1363
1364    pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1365        let payload = {
1366            let guard = self.optimization_experiments.read().await;
1367            serde_json::to_string_pretty(&*guard)?
1368        };
1369        if let Some(parent) = self.optimization_experiments_path.parent() {
1370            fs::create_dir_all(parent).await?;
1371        }
1372        fs::write(&self.optimization_experiments_path, payload).await?;
1373        Ok(())
1374    }
1375
    /// Verify that the on-disk automation v2 state matches an expected
    /// presence for `automation_id`.
    ///
    /// A mismatch on the canonical path is an error; mismatches on alternate
    /// paths are only logged as stale, since the canonical file is
    /// authoritative.
    async fn verify_automation_v2_persisted(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        // A missing canonical file parses as "no automations present".
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        let active_parsed = parse_automation_v2_file(&active_raw);
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Check alternates too, but only warn — they are expected to drift.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = parse_automation_v2_file(&raw);
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
1437
    /// Rebuild automation definitions from the snapshots embedded in run
    /// records: a snapshot replaces the in-memory definition when the
    /// definition is missing or older than the snapshot.
    ///
    /// Returns the number of definitions that were newly added (not counting
    /// in-place replacements); persists when that count is non-zero.
    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut guard = self.automations_v2.write().await;
        let mut recovered = 0usize;
        for run in runs {
            let Some(snapshot) = run.automation_snapshot.clone() else {
                continue;
            };
            let should_replace = match guard.get(&run.automation_id) {
                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
                None => true,
            };
            if should_replace {
                // Only brand-new definitions bump the counter.
                // NOTE(review): replacing an existing definition with a newer
                // snapshot does not increment `recovered`, so a replacement-only
                // pass skips the persist below — confirm this is intended.
                if !guard.contains_key(&run.automation_id) {
                    recovered += 1;
                }
                guard.insert(run.automation_id.clone(), snapshot);
            }
        }
        drop(guard);
        if recovered > 0 {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                recovered,
                active_path,
                "recovered automation v2 definitions from run snapshots"
            );
            self.persist_automations_v2().await?;
        }
        Ok(recovered)
    }
1475
1476    pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1477        let path = if self.bug_monitor_config_path.exists() {
1478            self.bug_monitor_config_path.clone()
1479        } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1480            .exists()
1481        {
1482            config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1483        } else {
1484            return Ok(());
1485        };
1486        let raw = fs::read_to_string(path).await?;
1487        let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1488            .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1489        *self.bug_monitor_config.write().await = parsed;
1490        Ok(())
1491    }
1492
1493    pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1494        if let Some(parent) = self.bug_monitor_config_path.parent() {
1495            fs::create_dir_all(parent).await?;
1496        }
1497        let payload = {
1498            let guard = self.bug_monitor_config.read().await;
1499            serde_json::to_string_pretty(&*guard)?
1500        };
1501        fs::write(&self.bug_monitor_config_path, payload).await?;
1502        Ok(())
1503    }
1504
1505    pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1506        self.bug_monitor_config.read().await.clone()
1507    }
1508
    /// Validate, normalize, store, and persist a new bug-monitor config,
    /// returning the stored value (with its refreshed `updated_at_ms`).
    ///
    /// Validations: `repo` must look like `owner/repo` when non-empty,
    /// `mcp_server` must name a known MCP server, and `model_policy` must
    /// pass the shared routines/automations policy validator.
    pub async fn put_bug_monitor_config(
        &self,
        mut config: BugMonitorConfig,
    ) -> anyhow::Result<BugMonitorConfig> {
        // Normalize the workspace root: trimmed, and empty becomes None.
        config.workspace_root = config
            .workspace_root
            .as_ref()
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty());
        if let Some(repo) = config.repo.as_ref() {
            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
                anyhow::bail!("repo must be in owner/repo format");
            }
        }
        if let Some(server) = config.mcp_server.as_ref() {
            // The referenced MCP server must already be registered.
            let servers = self.mcp.list().await;
            if !servers.contains_key(server) {
                anyhow::bail!("unknown mcp server `{server}`");
            }
        }
        if let Some(model_policy) = config.model_policy.as_ref() {
            crate::http::routines_automations::validate_model_policy(model_policy)
                .map_err(anyhow::Error::msg)?;
        }
        config.updated_at_ms = now_ms();
        *self.bug_monitor_config.write().await = config.clone();
        self.persist_bug_monitor_config().await?;
        Ok(config)
    }
1538
1539    pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1540        let path = if self.bug_monitor_drafts_path.exists() {
1541            self.bug_monitor_drafts_path.clone()
1542        } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1543            .exists()
1544        {
1545            config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1546        } else {
1547            return Ok(());
1548        };
1549        let raw = fs::read_to_string(path).await?;
1550        let parsed =
1551            serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1552                .unwrap_or_default();
1553        *self.bug_monitor_drafts.write().await = parsed;
1554        Ok(())
1555    }
1556
1557    pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1558        if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1559            fs::create_dir_all(parent).await?;
1560        }
1561        let payload = {
1562            let guard = self.bug_monitor_drafts.read().await;
1563            serde_json::to_string_pretty(&*guard)?
1564        };
1565        fs::write(&self.bug_monitor_drafts_path, payload).await?;
1566        Ok(())
1567    }
1568
1569    pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1570        let path = if self.bug_monitor_incidents_path.exists() {
1571            self.bug_monitor_incidents_path.clone()
1572        } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1573            .exists()
1574        {
1575            config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1576        } else {
1577            return Ok(());
1578        };
1579        let raw = fs::read_to_string(path).await?;
1580        let parsed = serde_json::from_str::<
1581            std::collections::HashMap<String, BugMonitorIncidentRecord>,
1582        >(&raw)
1583        .unwrap_or_default();
1584        *self.bug_monitor_incidents.write().await = parsed;
1585        Ok(())
1586    }
1587
1588    pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1589        if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1590            fs::create_dir_all(parent).await?;
1591        }
1592        let payload = {
1593            let guard = self.bug_monitor_incidents.read().await;
1594            serde_json::to_string_pretty(&*guard)?
1595        };
1596        fs::write(&self.bug_monitor_incidents_path, payload).await?;
1597        Ok(())
1598    }
1599
1600    pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1601        let path = if self.bug_monitor_posts_path.exists() {
1602            self.bug_monitor_posts_path.clone()
1603        } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1604            .exists()
1605        {
1606            config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1607        } else {
1608            return Ok(());
1609        };
1610        let raw = fs::read_to_string(path).await?;
1611        let parsed =
1612            serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1613                .unwrap_or_default();
1614        *self.bug_monitor_posts.write().await = parsed;
1615        Ok(())
1616    }
1617
1618    pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1619        if let Some(parent) = self.bug_monitor_posts_path.parent() {
1620            fs::create_dir_all(parent).await?;
1621        }
1622        let payload = {
1623            let guard = self.bug_monitor_posts.read().await;
1624            serde_json::to_string_pretty(&*guard)?
1625        };
1626        fs::write(&self.bug_monitor_posts_path, payload).await?;
1627        Ok(())
1628    }
1629
1630    pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1631        if !self.external_actions_path.exists() {
1632            return Ok(());
1633        }
1634        let raw = fs::read_to_string(&self.external_actions_path).await?;
1635        let parsed =
1636            serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1637                .unwrap_or_default();
1638        *self.external_actions.write().await = parsed;
1639        Ok(())
1640    }
1641
1642    pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1643        if let Some(parent) = self.external_actions_path.parent() {
1644            fs::create_dir_all(parent).await?;
1645        }
1646        let payload = {
1647            let guard = self.external_actions.read().await;
1648            serde_json::to_string_pretty(&*guard)?
1649        };
1650        fs::write(&self.external_actions_path, payload).await?;
1651        Ok(())
1652    }
1653
1654    pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1655        let mut rows = self
1656            .bug_monitor_incidents
1657            .read()
1658            .await
1659            .values()
1660            .cloned()
1661            .collect::<Vec<_>>();
1662        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1663        rows.truncate(limit.clamp(1, 200));
1664        rows
1665    }
1666
1667    pub async fn get_bug_monitor_incident(
1668        &self,
1669        incident_id: &str,
1670    ) -> Option<BugMonitorIncidentRecord> {
1671        self.bug_monitor_incidents
1672            .read()
1673            .await
1674            .get(incident_id)
1675            .cloned()
1676    }
1677
1678    pub async fn put_bug_monitor_incident(
1679        &self,
1680        incident: BugMonitorIncidentRecord,
1681    ) -> anyhow::Result<BugMonitorIncidentRecord> {
1682        self.bug_monitor_incidents
1683            .write()
1684            .await
1685            .insert(incident.incident_id.clone(), incident.clone());
1686        self.persist_bug_monitor_incidents().await?;
1687        Ok(incident)
1688    }
1689
1690    pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1691        let mut rows = self
1692            .bug_monitor_posts
1693            .read()
1694            .await
1695            .values()
1696            .cloned()
1697            .collect::<Vec<_>>();
1698        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1699        rows.truncate(limit.clamp(1, 200));
1700        rows
1701    }
1702
1703    pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1704        self.bug_monitor_posts.read().await.get(post_id).cloned()
1705    }
1706
1707    pub async fn put_bug_monitor_post(
1708        &self,
1709        post: BugMonitorPostRecord,
1710    ) -> anyhow::Result<BugMonitorPostRecord> {
1711        self.bug_monitor_posts
1712            .write()
1713            .await
1714            .insert(post.post_id.clone(), post.clone());
1715        self.persist_bug_monitor_posts().await?;
1716        Ok(post)
1717    }
1718
1719    pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1720        let mut rows = self
1721            .external_actions
1722            .read()
1723            .await
1724            .values()
1725            .cloned()
1726            .collect::<Vec<_>>();
1727        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1728        rows.truncate(limit.clamp(1, 200));
1729        rows
1730    }
1731
1732    pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1733        self.external_actions.read().await.get(action_id).cloned()
1734    }
1735
1736    pub async fn get_external_action_by_idempotency_key(
1737        &self,
1738        idempotency_key: &str,
1739    ) -> Option<ExternalActionRecord> {
1740        let normalized = idempotency_key.trim();
1741        if normalized.is_empty() {
1742            return None;
1743        }
1744        self.external_actions
1745            .read()
1746            .await
1747            .values()
1748            .find(|action| {
1749                action
1750                    .idempotency_key
1751                    .as_deref()
1752                    .map(str::trim)
1753                    .filter(|value| !value.is_empty())
1754                    == Some(normalized)
1755            })
1756            .cloned()
1757    }
1758
1759    pub async fn put_external_action(
1760        &self,
1761        action: ExternalActionRecord,
1762    ) -> anyhow::Result<ExternalActionRecord> {
1763        self.external_actions
1764            .write()
1765            .await
1766            .insert(action.action_id.clone(), action.clone());
1767        self.persist_external_actions().await?;
1768        Ok(action)
1769    }
1770
    /// Record an external side-effecting action, de-duplicating on its
    /// idempotency key, then fan the receipt out as artifacts to any linked
    /// routine run and context run.
    ///
    /// Returns the previously recorded action unchanged when a duplicate
    /// idempotency key is found (nothing is persisted or published in that
    /// case); otherwise returns the newly stored action.
    pub async fn record_external_action(
        &self,
        action: ExternalActionRecord,
    ) -> anyhow::Result<ExternalActionRecord> {
        let action = {
            // Hold the write lock across both the duplicate check and the
            // insert so two concurrent submissions with the same idempotency
            // key cannot both be recorded.
            let mut guard = self.external_actions.write().await;
            if let Some(idempotency_key) = action
                .idempotency_key
                .as_deref()
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                // Blank/whitespace-only keys never match; only trimmed,
                // non-empty keys participate in de-duplication.
                if let Some(existing) = guard
                    .values()
                    .find(|existing| {
                        existing
                            .idempotency_key
                            .as_deref()
                            .map(str::trim)
                            .filter(|value| !value.is_empty())
                            == Some(idempotency_key)
                    })
                    .cloned()
                {
                    // Early return: the lock is released here and no
                    // persistence or artifact fan-out happens for duplicates.
                    return Ok(existing);
                }
            }
            guard.insert(action.action_id.clone(), action.clone());
            action
        };
        // Persist after releasing the lock.
        self.persist_external_actions().await?;
        if let Some(run_id) = action.routine_run_id.as_deref() {
            // Build a receipt artifact mirroring the action's key fields.
            let artifact = RoutineRunArtifact {
                artifact_id: format!("external-action-{}", action.action_id),
                uri: format!("external-action://{}", action.action_id),
                kind: "external_action_receipt".to_string(),
                label: Some(format!("external action receipt: {}", action.operation)),
                created_at_ms: action.updated_at_ms,
                metadata: Some(json!({
                    "actionID": action.action_id,
                    "operation": action.operation,
                    "status": action.status,
                    "sourceKind": action.source_kind,
                    "sourceID": action.source_id,
                    "capabilityID": action.capability_id,
                    "target": action.target,
                })),
            };
            // Best effort: a failure to append the artifact is ignored.
            let _ = self
                .append_routine_run_artifact(run_id, artifact.clone())
                .await;
            // Publish only if the runtime has been initialized.
            if let Some(runtime) = self.runtime.get() {
                runtime.event_bus.publish(EngineEvent::new(
                    "routine.run.artifact_added",
                    json!({
                        "runID": run_id,
                        "artifact": artifact,
                    }),
                ));
            }
        }
        if let Some(context_run_id) = action.context_run_id.as_deref() {
            // The full action record is serialized as the context-run
            // artifact payload; serialization errors abort with `?` above,
            // but append failures below are only logged.
            let payload = serde_json::to_value(&action)?;
            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
                self,
                context_run_id,
                &format!("external-action-{}", action.action_id),
                "external_action_receipt",
                &format!("external-actions/{}.json", action.action_id),
                &payload,
            )
            .await
            {
                tracing::warn!(
                    "failed to append external action artifact {} to context run {}: {}",
                    action.action_id,
                    context_run_id,
                    error
                );
            }
        }
        Ok(action)
    }
1854
1855    pub async fn update_bug_monitor_runtime_status(
1856        &self,
1857        update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1858    ) -> BugMonitorRuntimeStatus {
1859        let mut guard = self.bug_monitor_runtime_status.write().await;
1860        update(&mut guard);
1861        guard.clone()
1862    }
1863
1864    pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1865        let mut rows = self
1866            .bug_monitor_drafts
1867            .read()
1868            .await
1869            .values()
1870            .cloned()
1871            .collect::<Vec<_>>();
1872        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1873        rows.truncate(limit.clamp(1, 200));
1874        rows
1875    }
1876
1877    pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1878        self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1879    }
1880
1881    pub async fn put_bug_monitor_draft(
1882        &self,
1883        draft: BugMonitorDraftRecord,
1884    ) -> anyhow::Result<BugMonitorDraftRecord> {
1885        self.bug_monitor_drafts
1886            .write()
1887            .await
1888            .insert(draft.draft_id.clone(), draft.clone());
1889        self.persist_bug_monitor_drafts().await?;
1890        Ok(draft)
1891    }
1892
    /// Normalize a failure submission, de-duplicate it against existing
    /// drafts by (repo, fingerprint), and store a new draft record when it
    /// is novel.
    ///
    /// Returns the existing draft when a duplicate fingerprint is found for
    /// the same repo, so repeated reports of the same failure do not create
    /// multiple drafts. Errors when no repo is configured (neither in the
    /// submission nor the stored config) or the repo is not `owner/repo`.
    pub async fn submit_bug_monitor_draft(
        &self,
        mut submission: BugMonitorSubmission,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        // Trim an optional string and drop it entirely when empty.
        fn normalize_optional(value: Option<String>) -> Option<String> {
            value
                .map(|v| v.trim().to_string())
                .filter(|v| !v.is_empty())
        }

        // Hash the identifying parts into a 16-hex-digit fingerprint.
        // NOTE(review): DefaultHasher's algorithm is not guaranteed stable
        // across Rust releases, and fingerprints are persisted with drafts —
        // a toolchain upgrade could stop new reports matching old drafts.
        // Confirm whether cross-version stability matters here.
        fn compute_fingerprint(parts: &[&str]) -> String {
            use std::hash::{Hash, Hasher};

            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            for part in parts {
                part.hash(&mut hasher);
            }
            format!("{:016x}", hasher.finish())
        }

        // Normalize every optional free-text field of the submission.
        submission.repo = normalize_optional(submission.repo);
        submission.title = normalize_optional(submission.title);
        submission.detail = normalize_optional(submission.detail);
        submission.source = normalize_optional(submission.source);
        submission.run_id = normalize_optional(submission.run_id);
        submission.session_id = normalize_optional(submission.session_id);
        submission.correlation_id = normalize_optional(submission.correlation_id);
        submission.file_name = normalize_optional(submission.file_name);
        submission.process = normalize_optional(submission.process);
        submission.component = normalize_optional(submission.component);
        submission.event = normalize_optional(submission.event);
        submission.level = normalize_optional(submission.level);
        submission.fingerprint = normalize_optional(submission.fingerprint);
        // Keep at most 50 non-empty, right-trimmed excerpt lines.
        submission.excerpt = submission
            .excerpt
            .into_iter()
            .map(|line| line.trim_end().to_string())
            .filter(|line| !line.is_empty())
            .take(50)
            .collect();

        // Submission repo wins; otherwise fall back to the configured repo.
        let config = self.bug_monitor_config().await;
        let repo = submission
            .repo
            .clone()
            .or(config.repo.clone())
            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
        if !is_valid_owner_repo_slug(&repo) {
            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
        }

        // Derive a title from the most specific available field when the
        // submission did not provide one.
        let title = submission.title.clone().unwrap_or_else(|| {
            if let Some(event) = submission.event.as_ref() {
                format!("Failure detected in {event}")
            } else if let Some(component) = submission.component.as_ref() {
                format!("Failure detected in {component}")
            } else if let Some(process) = submission.process.as_ref() {
                format!("Failure detected in {process}")
            } else if let Some(source) = submission.source.as_ref() {
                format!("Failure report from {source}")
            } else {
                "Failure report".to_string()
            }
        });

        // Assemble the detail body: key/value header lines, then the free
        // text detail, then the indented excerpt block.
        let mut detail_lines = Vec::new();
        if let Some(source) = submission.source.as_ref() {
            detail_lines.push(format!("source: {source}"));
        }
        if let Some(file_name) = submission.file_name.as_ref() {
            detail_lines.push(format!("file: {file_name}"));
        }
        if let Some(level) = submission.level.as_ref() {
            detail_lines.push(format!("level: {level}"));
        }
        if let Some(process) = submission.process.as_ref() {
            detail_lines.push(format!("process: {process}"));
        }
        if let Some(component) = submission.component.as_ref() {
            detail_lines.push(format!("component: {component}"));
        }
        if let Some(event) = submission.event.as_ref() {
            detail_lines.push(format!("event: {event}"));
        }
        if let Some(run_id) = submission.run_id.as_ref() {
            detail_lines.push(format!("run_id: {run_id}"));
        }
        if let Some(session_id) = submission.session_id.as_ref() {
            detail_lines.push(format!("session_id: {session_id}"));
        }
        if let Some(correlation_id) = submission.correlation_id.as_ref() {
            detail_lines.push(format!("correlation_id: {correlation_id}"));
        }
        if let Some(detail) = submission.detail.as_ref() {
            // Blank line separates the header block from the free text.
            detail_lines.push(String::new());
            detail_lines.push(detail.clone());
        }
        if !submission.excerpt.is_empty() {
            if !detail_lines.is_empty() {
                detail_lines.push(String::new());
            }
            detail_lines.push("excerpt:".to_string());
            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
        }
        let detail = if detail_lines.is_empty() {
            None
        } else {
            Some(detail_lines.join("\n"))
        };

        // Use the caller-provided fingerprint when present; otherwise hash
        // the identifying fields (missing fields hash as empty strings).
        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
            compute_fingerprint(&[
                repo.as_str(),
                title.as_str(),
                detail.as_deref().unwrap_or(""),
                submission.source.as_deref().unwrap_or(""),
                submission.run_id.as_deref().unwrap_or(""),
                submission.session_id.as_deref().unwrap_or(""),
                submission.correlation_id.as_deref().unwrap_or(""),
            ])
        });

        // De-duplicate under the write lock so a concurrent identical
        // submission cannot insert a second draft.
        let mut drafts = self.bug_monitor_drafts.write().await;
        if let Some(existing) = drafts
            .values()
            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
            .cloned()
        {
            return Ok(existing);
        }

        // New drafts start in `approval_required` when the config demands
        // operator sign-off, otherwise go straight to `draft_ready`.
        let draft = BugMonitorDraftRecord {
            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
            fingerprint,
            repo,
            status: if config.require_approval_for_new_issues {
                "approval_required".to_string()
            } else {
                "draft_ready".to_string()
            },
            created_at_ms: now_ms(),
            triage_run_id: None,
            issue_number: None,
            title: Some(title),
            detail,
            github_status: None,
            github_issue_url: None,
            github_comment_url: None,
            github_posted_at_ms: None,
            matched_issue_number: None,
            matched_issue_state: None,
            evidence_digest: None,
            last_post_error: None,
        };
        drafts.insert(draft.draft_id.clone(), draft.clone());
        // Release the lock before the (async) persistence write.
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;
        Ok(draft)
    }
2052
2053    pub async fn update_bug_monitor_draft_status(
2054        &self,
2055        draft_id: &str,
2056        next_status: &str,
2057        reason: Option<&str>,
2058    ) -> anyhow::Result<BugMonitorDraftRecord> {
2059        let normalized_status = next_status.trim().to_ascii_lowercase();
2060        if normalized_status != "draft_ready" && normalized_status != "denied" {
2061            anyhow::bail!("unsupported Bug Monitor draft status");
2062        }
2063
2064        let mut drafts = self.bug_monitor_drafts.write().await;
2065        let Some(draft) = drafts.get_mut(draft_id) else {
2066            anyhow::bail!("Bug Monitor draft not found");
2067        };
2068        if !draft.status.eq_ignore_ascii_case("approval_required") {
2069            anyhow::bail!("Bug Monitor draft is not waiting for approval");
2070        }
2071        draft.status = normalized_status.clone();
2072        if let Some(reason) = reason
2073            .map(|value| value.trim())
2074            .filter(|value| !value.is_empty())
2075        {
2076            let next_detail = if let Some(detail) = draft.detail.as_ref() {
2077                format!("{detail}\n\noperator_note: {reason}")
2078            } else {
2079                format!("operator_note: {reason}")
2080            };
2081            draft.detail = Some(next_detail);
2082        }
2083        let updated = draft.clone();
2084        drop(drafts);
2085        self.persist_bug_monitor_drafts().await?;
2086
2087        let event_name = if normalized_status == "draft_ready" {
2088            "bug_monitor.draft.approved"
2089        } else {
2090            "bug_monitor.draft.denied"
2091        };
2092        self.event_bus.publish(EngineEvent::new(
2093            event_name,
2094            serde_json::json!({
2095                "draft_id": updated.draft_id,
2096                "repo": updated.repo,
2097                "status": updated.status,
2098                "reason": reason,
2099            }),
2100        ));
2101        Ok(updated)
2102    }
2103
2104    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
2105        let required_capabilities = vec![
2106            "github.list_issues".to_string(),
2107            "github.get_issue".to_string(),
2108            "github.create_issue".to_string(),
2109            "github.comment_on_issue".to_string(),
2110        ];
2111        let config = self.bug_monitor_config().await;
2112        let drafts = self.bug_monitor_drafts.read().await;
2113        let incidents = self.bug_monitor_incidents.read().await;
2114        let posts = self.bug_monitor_posts.read().await;
2115        let total_incidents = incidents.len();
2116        let pending_incidents = incidents
2117            .values()
2118            .filter(|row| {
2119                matches!(
2120                    row.status.as_str(),
2121                    "queued"
2122                        | "draft_created"
2123                        | "triage_queued"
2124                        | "analysis_queued"
2125                        | "triage_pending"
2126                        | "issue_draft_pending"
2127                )
2128            })
2129            .count();
2130        let pending_drafts = drafts
2131            .values()
2132            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
2133            .count();
2134        let pending_posts = posts
2135            .values()
2136            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
2137            .count();
2138        let last_activity_at_ms = drafts
2139            .values()
2140            .map(|row| row.created_at_ms)
2141            .chain(posts.values().map(|row| row.updated_at_ms))
2142            .max();
2143        drop(drafts);
2144        drop(incidents);
2145        drop(posts);
2146        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
2147        runtime.paused = config.paused;
2148        runtime.total_incidents = total_incidents;
2149        runtime.pending_incidents = pending_incidents;
2150        runtime.pending_posts = pending_posts;
2151
2152        let mut status = BugMonitorStatus {
2153            config: config.clone(),
2154            runtime,
2155            pending_drafts,
2156            pending_posts,
2157            last_activity_at_ms,
2158            ..BugMonitorStatus::default()
2159        };
2160        let repo_valid = config
2161            .repo
2162            .as_ref()
2163            .map(|repo| is_valid_owner_repo_slug(repo))
2164            .unwrap_or(false);
2165        let servers = self.mcp.list().await;
2166        let selected_server = config
2167            .mcp_server
2168            .as_ref()
2169            .and_then(|name| servers.get(name))
2170            .cloned();
2171        let provider_catalog = self.providers.list().await;
2172        let selected_model = config
2173            .model_policy
2174            .as_ref()
2175            .and_then(|policy| policy.get("default_model"))
2176            .and_then(crate::app::routines::parse_model_spec);
2177        let selected_model_ready = selected_model
2178            .as_ref()
2179            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
2180            .unwrap_or(false);
2181        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
2182            self.mcp.server_tools(server_name).await
2183        } else {
2184            Vec::new()
2185        };
2186        let discovered_tools = self
2187            .capability_resolver
2188            .discover_from_runtime(selected_server_tools, Vec::new())
2189            .await;
2190        status.discovered_mcp_tools = discovered_tools
2191            .iter()
2192            .map(|row| row.tool_name.clone())
2193            .collect();
2194        let discovered_providers = discovered_tools
2195            .iter()
2196            .map(|row| row.provider.to_ascii_lowercase())
2197            .collect::<std::collections::HashSet<_>>();
2198        let provider_preference = match config.provider_preference {
2199            BugMonitorProviderPreference::OfficialGithub => {
2200                vec![
2201                    "mcp".to_string(),
2202                    "composio".to_string(),
2203                    "arcade".to_string(),
2204                ]
2205            }
2206            BugMonitorProviderPreference::Composio => {
2207                vec![
2208                    "composio".to_string(),
2209                    "mcp".to_string(),
2210                    "arcade".to_string(),
2211                ]
2212            }
2213            BugMonitorProviderPreference::Arcade => {
2214                vec![
2215                    "arcade".to_string(),
2216                    "mcp".to_string(),
2217                    "composio".to_string(),
2218                ]
2219            }
2220            BugMonitorProviderPreference::Auto => {
2221                vec![
2222                    "mcp".to_string(),
2223                    "composio".to_string(),
2224                    "arcade".to_string(),
2225                ]
2226            }
2227        };
2228        let capability_resolution = self
2229            .capability_resolver
2230            .resolve(
2231                crate::capability_resolver::CapabilityResolveInput {
2232                    workflow_id: Some("bug_monitor".to_string()),
2233                    required_capabilities: required_capabilities.clone(),
2234                    optional_capabilities: Vec::new(),
2235                    provider_preference,
2236                    available_tools: discovered_tools,
2237                },
2238                Vec::new(),
2239            )
2240            .await
2241            .ok();
2242        let bindings_file = self.capability_resolver.list_bindings().await.ok();
2243        if let Some(bindings) = bindings_file.as_ref() {
2244            status.binding_source_version = bindings.builtin_version.clone();
2245            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
2246            status.selected_server_binding_candidates = bindings
2247                .bindings
2248                .iter()
2249                .filter(|binding| required_capabilities.contains(&binding.capability_id))
2250                .filter(|binding| {
2251                    discovered_providers.is_empty()
2252                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
2253                })
2254                .map(|binding| {
2255                    let binding_key = format!(
2256                        "{}::{}",
2257                        binding.capability_id,
2258                        binding.tool_name.to_ascii_lowercase()
2259                    );
2260                    let matched = capability_resolution
2261                        .as_ref()
2262                        .map(|resolution| {
2263                            resolution.resolved.iter().any(|row| {
2264                                row.capability_id == binding.capability_id
2265                                    && format!(
2266                                        "{}::{}",
2267                                        row.capability_id,
2268                                        row.tool_name.to_ascii_lowercase()
2269                                    ) == binding_key
2270                            })
2271                        })
2272                        .unwrap_or(false);
2273                    BugMonitorBindingCandidate {
2274                        capability_id: binding.capability_id.clone(),
2275                        binding_tool_name: binding.tool_name.clone(),
2276                        aliases: binding.tool_name_aliases.clone(),
2277                        matched,
2278                    }
2279                })
2280                .collect();
2281            status.selected_server_binding_candidates.sort_by(|a, b| {
2282                a.capability_id
2283                    .cmp(&b.capability_id)
2284                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
2285            });
2286        }
2287        let capability_ready = |capability_id: &str| -> bool {
2288            capability_resolution
2289                .as_ref()
2290                .map(|resolved| {
2291                    resolved
2292                        .resolved
2293                        .iter()
2294                        .any(|row| row.capability_id == capability_id)
2295                })
2296                .unwrap_or(false)
2297        };
2298        if let Some(resolution) = capability_resolution.as_ref() {
2299            status.missing_required_capabilities = resolution.missing_required.clone();
2300            status.resolved_capabilities = resolution
2301                .resolved
2302                .iter()
2303                .map(|row| BugMonitorCapabilityMatch {
2304                    capability_id: row.capability_id.clone(),
2305                    provider: row.provider.clone(),
2306                    tool_name: row.tool_name.clone(),
2307                    binding_index: row.binding_index,
2308                })
2309                .collect();
2310        } else {
2311            status.missing_required_capabilities = required_capabilities.clone();
2312        }
2313        status.required_capabilities = BugMonitorCapabilityReadiness {
2314            github_list_issues: capability_ready("github.list_issues"),
2315            github_get_issue: capability_ready("github.get_issue"),
2316            github_create_issue: capability_ready("github.create_issue"),
2317            github_comment_on_issue: capability_ready("github.comment_on_issue"),
2318        };
2319        status.selected_model = selected_model;
2320        status.readiness = BugMonitorReadiness {
2321            config_valid: repo_valid
2322                && selected_server.is_some()
2323                && status.required_capabilities.github_list_issues
2324                && status.required_capabilities.github_get_issue
2325                && status.required_capabilities.github_create_issue
2326                && status.required_capabilities.github_comment_on_issue
2327                && selected_model_ready,
2328            repo_valid,
2329            mcp_server_present: selected_server.is_some(),
2330            mcp_connected: selected_server
2331                .as_ref()
2332                .map(|row| row.connected)
2333                .unwrap_or(false),
2334            github_read_ready: status.required_capabilities.github_list_issues
2335                && status.required_capabilities.github_get_issue,
2336            github_write_ready: status.required_capabilities.github_create_issue
2337                && status.required_capabilities.github_comment_on_issue,
2338            selected_model_ready,
2339            ingest_ready: config.enabled && !config.paused && repo_valid,
2340            publish_ready: config.enabled
2341                && !config.paused
2342                && repo_valid
2343                && selected_server
2344                    .as_ref()
2345                    .map(|row| row.connected)
2346                    .unwrap_or(false)
2347                && status.required_capabilities.github_list_issues
2348                && status.required_capabilities.github_get_issue
2349                && status.required_capabilities.github_create_issue
2350                && status.required_capabilities.github_comment_on_issue
2351                && selected_model_ready,
2352            runtime_ready: config.enabled
2353                && !config.paused
2354                && repo_valid
2355                && selected_server
2356                    .as_ref()
2357                    .map(|row| row.connected)
2358                    .unwrap_or(false)
2359                && status.required_capabilities.github_list_issues
2360                && status.required_capabilities.github_get_issue
2361                && status.required_capabilities.github_create_issue
2362                && status.required_capabilities.github_comment_on_issue
2363                && selected_model_ready,
2364        };
2365        if config.enabled {
2366            if config.paused {
2367                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
2368            } else if !repo_valid {
2369                status.last_error = Some("Target repo is missing or invalid.".to_string());
2370            } else if selected_server.is_none() {
2371                status.last_error = Some("Selected MCP server is missing.".to_string());
2372            } else if !status.readiness.mcp_connected {
2373                status.last_error = Some("Selected MCP server is disconnected.".to_string());
2374            } else if !selected_model_ready {
2375                status.last_error = Some(
2376                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
2377                        .to_string(),
2378                );
2379            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
2380                let missing = if status.missing_required_capabilities.is_empty() {
2381                    "unknown".to_string()
2382                } else {
2383                    status.missing_required_capabilities.join(", ")
2384                };
2385                status.last_error = Some(format!(
2386                    "Selected MCP server is missing required GitHub capabilities: {missing}"
2387                ));
2388            }
2389        }
2390        status.runtime.monitoring_active = status.readiness.ingest_ready;
2391        status
2392    }
2393
2394    pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2395        if !self.workflow_runs_path.exists() {
2396            return Ok(());
2397        }
2398        let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2399        let parsed =
2400            serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2401                .unwrap_or_default();
2402        *self.workflow_runs.write().await = parsed;
2403        Ok(())
2404    }
2405
2406    pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2407        if let Some(parent) = self.workflow_runs_path.parent() {
2408            fs::create_dir_all(parent).await?;
2409        }
2410        let payload = {
2411            let guard = self.workflow_runs.read().await;
2412            serde_json::to_string_pretty(&*guard)?
2413        };
2414        fs::write(&self.workflow_runs_path, payload).await?;
2415        Ok(())
2416    }
2417
2418    pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2419        if !self.workflow_hook_overrides_path.exists() {
2420            return Ok(());
2421        }
2422        let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2423        let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2424            .unwrap_or_default();
2425        *self.workflow_hook_overrides.write().await = parsed;
2426        Ok(())
2427    }
2428
2429    pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2430        if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2431            fs::create_dir_all(parent).await?;
2432        }
2433        let payload = {
2434            let guard = self.workflow_hook_overrides.read().await;
2435            serde_json::to_string_pretty(&*guard)?
2436        };
2437        fs::write(&self.workflow_hook_overrides_path, payload).await?;
2438        Ok(())
2439    }
2440
2441    pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
2442        let mut sources = Vec::new();
2443        sources.push(WorkflowLoadSource {
2444            root: config::paths::resolve_builtin_workflows_dir(),
2445            kind: WorkflowSourceKind::BuiltIn,
2446            pack_id: None,
2447        });
2448
2449        let workspace_root = self.workspace_index.snapshot().await.root;
2450        sources.push(WorkflowLoadSource {
2451            root: PathBuf::from(workspace_root).join(".tandem"),
2452            kind: WorkflowSourceKind::Workspace,
2453            pack_id: None,
2454        });
2455
2456        if let Ok(packs) = self.pack_manager.list().await {
2457            for pack in packs {
2458                sources.push(WorkflowLoadSource {
2459                    root: PathBuf::from(pack.install_path),
2460                    kind: WorkflowSourceKind::Pack,
2461                    pack_id: Some(pack.pack_id),
2462                });
2463            }
2464        }
2465
2466        let mut registry = load_workflow_registry(&sources)?;
2467        let overrides = self.workflow_hook_overrides.read().await.clone();
2468        for hook in &mut registry.hooks {
2469            if let Some(enabled) = overrides.get(&hook.binding_id) {
2470                hook.enabled = *enabled;
2471            }
2472        }
2473        for workflow in registry.workflows.values_mut() {
2474            workflow.hooks = registry
2475                .hooks
2476                .iter()
2477                .filter(|hook| hook.workflow_id == workflow.workflow_id)
2478                .cloned()
2479                .collect();
2480        }
2481        let messages = validate_workflow_registry(&registry);
2482        *self.workflows.write().await = registry;
2483        Ok(messages)
2484    }
2485
2486    pub async fn workflow_registry(&self) -> WorkflowRegistry {
2487        self.workflows.read().await.clone()
2488    }
2489
2490    pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2491        let mut rows = self
2492            .workflows
2493            .read()
2494            .await
2495            .workflows
2496            .values()
2497            .cloned()
2498            .collect::<Vec<_>>();
2499        rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2500        rows
2501    }
2502
2503    pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2504        self.workflows
2505            .read()
2506            .await
2507            .workflows
2508            .get(workflow_id)
2509            .cloned()
2510    }
2511
2512    pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2513        let mut rows = self
2514            .workflows
2515            .read()
2516            .await
2517            .hooks
2518            .iter()
2519            .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2520            .cloned()
2521            .collect::<Vec<_>>();
2522        rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2523        rows
2524    }
2525
2526    pub async fn set_workflow_hook_enabled(
2527        &self,
2528        binding_id: &str,
2529        enabled: bool,
2530    ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2531        self.workflow_hook_overrides
2532            .write()
2533            .await
2534            .insert(binding_id.to_string(), enabled);
2535        self.persist_workflow_hook_overrides().await?;
2536        let _ = self.reload_workflows().await?;
2537        Ok(self
2538            .workflows
2539            .read()
2540            .await
2541            .hooks
2542            .iter()
2543            .find(|hook| hook.binding_id == binding_id)
2544            .cloned())
2545    }
2546
2547    pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2548        self.workflow_runs
2549            .write()
2550            .await
2551            .insert(run.run_id.clone(), run);
2552        self.persist_workflow_runs().await
2553    }
2554
2555    pub async fn update_workflow_run(
2556        &self,
2557        run_id: &str,
2558        update: impl FnOnce(&mut WorkflowRunRecord),
2559    ) -> Option<WorkflowRunRecord> {
2560        let mut guard = self.workflow_runs.write().await;
2561        let row = guard.get_mut(run_id)?;
2562        update(row);
2563        row.updated_at_ms = now_ms();
2564        if matches!(
2565            row.status,
2566            WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
2567        ) {
2568            row.finished_at_ms.get_or_insert_with(now_ms);
2569        }
2570        let out = row.clone();
2571        drop(guard);
2572        let _ = self.persist_workflow_runs().await;
2573        Some(out)
2574    }
2575
2576    pub async fn list_workflow_runs(
2577        &self,
2578        workflow_id: Option<&str>,
2579        limit: usize,
2580    ) -> Vec<WorkflowRunRecord> {
2581        let mut rows = self
2582            .workflow_runs
2583            .read()
2584            .await
2585            .values()
2586            .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2587            .cloned()
2588            .collect::<Vec<_>>();
2589        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2590        rows.truncate(limit.clamp(1, 500));
2591        rows
2592    }
2593
2594    pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2595        self.workflow_runs.read().await.get(run_id).cloned()
2596    }
2597
2598    pub async fn put_automation_v2(
2599        &self,
2600        mut automation: AutomationV2Spec,
2601    ) -> anyhow::Result<AutomationV2Spec> {
2602        if automation.automation_id.trim().is_empty() {
2603            anyhow::bail!("automation_id is required");
2604        }
2605        for agent in &mut automation.agents {
2606            if agent.display_name.trim().is_empty() {
2607                agent.display_name = auto_generated_agent_name(&agent.agent_id);
2608            }
2609            agent.tool_policy.allowlist =
2610                config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
2611            agent.tool_policy.denylist =
2612                config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
2613            agent.mcp_policy.allowed_servers =
2614                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
2615            agent.mcp_policy.allowed_tools = agent
2616                .mcp_policy
2617                .allowed_tools
2618                .take()
2619                .map(normalize_allowed_tools);
2620        }
2621        let now = now_ms();
2622        if automation.created_at_ms == 0 {
2623            automation.created_at_ms = now;
2624        }
2625        automation.updated_at_ms = now;
2626        if automation.next_fire_at_ms.is_none() {
2627            automation.next_fire_at_ms =
2628                automation_schedule_next_fire_at_ms(&automation.schedule, now);
2629        }
2630        migrate_bundled_studio_research_split_automation(&mut automation);
2631        self.automations_v2
2632            .write()
2633            .await
2634            .insert(automation.automation_id.clone(), automation.clone());
2635        self.persist_automations_v2().await?;
2636        self.verify_automation_v2_persisted(&automation.automation_id, true)
2637            .await?;
2638        Ok(automation)
2639    }
2640
2641    pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2642        self.automations_v2.read().await.get(automation_id).cloned()
2643    }
2644
2645    pub fn automation_v2_runtime_context(
2646        &self,
2647        run: &AutomationV2RunRecord,
2648    ) -> Option<AutomationRuntimeContextMaterialization> {
2649        run.runtime_context.clone().or_else(|| {
2650            run.automation_snapshot.as_ref().and_then(|automation| {
2651                automation
2652                    .runtime_context_materialization()
2653                    .or_else(|| automation.approved_plan_runtime_context_materialization())
2654            })
2655        })
2656    }
2657
    /// Merge two optional runtime-context materializations into one.
    ///
    /// Partitions are keyed by `routine_id` in a `BTreeMap`, so the merged
    /// `routines` list comes out ordered by routine id. Within a partition,
    /// visible context objects and step bindings are deduplicated by id —
    /// first occurrence wins, and `base` is merged before `extra` — then
    /// each list is re-sorted by its id. Returns `None` when neither input
    /// contributes any partitions.
    fn merge_automation_runtime_context_materializations(
        base: Option<AutomationRuntimeContextMaterialization>,
        extra: Option<AutomationRuntimeContextMaterialization>,
    ) -> Option<AutomationRuntimeContextMaterialization> {
        // Accumulator keyed by routine id; BTreeMap gives sorted output.
        let mut partitions = std::collections::BTreeMap::<
            String,
            tandem_plan_compiler::api::ProjectedRoutineContextPartition,
        >::new();
        // Folds a single partition into the accumulator, deduplicating by id.
        let mut merge_partition =
            |partition: tandem_plan_compiler::api::ProjectedRoutineContextPartition| {
                let entry = partitions
                    .entry(partition.routine_id.clone())
                    .or_insert_with(|| {
                        tandem_plan_compiler::api::ProjectedRoutineContextPartition {
                            routine_id: partition.routine_id.clone(),
                            visible_context_objects: Vec::new(),
                            step_context_bindings: Vec::new(),
                        }
                    });

                // Seed the seen-set from already-merged entries so earlier
                // occurrences of a context object id take precedence.
                let mut seen_context_object_ids = entry
                    .visible_context_objects
                    .iter()
                    .map(|context_object| context_object.context_object_id.clone())
                    .collect::<std::collections::HashSet<_>>();
                for context_object in partition.visible_context_objects {
                    if seen_context_object_ids.insert(context_object.context_object_id.clone()) {
                        entry.visible_context_objects.push(context_object);
                    }
                }
                // Keep deterministic ordering after every merge step.
                entry
                    .visible_context_objects
                    .sort_by(|left, right| left.context_object_id.cmp(&right.context_object_id));

                // Same dedup-then-sort treatment for step bindings, keyed by
                // step id.
                let mut seen_step_ids = entry
                    .step_context_bindings
                    .iter()
                    .map(|binding| binding.step_id.clone())
                    .collect::<std::collections::HashSet<_>>();
                for binding in partition.step_context_bindings {
                    if seen_step_ids.insert(binding.step_id.clone()) {
                        entry.step_context_bindings.push(binding);
                    }
                }
                entry
                    .step_context_bindings
                    .sort_by(|left, right| left.step_id.cmp(&right.step_id));
            };

        // Merge order matters for dedup precedence: base first, then extra.
        if let Some(base) = base {
            for partition in base.routines {
                merge_partition(partition);
            }
        }
        if let Some(extra) = extra {
            for partition in extra.routines {
                merge_partition(partition);
            }
        }
        // An all-empty merge collapses to None rather than an empty context.
        if partitions.is_empty() {
            None
        } else {
            Some(AutomationRuntimeContextMaterialization {
                routines: partitions.into_values().collect(),
            })
        }
    }
2725
    /// Build the runtime context contributed by shared-context packs listed
    /// in the automation's metadata.
    ///
    /// Returns `Ok(None)` when no packs are referenced. Otherwise the check
    /// is fail-closed: every referenced pack must exist, be `Published`, and
    /// yield a runtime context — either stored directly on the manifest or
    /// projected from the manifest's plan package — or an error is returned.
    async fn automation_v2_shared_context_runtime_context(
        &self,
        automation: &AutomationV2Spec,
    ) -> anyhow::Result<Option<AutomationRuntimeContextMaterialization>> {
        let pack_ids = crate::http::context_packs::shared_context_pack_ids_from_metadata(
            automation.metadata.as_ref(),
        );
        if pack_ids.is_empty() {
            return Ok(None);
        }

        let mut contexts = Vec::new();
        for pack_id in pack_ids {
            let Some(pack) = self.get_context_pack(&pack_id).await else {
                anyhow::bail!("shared workflow context not found: {pack_id}");
            };
            if pack.state != crate::http::context_packs::ContextPackState::Published {
                anyhow::bail!("shared workflow context is not published: {pack_id}");
            }
            // Prefer the manifest's explicit runtime context; fall back to
            // projecting one from the manifest's plan package. Deserialization
            // failures are mapped to "absent" (.ok()) so the fallback chain
            // continues rather than erroring early.
            let pack_context = pack
                .manifest
                .runtime_context
                .clone()
                .and_then(|value| {
                    serde_json::from_value::<AutomationRuntimeContextMaterialization>(value).ok()
                })
                .or_else(|| {
                    pack.manifest
                        .plan_package
                        .as_ref()
                        .and_then(|value| {
                            serde_json::from_value::<tandem_plan_compiler::api::PlanPackage>(
                                value.clone(),
                            )
                            .ok()
                        })
                        .map(|plan_package| {
                            tandem_plan_compiler::api::project_plan_context_materialization(
                                &plan_package,
                            )
                        })
                });
            // A pack with neither a usable runtime context nor a plan
            // package is an error — the pack reference is unusable.
            let Some(pack_context) = pack_context else {
                anyhow::bail!("shared workflow context lacks runtime context: {pack_id}");
            };
            contexts.push(pack_context);
        }

        // Fold all pack contexts together; the merge helper handles
        // deduplication and deterministic ordering.
        let mut merged: Option<AutomationRuntimeContextMaterialization> = None;
        for context in contexts {
            merged = Self::merge_automation_runtime_context_materializations(merged, Some(context));
        }
        Ok(merged)
    }
2780
2781    async fn automation_v2_effective_runtime_context(
2782        &self,
2783        automation: &AutomationV2Spec,
2784        base_runtime_context: Option<AutomationRuntimeContextMaterialization>,
2785    ) -> anyhow::Result<Option<AutomationRuntimeContextMaterialization>> {
2786        let shared_context = self
2787            .automation_v2_shared_context_runtime_context(automation)
2788            .await?;
2789        Ok(Self::merge_automation_runtime_context_materializations(
2790            base_runtime_context,
2791            shared_context,
2792        ))
2793    }
2794
2795    pub(crate) fn automation_v2_approved_plan_materialization(
2796        &self,
2797        run: &AutomationV2RunRecord,
2798    ) -> Option<tandem_plan_compiler::api::ApprovedPlanMaterialization> {
2799        run.automation_snapshot
2800            .as_ref()
2801            .and_then(AutomationV2Spec::approved_plan_materialization)
2802    }
2803
2804    pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2805        self.workflow_plans
2806            .write()
2807            .await
2808            .insert(plan.plan_id.clone(), plan);
2809    }
2810
2811    pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2812        self.workflow_plans.read().await.get(plan_id).cloned()
2813    }
2814
2815    pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2816        self.workflow_plan_drafts
2817            .write()
2818            .await
2819            .insert(draft.current_plan.plan_id.clone(), draft.clone());
2820        self.put_workflow_plan(draft.current_plan).await;
2821    }
2822
2823    pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2824        self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2825    }
2826
2827    pub async fn load_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2828        if !self.workflow_planner_sessions_path.exists() {
2829            return Ok(());
2830        }
2831        let raw = fs::read_to_string(&self.workflow_planner_sessions_path).await?;
2832        let parsed = serde_json::from_str::<
2833            std::collections::HashMap<
2834                String,
2835                crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2836            >,
2837        >(&raw)
2838        .unwrap_or_default();
2839        self.replace_workflow_planner_sessions(parsed).await?;
2840        Ok(())
2841    }
2842
2843    pub async fn persist_workflow_planner_sessions(&self) -> anyhow::Result<()> {
2844        if let Some(parent) = self.workflow_planner_sessions_path.parent() {
2845            fs::create_dir_all(parent).await?;
2846        }
2847        let payload = {
2848            let guard = self.workflow_planner_sessions.read().await;
2849            serde_json::to_string_pretty(&*guard)?
2850        };
2851        fs::write(&self.workflow_planner_sessions_path, payload).await?;
2852        Ok(())
2853    }
2854
2855    async fn replace_workflow_planner_sessions(
2856        &self,
2857        sessions: std::collections::HashMap<
2858            String,
2859            crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2860        >,
2861    ) -> anyhow::Result<()> {
2862        {
2863            let mut sessions_guard = self.workflow_planner_sessions.write().await;
2864            *sessions_guard = sessions.clone();
2865        }
2866        {
2867            let mut plans = self.workflow_plans.write().await;
2868            let mut drafts = self.workflow_plan_drafts.write().await;
2869            plans.clear();
2870            drafts.clear();
2871            for session in sessions.values() {
2872                if let Some(draft) = session.draft.as_ref() {
2873                    plans.insert(
2874                        draft.current_plan.plan_id.clone(),
2875                        draft.current_plan.clone(),
2876                    );
2877                    drafts.insert(draft.current_plan.plan_id.clone(), draft.clone());
2878                }
2879            }
2880        }
2881        Ok(())
2882    }
2883
2884    async fn sync_workflow_planner_session_cache(
2885        &self,
2886        session: &crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2887    ) {
2888        if let Some(draft) = session.draft.as_ref() {
2889            self.workflow_plans.write().await.insert(
2890                draft.current_plan.plan_id.clone(),
2891                draft.current_plan.clone(),
2892            );
2893            self.workflow_plan_drafts
2894                .write()
2895                .await
2896                .insert(draft.current_plan.plan_id.clone(), draft.clone());
2897        }
2898    }
2899
2900    pub async fn put_workflow_planner_session(
2901        &self,
2902        mut session: crate::http::workflow_planner::WorkflowPlannerSessionRecord,
2903    ) -> anyhow::Result<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2904        if session.session_id.trim().is_empty() {
2905            anyhow::bail!("session_id is required");
2906        }
2907        if session.project_slug.trim().is_empty() {
2908            anyhow::bail!("project_slug is required");
2909        }
2910        let now = now_ms();
2911        if session.created_at_ms == 0 {
2912            session.created_at_ms = now;
2913        }
2914        session.updated_at_ms = now;
2915        {
2916            self.workflow_planner_sessions
2917                .write()
2918                .await
2919                .insert(session.session_id.clone(), session.clone());
2920        }
2921        self.sync_workflow_planner_session_cache(&session).await;
2922        self.persist_workflow_planner_sessions().await?;
2923        Ok(session)
2924    }
2925
2926    pub async fn get_workflow_planner_session(
2927        &self,
2928        session_id: &str,
2929    ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2930        self.workflow_planner_sessions
2931            .read()
2932            .await
2933            .get(session_id)
2934            .cloned()
2935    }
2936
2937    pub async fn list_workflow_planner_sessions(
2938        &self,
2939        project_slug: Option<&str>,
2940    ) -> Vec<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2941        let mut rows = self
2942            .workflow_planner_sessions
2943            .read()
2944            .await
2945            .values()
2946            .filter(|session| {
2947                project_slug
2948                    .map(|slug| session.project_slug == slug)
2949                    .unwrap_or(true)
2950            })
2951            .cloned()
2952            .collect::<Vec<_>>();
2953        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2954        rows
2955    }
2956
2957    pub async fn delete_workflow_planner_session(
2958        &self,
2959        session_id: &str,
2960    ) -> Option<crate::http::workflow_planner::WorkflowPlannerSessionRecord> {
2961        let removed = self
2962            .workflow_planner_sessions
2963            .write()
2964            .await
2965            .remove(session_id);
2966        if let Some(session) = removed.as_ref() {
2967            if let Some(draft) = session.draft.as_ref() {
2968                self.workflow_plan_drafts
2969                    .write()
2970                    .await
2971                    .remove(&draft.current_plan.plan_id);
2972                self.workflow_plans
2973                    .write()
2974                    .await
2975                    .remove(&draft.current_plan.plan_id);
2976            }
2977        }
2978        let _ = self.persist_workflow_planner_sessions().await;
2979        removed
2980    }
2981
2982    pub async fn load_context_packs(&self) -> anyhow::Result<()> {
2983        if !self.context_packs_path.exists() {
2984            return Ok(());
2985        }
2986        let raw = fs::read_to_string(&self.context_packs_path).await?;
2987        let parsed = serde_json::from_str::<
2988            std::collections::HashMap<String, crate::http::context_packs::ContextPackRecord>,
2989        >(&raw)
2990        .unwrap_or_default();
2991        {
2992            let mut guard = self.context_packs.write().await;
2993            *guard = parsed;
2994        }
2995        Ok(())
2996    }
2997
2998    pub async fn persist_context_packs(&self) -> anyhow::Result<()> {
2999        if let Some(parent) = self.context_packs_path.parent() {
3000            fs::create_dir_all(parent).await?;
3001        }
3002        let payload = {
3003            let guard = self.context_packs.read().await;
3004            serde_json::to_string_pretty(&*guard)?
3005        };
3006        fs::write(&self.context_packs_path, payload).await?;
3007        Ok(())
3008    }
3009
3010    pub(crate) async fn put_context_pack(
3011        &self,
3012        mut pack: crate::http::context_packs::ContextPackRecord,
3013    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3014        if pack.pack_id.trim().is_empty() {
3015            anyhow::bail!("pack_id is required");
3016        }
3017        if pack.title.trim().is_empty() {
3018            anyhow::bail!("title is required");
3019        }
3020        if pack.workspace_root.trim().is_empty() {
3021            anyhow::bail!("workspace_root is required");
3022        }
3023        let now = now_ms();
3024        if pack.created_at_ms == 0 {
3025            pack.created_at_ms = now;
3026        }
3027        pack.updated_at_ms = now;
3028        {
3029            self.context_packs
3030                .write()
3031                .await
3032                .insert(pack.pack_id.clone(), pack.clone());
3033        }
3034        self.persist_context_packs().await?;
3035        Ok(pack)
3036    }
3037
3038    pub(crate) async fn get_context_pack(
3039        &self,
3040        pack_id: &str,
3041    ) -> Option<crate::http::context_packs::ContextPackRecord> {
3042        self.context_packs.read().await.get(pack_id).cloned()
3043    }
3044
3045    pub(crate) async fn list_context_packs(
3046        &self,
3047        project_key: Option<&str>,
3048        workspace_root: Option<&str>,
3049    ) -> Vec<crate::http::context_packs::ContextPackRecord> {
3050        let mut rows = self
3051            .context_packs
3052            .read()
3053            .await
3054            .values()
3055            .filter(|pack| {
3056                let project_ok =
3057                    crate::http::context_packs::context_pack_allows_project(pack, project_key);
3058                let workspace_ok = workspace_root
3059                    .map(|root| pack.workspace_root == root)
3060                    .unwrap_or(true);
3061                project_ok && workspace_ok
3062            })
3063            .cloned()
3064            .collect::<Vec<_>>();
3065        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3066        rows
3067    }
3068
3069    pub(crate) async fn update_context_pack(
3070        &self,
3071        pack_id: &str,
3072        update: impl FnOnce(&mut crate::http::context_packs::ContextPackRecord) -> anyhow::Result<()>,
3073    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3074        let mut guard = self.context_packs.write().await;
3075        let Some(pack) = guard.get_mut(pack_id) else {
3076            anyhow::bail!("shared workflow context not found");
3077        };
3078        update(pack)?;
3079        pack.updated_at_ms = now_ms();
3080        let next = pack.clone();
3081        drop(guard);
3082        self.persist_context_packs().await?;
3083        Ok(next)
3084    }
3085
3086    pub(crate) async fn revoke_context_pack(
3087        &self,
3088        pack_id: &str,
3089        revoked_actor_metadata: Option<Value>,
3090    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3091        self.update_context_pack(pack_id, move |pack| {
3092            pack.state = crate::http::context_packs::ContextPackState::Revoked;
3093            pack.revoked_at_ms = Some(now_ms());
3094            pack.revoked_actor_metadata = revoked_actor_metadata;
3095            Ok(())
3096        })
3097        .await
3098    }
3099
3100    pub(crate) async fn supersede_context_pack(
3101        &self,
3102        pack_id: &str,
3103        superseded_by_pack_id: String,
3104        superseded_actor_metadata: Option<Value>,
3105    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3106        self.update_context_pack(pack_id, move |pack| {
3107            pack.state = crate::http::context_packs::ContextPackState::Superseded;
3108            pack.superseded_by_pack_id = Some(superseded_by_pack_id);
3109            pack.superseded_at_ms = Some(now_ms());
3110            pack.superseded_actor_metadata = superseded_actor_metadata;
3111            Ok(())
3112        })
3113        .await
3114    }
3115
3116    pub(crate) async fn bind_context_pack(
3117        &self,
3118        pack_id: &str,
3119        binding: crate::http::context_packs::ContextPackBindingRecord,
3120    ) -> anyhow::Result<crate::http::context_packs::ContextPackRecord> {
3121        self.update_context_pack(pack_id, move |pack| {
3122            pack.bindings
3123                .retain(|row| row.binding_id != binding.binding_id);
3124            pack.bindings.push(binding);
3125            Ok(())
3126        })
3127        .await
3128    }
3129
3130    pub async fn put_optimization_campaign(
3131        &self,
3132        mut campaign: OptimizationCampaignRecord,
3133    ) -> anyhow::Result<OptimizationCampaignRecord> {
3134        if campaign.optimization_id.trim().is_empty() {
3135            anyhow::bail!("optimization_id is required");
3136        }
3137        if campaign.source_workflow_id.trim().is_empty() {
3138            anyhow::bail!("source_workflow_id is required");
3139        }
3140        if campaign.name.trim().is_empty() {
3141            anyhow::bail!("name is required");
3142        }
3143        let now = now_ms();
3144        if campaign.created_at_ms == 0 {
3145            campaign.created_at_ms = now;
3146        }
3147        campaign.updated_at_ms = now;
3148        campaign.source_workflow_snapshot_hash =
3149            optimization_snapshot_hash(&campaign.source_workflow_snapshot);
3150        campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
3151        self.optimization_campaigns
3152            .write()
3153            .await
3154            .insert(campaign.optimization_id.clone(), campaign.clone());
3155        self.persist_optimization_campaigns().await?;
3156        Ok(campaign)
3157    }
3158
3159    pub async fn get_optimization_campaign(
3160        &self,
3161        optimization_id: &str,
3162    ) -> Option<OptimizationCampaignRecord> {
3163        self.optimization_campaigns
3164            .read()
3165            .await
3166            .get(optimization_id)
3167            .cloned()
3168    }
3169
3170    pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
3171        let mut rows = self
3172            .optimization_campaigns
3173            .read()
3174            .await
3175            .values()
3176            .cloned()
3177            .collect::<Vec<_>>();
3178        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3179        rows
3180    }
3181
3182    pub async fn put_optimization_experiment(
3183        &self,
3184        mut experiment: OptimizationExperimentRecord,
3185    ) -> anyhow::Result<OptimizationExperimentRecord> {
3186        if experiment.experiment_id.trim().is_empty() {
3187            anyhow::bail!("experiment_id is required");
3188        }
3189        if experiment.optimization_id.trim().is_empty() {
3190            anyhow::bail!("optimization_id is required");
3191        }
3192        let now = now_ms();
3193        if experiment.created_at_ms == 0 {
3194            experiment.created_at_ms = now;
3195        }
3196        experiment.updated_at_ms = now;
3197        experiment.candidate_snapshot_hash =
3198            optimization_snapshot_hash(&experiment.candidate_snapshot);
3199        self.optimization_experiments
3200            .write()
3201            .await
3202            .insert(experiment.experiment_id.clone(), experiment.clone());
3203        self.persist_optimization_experiments().await?;
3204        Ok(experiment)
3205    }
3206
3207    pub async fn get_optimization_experiment(
3208        &self,
3209        optimization_id: &str,
3210        experiment_id: &str,
3211    ) -> Option<OptimizationExperimentRecord> {
3212        self.optimization_experiments
3213            .read()
3214            .await
3215            .get(experiment_id)
3216            .filter(|row| row.optimization_id == optimization_id)
3217            .cloned()
3218    }
3219
3220    pub async fn list_optimization_experiments(
3221        &self,
3222        optimization_id: &str,
3223    ) -> Vec<OptimizationExperimentRecord> {
3224        let mut rows = self
3225            .optimization_experiments
3226            .read()
3227            .await
3228            .values()
3229            .filter(|row| row.optimization_id == optimization_id)
3230            .cloned()
3231            .collect::<Vec<_>>();
3232        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
3233        rows
3234    }
3235
3236    pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
3237        self.optimization_experiments
3238            .read()
3239            .await
3240            .values()
3241            .filter(|row| row.optimization_id == optimization_id)
3242            .count()
3243    }
3244
3245    fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
3246        matches!(
3247            status,
3248            crate::AutomationRunStatus::Completed
3249                | crate::AutomationRunStatus::Blocked
3250                | crate::AutomationRunStatus::Failed
3251                | crate::AutomationRunStatus::Cancelled
3252        )
3253    }
3254
3255    fn optimization_consecutive_failure_count(
3256        experiments: &[OptimizationExperimentRecord],
3257    ) -> usize {
3258        let mut ordered = experiments.to_vec();
3259        ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
3260        ordered
3261            .iter()
3262            .rev()
3263            .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
3264            .count()
3265    }
3266
3267    fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
3268        match field {
3269            OptimizationMutableField::Objective => "objective",
3270            OptimizationMutableField::OutputContractSummaryGuidance => {
3271                "output_contract.summary_guidance"
3272            }
3273            OptimizationMutableField::TimeoutMs => "timeout_ms",
3274            OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
3275            OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
3276        }
3277    }
3278
    /// Read the current value of `field` from `node`, returned as a JSON
    /// `Value` so callers can compare/diff it generically.
    ///
    /// Returns `Err` with a human-readable message when the node does not
    /// carry the requested field (missing output contract, timeout, or
    /// retry-policy key).
    fn optimization_node_field_value(
        node: &crate::AutomationFlowNode,
        field: OptimizationMutableField,
    ) -> Result<Value, String> {
        match field {
            // The objective string is always present on a node.
            OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
            OptimizationMutableField::OutputContractSummaryGuidance => node
                .output_contract
                .as_ref()
                .and_then(|contract| contract.summary_guidance.clone())
                .map(Value::String)
                .ok_or_else(|| {
                    format!(
                        "node `{}` is missing output_contract.summary_guidance",
                        node.node_id
                    )
                }),
            OptimizationMutableField::TimeoutMs => node
                .timeout_ms
                .map(|value| json!(value))
                .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
            // retry_policy is stored as opaque JSON; both retry fields are
            // read from its object form and error when the key is absent.
            OptimizationMutableField::RetryPolicyMaxAttempts => node
                .retry_policy
                .as_ref()
                .and_then(Value::as_object)
                .and_then(|policy| policy.get("max_attempts"))
                .cloned()
                .ok_or_else(|| {
                    format!(
                        "node `{}` is missing retry_policy.max_attempts",
                        node.node_id
                    )
                }),
            OptimizationMutableField::RetryPolicyRetries => node
                .retry_policy
                .as_ref()
                .and_then(Value::as_object)
                .and_then(|policy| policy.get("retries"))
                .cloned()
                .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
        }
    }
3321
    /// Write `value` into `field` on `node` — the inverse of
    /// `optimization_node_field_value`.
    ///
    /// Validates the JSON type of `value` per field and returns a
    /// human-readable error on a type mismatch or when a required container
    /// is missing.
    fn set_optimization_node_field_value(
        node: &mut crate::AutomationFlowNode,
        field: OptimizationMutableField,
        value: &Value,
    ) -> Result<(), String> {
        match field {
            OptimizationMutableField::Objective => {
                node.objective = value
                    .as_str()
                    .ok_or_else(|| "objective apply value must be a string".to_string())?
                    .to_string();
            }
            OptimizationMutableField::OutputContractSummaryGuidance => {
                let guidance = value
                    .as_str()
                    .ok_or_else(|| {
                        "output_contract.summary_guidance apply value must be a string".to_string()
                    })?
                    .to_string();
                // The output contract must already exist; this setter never
                // creates one.
                let contract = node.output_contract.as_mut().ok_or_else(|| {
                    format!(
                        "node `{}` is missing output_contract for apply",
                        node.node_id
                    )
                })?;
                contract.summary_guidance = Some(guidance);
            }
            OptimizationMutableField::TimeoutMs => {
                node.timeout_ms = Some(
                    value
                        .as_u64()
                        .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
                );
            }
            OptimizationMutableField::RetryPolicyMaxAttempts => {
                let next = value.as_i64().ok_or_else(|| {
                    "retry_policy.max_attempts apply value must be an integer".to_string()
                })?;
                // Unlike output_contract, a missing retry_policy is created
                // here as an empty JSON object before inserting the key.
                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
                let object = policy.as_object_mut().ok_or_else(|| {
                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
                })?;
                object.insert("max_attempts".to_string(), json!(next));
            }
            OptimizationMutableField::RetryPolicyRetries => {
                let next = value.as_i64().ok_or_else(|| {
                    "retry_policy.retries apply value must be an integer".to_string()
                })?;
                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
                let object = policy.as_object_mut().ok_or_else(|| {
                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
                })?;
                object.insert("retries".to_string(), json!(next));
            }
        }
        Ok(())
    }
3379
3380    fn append_optimization_apply_metadata(
3381        metadata: Option<Value>,
3382        record: Value,
3383    ) -> Result<Option<Value>, String> {
3384        let mut root = match metadata {
3385            Some(Value::Object(map)) => map,
3386            Some(_) => return Err("automation metadata must be a JSON object".to_string()),
3387            None => serde_json::Map::new(),
3388        };
3389        let history = root
3390            .entry("optimization_apply_history".to_string())
3391            .or_insert_with(|| Value::Array(Vec::new()));
3392        let Some(entries) = history.as_array_mut() else {
3393            return Err("optimization_apply_history metadata must be an array".to_string());
3394        };
3395        entries.push(record.clone());
3396        root.insert("last_optimization_apply".to_string(), record);
3397        Ok(Some(Value::Object(root)))
3398    }
3399
    /// Build the `apply_patch` metadata blob for a winner experiment: a JSON
    /// record describing the single node/field change between `baseline` and
    /// `candidate`, plus the approval timestamp.
    ///
    /// `expected_before` is later compared against the live workflow by
    /// `apply_optimization_winner` to detect drift before `apply_value` is
    /// written.
    fn build_optimization_apply_patch(
        baseline: &crate::AutomationV2Spec,
        candidate: &crate::AutomationV2Spec,
        mutation: &crate::OptimizationValidatedMutation,
        approved_at_ms: u64,
    ) -> Result<Value, String> {
        // The mutated node must exist in both snapshots.
        let baseline_node = baseline
            .flow
            .nodes
            .iter()
            .find(|node| node.node_id == mutation.node_id)
            .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
        let candidate_node = candidate
            .flow
            .nodes
            .iter()
            .find(|node| node.node_id == mutation.node_id)
            .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
        let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
        let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
        Ok(json!({
            "node_id": mutation.node_id,
            "field": mutation.field,
            "field_path": Self::optimization_mutation_field_path(mutation.field),
            "expected_before": before,
            "apply_value": after,
            "approved_at_ms": approved_at_ms,
        }))
    }
3429
    /// Apply an approved winner experiment's single-field patch to the live
    /// source workflow.
    ///
    /// Guards, in order: the experiment must be `PromotionApproved`; its
    /// candidate hash must match the campaign's current baseline hash (i.e.
    /// it is the latest winner); and the live node's field must still equal
    /// the patch's `expected_before` value (drift check). On success the
    /// mutated live spec is stored, the experiment's metadata gains an
    /// `applied_to_live` record, and (campaign, experiment, live spec) are
    /// returned. All failures are reported as `Err(String)`.
    pub async fn apply_optimization_winner(
        &self,
        optimization_id: &str,
        experiment_id: &str,
    ) -> Result<
        (
            OptimizationCampaignRecord,
            OptimizationExperimentRecord,
            crate::AutomationV2Spec,
        ),
        String,
    > {
        // Load both records and enforce the approval/freshness invariants.
        let campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        let mut experiment = self
            .get_optimization_experiment(optimization_id, experiment_id)
            .await
            .ok_or_else(|| "experiment not found".to_string())?;
        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
            return Err("only approved winner experiments may be applied".to_string());
        }
        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
            return Err(
                "only the latest approved winner may be applied to the live workflow".to_string(),
            );
        }
        // Unpack the apply_patch metadata recorded at approval time.
        let patch = experiment
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("apply_patch"))
            .cloned()
            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
        let node_id = patch
            .get("node_id")
            .and_then(Value::as_str)
            .map(str::to_string)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
        let field: OptimizationMutableField = serde_json::from_value(
            patch
                .get("field")
                .cloned()
                .ok_or_else(|| "apply_patch.field is required".to_string())?,
        )
        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
        let expected_before = patch
            .get("expected_before")
            .cloned()
            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
        let apply_value = patch
            .get("apply_value")
            .cloned()
            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
        // Drift check: the live workflow field must still hold the value the
        // patch was computed against; otherwise refuse to apply.
        let mut live = self
            .get_automation_v2(&campaign.source_workflow_id)
            .await
            .ok_or_else(|| "source workflow not found".to_string())?;
        let current_value = {
            let live_node = live
                .flow
                .nodes
                .iter()
                .find(|node| node.node_id == node_id)
                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
            Self::optimization_node_field_value(live_node, field)?
        };
        if current_value != expected_before {
            return Err(format!(
                "live workflow drift detected for node `{node_id}` {}",
                Self::optimization_mutation_field_path(field)
            ));
        }
        // Mutate the live node, then append an audit entry to the workflow's
        // optimization apply-history metadata.
        let live_node = live
            .flow
            .nodes
            .iter_mut()
            .find(|node| node.node_id == node_id)
            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
        let applied_at_ms = now_ms();
        let apply_record = json!({
            "optimization_id": campaign.optimization_id,
            "experiment_id": experiment.experiment_id,
            "node_id": node_id,
            "field": field,
            "field_path": Self::optimization_mutation_field_path(field),
            "previous_value": expected_before,
            "new_value": apply_value,
            "applied_at_ms": applied_at_ms,
        });
        live.metadata =
            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
        let stored_live = self
            .put_automation_v2(live)
            .await
            .map_err(|error| error.to_string())?;
        // Record on the experiment itself that it has been applied.
        let mut metadata = match experiment.metadata.take() {
            Some(Value::Object(map)) => map,
            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
            None => serde_json::Map::new(),
        };
        metadata.insert(
            "applied_to_live".to_string(),
            json!({
                "automation_id": stored_live.automation_id,
                "applied_at_ms": applied_at_ms,
                "field": field,
                "node_id": node_id,
            }),
        );
        experiment.metadata = Some(Value::Object(metadata));
        let stored_experiment = self
            .put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        Ok((campaign, stored_experiment, stored_live))
    }
3549
3550    fn optimization_objective_hint(text: &str) -> String {
3551        let cleaned = text
3552            .lines()
3553            .map(str::trim)
3554            .filter(|line| !line.is_empty() && !line.starts_with('#'))
3555            .collect::<Vec<_>>()
3556            .join(" ");
3557        let hint = if cleaned.is_empty() {
3558            "Prioritize validator-complete output with explicit evidence."
3559        } else {
3560            cleaned.as_str()
3561        };
3562        let trimmed = hint.trim();
3563        let clipped = if trimmed.len() > 140 {
3564            trimmed[..140].trim_end()
3565        } else {
3566            trimmed
3567        };
3568        let mut sentence = clipped.trim_end_matches('.').to_string();
3569        if sentence.is_empty() {
3570            sentence = "Prioritize validator-complete output with explicit evidence".to_string();
3571        }
3572        sentence.push('.');
3573        sentence
3574    }
3575
    /// Deterministically enumerate phase-1 candidate specs: for each node of
    /// `baseline`, try every mutation family permitted by the phase-1
    /// mutation policy (objective text, summary-guidance text, timeout knob,
    /// retry-policy knobs). Only candidates that pass
    /// `validate_phase1_candidate_mutation` are kept; each returned pair is
    /// the mutated full spec plus the validated description of its single
    /// mutation.
    fn build_phase1_candidate_options(
        baseline: &crate::AutomationV2Spec,
        phase1: &crate::OptimizationPhase1Config,
    ) -> Vec<(
        crate::AutomationV2Spec,
        crate::OptimizationValidatedMutation,
    )> {
        let mut options = Vec::new();
        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
        for (index, node) in baseline.flow.nodes.iter().enumerate() {
            // Text mutation: append the objective hint to the node objective.
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::Objective)
            {
                // When the hint is already present, use a fixed alternative
                // sentence so the candidate still differs from baseline.
                let addition = if node.objective.contains(&hint) {
                    "Prioritize validator-complete output with concrete evidence."
                } else {
                    &hint
                };
                let mut candidate = baseline.clone();
                candidate.flow.nodes[index].objective =
                    format!("{} {}", node.objective.trim(), addition.trim())
                        .trim()
                        .to_string();
                if let Ok(validated) =
                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                {
                    options.push((candidate, validated));
                }
            }
            // Text mutation: strengthen the output-contract summary guidance
            // (only for nodes that already carry a summary_guidance).
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
            {
                if let Some(summary_guidance) = node
                    .output_contract
                    .as_ref()
                    .and_then(|contract| contract.summary_guidance.as_ref())
                {
                    let addition = if summary_guidance.contains("Cite concrete evidence") {
                        "Keep evidence explicit."
                    } else {
                        "Cite concrete evidence in the summary."
                    };
                    let mut candidate = baseline.clone();
                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
                        contract.summary_guidance = Some(
                            format!("{} {}", summary_guidance.trim(), addition)
                                .trim()
                                .to_string(),
                        );
                    }
                    if let Ok(validated) =
                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                    {
                        options.push((candidate, validated));
                    }
                }
            }
            // Knob mutation: bump timeout_ms by min(percent delta, absolute
            // delta), at least 1ms, capped at timeout_max_ms; skipped when
            // the cap makes the value unchanged.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::TimeoutMs)
            {
                if let Some(timeout_ms) = node.timeout_ms {
                    let delta_by_percent = ((timeout_ms as f64)
                        * phase1.mutation_policy.timeout_delta_percent)
                        .round() as u64;
                    let delta = delta_by_percent
                        .min(phase1.mutation_policy.timeout_delta_ms)
                        .max(1);
                    let next = timeout_ms
                        .saturating_add(delta)
                        .min(phase1.mutation_policy.timeout_max_ms);
                    if next != timeout_ms {
                        let mut candidate = baseline.clone();
                        candidate.flow.nodes[index].timeout_ms = Some(next);
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Knob mutation: increment retry_policy.max_attempts by 1, capped
            // at retry_max; only for nodes that already define the key.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("max_attempts"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("max_attempts".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Knob mutation: increment retry_policy.retries by 1, same
            // capping rules as max_attempts.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyRetries)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("retries"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("retries".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
        }
        options
    }
3724
    /// Drive phase-1 candidate generation for `campaign`: queue at most one
    /// new deterministic candidate experiment (plus its evaluation run) when
    /// the budget allows and no candidate evaluation is already in flight.
    ///
    /// Returns `Ok(true)` when the campaign record was modified (either a
    /// new experiment was queued or the campaign transitioned to
    /// `Completed`), `Ok(false)` when there is nothing to do yet. The caller
    /// is responsible for persisting the mutated campaign.
    async fn maybe_queue_phase1_candidate_experiment(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        // Only campaigns with a phase-1 config participate.
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        // Budget check: complete the campaign once the experiment cap is hit.
        let experiment_count = self
            .count_optimization_experiments(&campaign.optimization_id)
            .await;
        if experiment_count >= phase1.budget.max_experiments as usize {
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        }
        // Wait for baseline metrics, and don't queue while a promotion
        // decision is pending.
        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
        {
            return Ok(false);
        }
        let existing = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        // A Draft experiment that already carries an eval_run_id means an
        // evaluation is in flight — only one at a time.
        let active_eval_exists = existing.iter().any(|experiment| {
            matches!(experiment.status, OptimizationExperimentStatus::Draft)
                && experiment
                    .metadata
                    .as_ref()
                    .and_then(|metadata| metadata.get("eval_run_id"))
                    .and_then(Value::as_str)
                    .is_some()
        });
        if active_eval_exists {
            return Ok(false);
        }
        // Pick the first candidate whose snapshot hash has not been tried.
        let existing_hashes = existing
            .iter()
            .map(|experiment| experiment.candidate_snapshot_hash.clone())
            .collect::<std::collections::HashSet<_>>();
        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
        }) else {
            // Mutator produced nothing new: the campaign is done.
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some(
                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
            );
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        };
        // Start an evaluation run for the candidate, then record the Draft
        // experiment referencing it.
        let eval_run = self
            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
            .await
            .map_err(|error| error.to_string())?;
        let now = now_ms();
        let experiment = OptimizationExperimentRecord {
            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
            optimization_id: campaign.optimization_id.clone(),
            status: OptimizationExperimentStatus::Draft,
            candidate_snapshot: candidate_snapshot.clone(),
            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
            mutation_summary: Some(mutation.summary.clone()),
            metrics: None,
            phase1_metrics: None,
            promotion_recommendation: None,
            promotion_decision: None,
            created_at_ms: now,
            updated_at_ms: now,
            metadata: Some(json!({
                "generator": "phase1_deterministic_v1",
                "eval_run_id": eval_run.run_id,
                "mutation": mutation,
            })),
        };
        self.put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
        campaign.updated_at_ms = now_ms();
        Ok(true)
    }
3807
3808    async fn reconcile_phase1_candidate_experiments(
3809        &self,
3810        campaign: &mut OptimizationCampaignRecord,
3811    ) -> Result<bool, String> {
3812        let Some(phase1) = campaign.phase1.as_ref() else {
3813            return Ok(false);
3814        };
3815        let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
3816            return Ok(false);
3817        };
3818        let experiments = self
3819            .list_optimization_experiments(&campaign.optimization_id)
3820            .await;
3821        let mut changed = false;
3822        for mut experiment in experiments {
3823            if experiment.status != OptimizationExperimentStatus::Draft {
3824                continue;
3825            }
3826            let Some(eval_run_id) = experiment
3827                .metadata
3828                .as_ref()
3829                .and_then(|metadata| metadata.get("eval_run_id"))
3830                .and_then(Value::as_str)
3831                .map(str::to_string)
3832            else {
3833                continue;
3834            };
3835            let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
3836                continue;
3837            };
3838            if !Self::automation_run_is_terminal(&run.status) {
3839                continue;
3840            }
3841            if run.status != crate::AutomationRunStatus::Completed {
3842                experiment.status = OptimizationExperimentStatus::Failed;
3843                let mut metadata = match experiment.metadata.take() {
3844                    Some(Value::Object(map)) => map,
3845                    Some(_) => serde_json::Map::new(),
3846                    None => serde_json::Map::new(),
3847                };
3848                metadata.insert(
3849                    "eval_failure".to_string(),
3850                    json!({
3851                        "run_id": run.run_id,
3852                        "status": run.status,
3853                    }),
3854                );
3855                experiment.metadata = Some(Value::Object(metadata));
3856                self.put_optimization_experiment(experiment)
3857                    .await
3858                    .map_err(|error| error.to_string())?;
3859                changed = true;
3860                continue;
3861            }
3862            if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
3863                experiment.status = OptimizationExperimentStatus::Failed;
3864                let mut metadata = match experiment.metadata.take() {
3865                    Some(Value::Object(map)) => map,
3866                    Some(_) => serde_json::Map::new(),
3867                    None => serde_json::Map::new(),
3868                };
3869                metadata.insert(
3870                    "eval_failure".to_string(),
3871                    json!({
3872                        "run_id": run.run_id,
3873                        "status": run.status,
3874                        "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
3875                    }),
3876                );
3877                experiment.metadata = Some(Value::Object(metadata));
3878                self.put_optimization_experiment(experiment)
3879                    .await
3880                    .map_err(|error| error.to_string())?;
3881                changed = true;
3882                continue;
3883            }
3884            let metrics =
3885                match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
3886                    Ok(metrics) => metrics,
3887                    Err(error) => {
3888                        experiment.status = OptimizationExperimentStatus::Failed;
3889                        let mut metadata = match experiment.metadata.take() {
3890                            Some(Value::Object(map)) => map,
3891                            Some(_) => serde_json::Map::new(),
3892                            None => serde_json::Map::new(),
3893                        };
3894                        metadata.insert(
3895                            "eval_failure".to_string(),
3896                            json!({
3897                                "run_id": run.run_id,
3898                                "status": run.status,
3899                                "reason": error,
3900                            }),
3901                        );
3902                        experiment.metadata = Some(Value::Object(metadata));
3903                        self.put_optimization_experiment(experiment)
3904                            .await
3905                            .map_err(|error| error.to_string())?;
3906                        changed = true;
3907                        continue;
3908                    }
3909                };
3910            let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
3911            experiment.phase1_metrics = Some(metrics.clone());
3912            experiment.metrics = Some(json!({
3913                "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
3914                "unmet_requirement_count": metrics.unmet_requirement_count,
3915                "blocked_node_rate": metrics.blocked_node_rate,
3916                "budget_within_limits": metrics.budget_within_limits,
3917            }));
3918            experiment.promotion_recommendation = Some(
3919                match decision.decision {
3920                    OptimizationPromotionDecisionKind::Promote => "promote",
3921                    OptimizationPromotionDecisionKind::Discard => "discard",
3922                    OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3923                        "needs_operator_review"
3924                    }
3925                }
3926                .to_string(),
3927            );
3928            experiment.promotion_decision = Some(decision.clone());
3929            match decision.decision {
3930                OptimizationPromotionDecisionKind::Promote
3931                | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3932                    experiment.status = OptimizationExperimentStatus::PromotionRecommended;
3933                    campaign.pending_promotion_experiment_id =
3934                        Some(experiment.experiment_id.clone());
3935                    campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
3936                    campaign.last_pause_reason = Some(decision.reason.clone());
3937                }
3938                OptimizationPromotionDecisionKind::Discard => {
3939                    experiment.status = OptimizationExperimentStatus::Discarded;
3940                    if campaign.status == OptimizationCampaignStatus::Running {
3941                        campaign.last_pause_reason = Some(decision.reason.clone());
3942                    }
3943                }
3944            }
3945            self.put_optimization_experiment(experiment)
3946                .await
3947                .map_err(|error| error.to_string())?;
3948            changed = true;
3949        }
3950        let refreshed = self
3951            .list_optimization_experiments(&campaign.optimization_id)
3952            .await;
3953        let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
3954        if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
3955            && phase1.budget.max_consecutive_failures > 0
3956        {
3957            campaign.status = OptimizationCampaignStatus::Failed;
3958            campaign.last_pause_reason = Some(format!(
3959                "phase 1 candidate evaluations reached {} consecutive failures",
3960                consecutive_failures
3961            ));
3962            changed = true;
3963        }
3964        Ok(changed)
3965    }
3966
3967    async fn reconcile_pending_baseline_replays(
3968        &self,
3969        campaign: &mut OptimizationCampaignRecord,
3970    ) -> Result<bool, String> {
3971        let Some(phase1) = campaign.phase1.as_ref() else {
3972            return Ok(false);
3973        };
3974        let mut changed = false;
3975        let mut remaining = Vec::new();
3976        for run_id in campaign.pending_baseline_run_ids.clone() {
3977            let Some(run) = self.get_automation_v2_run(&run_id).await else {
3978                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3979                campaign.last_pause_reason = Some(format!(
3980                    "baseline replay run `{run_id}` was not found during optimization reconciliation"
3981                ));
3982                changed = true;
3983                continue;
3984            };
3985            if !Self::automation_run_is_terminal(&run.status) {
3986                remaining.push(run_id);
3987                continue;
3988            }
3989            if run.status != crate::AutomationRunStatus::Completed {
3990                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
3991                campaign.last_pause_reason = Some(format!(
3992                    "baseline replay run `{}` finished with status `{:?}`",
3993                    run.run_id, run.status
3994                ));
3995                changed = true;
3996                continue;
3997            }
3998            if run.automation_id != campaign.source_workflow_id {
3999                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
4000                campaign.last_pause_reason = Some(
4001                    "baseline replay run must belong to the optimization source workflow"
4002                        .to_string(),
4003                );
4004                changed = true;
4005                continue;
4006            }
4007            let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
4008                "baseline replay run must include an automation snapshot".to_string()
4009            })?;
4010            if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
4011                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
4012                campaign.last_pause_reason = Some(
4013                    "baseline replay run does not match the current campaign baseline snapshot"
4014                        .to_string(),
4015                );
4016                changed = true;
4017                continue;
4018            }
4019            let metrics =
4020                derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
4021            let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
4022            campaign
4023                .baseline_replays
4024                .push(OptimizationBaselineReplayRecord {
4025                    replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
4026                    automation_run_id: Some(run.run_id.clone()),
4027                    phase1_metrics: metrics,
4028                    validator_case_outcomes,
4029                    experiment_count_at_recording: self
4030                        .count_optimization_experiments(&campaign.optimization_id)
4031                        .await as u64,
4032                    recorded_at_ms: now_ms(),
4033                });
4034            changed = true;
4035        }
4036        if remaining != campaign.pending_baseline_run_ids {
4037            campaign.pending_baseline_run_ids = remaining;
4038            changed = true;
4039        }
4040        Ok(changed)
4041    }
4042
    /// Runs one reconciliation pass over every optimization campaign.
    ///
    /// For each campaign with a phase-1 config: folds in finished baseline
    /// replays, reconciles candidate experiments, then either (a) queues a
    /// fresh baseline replay when one is due, (b) (re)establishes the baseline
    /// metrics once enough replays exist, or (c) records that the campaign is
    /// waiting on in-flight replays. Running campaigns with no pending replays
    /// may queue a new candidate experiment. Only campaigns whose record
    /// actually changed are persisted.
    ///
    /// Returns the number of campaigns written back, or the first error
    /// encountered (later campaigns are then not processed).
    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
        let campaigns = self.list_optimization_campaigns().await;
        let mut updated = 0usize;
        for campaign in campaigns {
            // Re-fetch each record so this pass works on the freshest copy,
            // not the snapshot taken by the list call above.
            let Some(mut latest) = self
                .get_optimization_campaign(&campaign.optimization_id)
                .await
            else {
                continue;
            };
            // Campaigns without a phase-1 config are not reconciled.
            let Some(phase1) = latest.phase1.clone() else {
                continue;
            };
            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
            changed |= self
                .reconcile_phase1_candidate_experiments(&mut latest)
                .await?;
            let experiment_count = self
                .count_optimization_experiments(&latest.optimization_id)
                .await;
            if latest.pending_baseline_run_ids.is_empty() {
                if phase1_baseline_replay_due(
                    &latest.baseline_replays,
                    latest.pending_baseline_run_ids.len(),
                    &phase1,
                    experiment_count,
                    now_ms(),
                ) {
                    // A replay is due: queue it and park the campaign in Draft
                    // until the replay completes.
                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
                        latest.status = OptimizationCampaignStatus::Draft;
                        changed = true;
                    }
                } else if latest.baseline_replays.len()
                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
                {
                    // Enough replays recorded: try to (re)establish baseline
                    // metrics from them.
                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
                        Ok(metrics) => {
                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
                                latest.baseline_metrics = Some(metrics);
                                changed = true;
                            }
                            // Promote Draft/unstable campaigns to Running, and
                            // clear a stale pause reason on already-Running ones.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) || (latest.status == OptimizationCampaignStatus::Running
                                && latest.last_pause_reason.is_some())
                            {
                                latest.status = OptimizationCampaignStatus::Running;
                                latest.last_pause_reason = None;
                                changed = true;
                            }
                        }
                        Err(error) => {
                            // Pause as evaluator-unstable, but only when that
                            // actually changes status or reason — avoids
                            // rewriting an identical record every pass.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::Running
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) && (latest.status
                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
                            {
                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                latest.last_pause_reason = Some(error);
                                changed = true;
                            }
                        }
                    }
                }
            } else if latest.last_pause_reason.as_deref()
                != Some("waiting for phase 1 baseline replay completion")
            {
                // Replays still in flight: surface the waiting state once.
                latest.last_pause_reason =
                    Some("waiting for phase 1 baseline replay completion".to_string());
                changed = true;
            }
            // Only a Running campaign with no outstanding replays may spend
            // budget on a new candidate experiment.
            if latest.status == OptimizationCampaignStatus::Running
                && latest.pending_baseline_run_ids.is_empty()
            {
                changed |= self
                    .maybe_queue_phase1_candidate_experiment(&mut latest)
                    .await?;
            }
            if changed {
                self.put_optimization_campaign(latest)
                    .await
                    .map_err(|error| error.to_string())?;
                updated = updated.saturating_add(1);
            }
        }
        Ok(updated)
    }
4136
4137    async fn maybe_queue_phase1_baseline_replay(
4138        &self,
4139        campaign: &mut OptimizationCampaignRecord,
4140    ) -> Result<bool, String> {
4141        let Some(phase1) = campaign.phase1.as_ref() else {
4142            return Ok(false);
4143        };
4144        if !campaign.pending_baseline_run_ids.is_empty() {
4145            campaign.last_pause_reason =
4146                Some("waiting for phase 1 baseline replay completion".into());
4147            campaign.updated_at_ms = now_ms();
4148            return Ok(true);
4149        }
4150        let experiment_count = self
4151            .count_optimization_experiments(&campaign.optimization_id)
4152            .await;
4153        if !phase1_baseline_replay_due(
4154            &campaign.baseline_replays,
4155            campaign.pending_baseline_run_ids.len(),
4156            phase1,
4157            experiment_count,
4158            now_ms(),
4159        ) {
4160            return Ok(false);
4161        }
4162        let replay_run = self
4163            .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
4164            .await
4165            .map_err(|error| error.to_string())?;
4166        if !campaign
4167            .pending_baseline_run_ids
4168            .iter()
4169            .any(|value| value == &replay_run.run_id)
4170        {
4171            campaign
4172                .pending_baseline_run_ids
4173                .push(replay_run.run_id.clone());
4174        }
4175        campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
4176        campaign.updated_at_ms = now_ms();
4177        Ok(true)
4178    }
4179
4180    async fn maybe_queue_initial_phase1_baseline_replay(
4181        &self,
4182        campaign: &mut OptimizationCampaignRecord,
4183    ) -> Result<bool, String> {
4184        let Some(phase1) = campaign.phase1.as_ref() else {
4185            return Ok(false);
4186        };
4187        let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
4188        if campaign.baseline_replays.len() >= required_runs {
4189            return Ok(false);
4190        }
4191        self.maybe_queue_phase1_baseline_replay(campaign).await
4192    }
4193
    /// Applies an operator action to an optimization campaign and persists the
    /// updated campaign record.
    ///
    /// `action` is matched case-insensitively after trimming; supported values
    /// are `start`, `pause`, `resume`, `queue_baseline_replay`,
    /// `record_baseline_replay` (requires `run_id`), `approve_winner` and
    /// `reject_winner` (both require `experiment_id`). `reason` is an optional
    /// operator-supplied pause/rejection note.
    ///
    /// # Errors
    /// Returns a `String` error when the campaign/experiment/run is missing,
    /// a required argument is absent, validation fails, the action is
    /// unsupported, or persistence fails. Note: experiment records mutated by
    /// an arm may already be persisted even when the campaign write later
    /// fails.
    pub async fn apply_optimization_action(
        &self,
        optimization_id: &str,
        action: &str,
        experiment_id: Option<&str>,
        run_id: Option<&str>,
        reason: Option<&str>,
    ) -> Result<OptimizationCampaignRecord, String> {
        let normalized = action.trim().to_ascii_lowercase();
        let mut campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        match normalized.as_str() {
            // Start: phase-1 campaigns first ensure enough baseline replays
            // exist; non-phase-1 campaigns just go Running.
            "start" => {
                if campaign.phase1.is_some() {
                    if self
                        .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                        .await?
                    {
                        // A replay was queued: park in Draft until it completes.
                        campaign.status = OptimizationCampaignStatus::Draft;
                    } else {
                        let phase1 = campaign
                            .phase1
                            .as_ref()
                            .ok_or_else(|| "phase 1 config is required".to_string())?;
                        // Enough replays already recorded: establish the
                        // baseline now, or pause if the evaluator is unstable.
                        match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
                            Ok(metrics) => {
                                campaign.baseline_metrics = Some(metrics);
                                campaign.status = OptimizationCampaignStatus::Running;
                                campaign.last_pause_reason = None;
                            }
                            Err(error) => {
                                campaign.status =
                                    OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                campaign.last_pause_reason = Some(error);
                            }
                        }
                    }
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            // Pause: manual stop; blank reasons are treated as absent.
            "pause" => {
                campaign.status = OptimizationCampaignStatus::PausedManual;
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
            }
            // Resume: same replay gating as `start`, but never re-establishes
            // baseline metrics here — reconciliation handles that.
            "resume" => {
                if self
                    .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                    .await?
                {
                    campaign.status = OptimizationCampaignStatus::Draft;
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            // Queue an extra baseline replay on demand (no cadence check).
            "queue_baseline_replay" => {
                let replay_run = self
                    .create_automation_v2_run(
                        &campaign.baseline_snapshot,
                        "optimization_baseline_replay",
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                // Dedupe guard before tracking the new run id.
                if !campaign
                    .pending_baseline_run_ids
                    .iter()
                    .any(|value| value == &replay_run.run_id)
                {
                    campaign
                        .pending_baseline_run_ids
                        .push(replay_run.run_id.clone());
                }
                campaign.updated_at_ms = now_ms();
            }
            // Manually record a finished run as a baseline replay after
            // validating that it belongs to this campaign's baseline.
            "record_baseline_replay" => {
                let run_id = run_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
                let phase1 = campaign
                    .phase1
                    .as_ref()
                    .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
                let run = self
                    .get_automation_v2_run(run_id)
                    .await
                    .ok_or_else(|| "automation run not found".to_string())?;
                if run.automation_id != campaign.source_workflow_id {
                    return Err(
                        "baseline replay run must belong to the optimization source workflow"
                            .to_string(),
                    );
                }
                let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                    "baseline replay run must include an automation snapshot".to_string()
                })?;
                if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                    return Err(
                        "baseline replay run does not match the current campaign baseline snapshot"
                            .to_string(),
                    );
                }
                let metrics =
                    derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
                let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
                campaign
                    .baseline_replays
                    .push(OptimizationBaselineReplayRecord {
                        replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                        automation_run_id: Some(run.run_id.clone()),
                        phase1_metrics: metrics,
                        validator_case_outcomes,
                        experiment_count_at_recording: self
                            .count_optimization_experiments(&campaign.optimization_id)
                            .await as u64,
                        recorded_at_ms: now_ms(),
                    });
                // The run is now recorded, so stop tracking it as pending.
                campaign
                    .pending_baseline_run_ids
                    .retain(|value| value != run_id);
                campaign.updated_at_ms = now_ms();
            }
            // Promote an experiment's candidate snapshot to be the new
            // campaign baseline, after re-validating the mutation and
            // re-running the promotion gate.
            "approve_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                    return Err(
                        "experiment baseline_snapshot_hash does not match current campaign baseline"
                            .to_string(),
                    );
                }
                // NOTE(review): when `phase1` is None this whole validation /
                // promotion-gate block is skipped and the candidate is adopted
                // unconditionally — confirm that is intended.
                if let Some(phase1) = campaign.phase1.as_ref() {
                    let validated = validate_phase1_candidate_mutation(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        phase1,
                    )?;
                    if experiment.mutation_summary.is_none() {
                        experiment.mutation_summary = Some(validated.summary.clone());
                    }
                    let approved_at_ms = now_ms();
                    // Audit artifact: a patch describing baseline -> candidate.
                    let apply_patch = Self::build_optimization_apply_patch(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        &validated,
                        approved_at_ms,
                    )?;
                    let mut metadata = match experiment.metadata.take() {
                        Some(Value::Object(map)) => map,
                        Some(_) => {
                            return Err("experiment metadata must be a JSON object".to_string());
                        }
                        None => serde_json::Map::new(),
                    };
                    metadata.insert("apply_patch".to_string(), apply_patch);
                    experiment.metadata = Some(Value::Object(metadata));
                    if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
                        // Prefer the typed metrics; fall back to re-parsing the
                        // flattened JSON copy.
                        let candidate_metrics = experiment
                            .phase1_metrics
                            .clone()
                            .or_else(|| {
                                experiment
                                    .metrics
                                    .as_ref()
                                    .and_then(|metrics| parse_phase1_metrics(metrics).ok())
                            })
                            .ok_or_else(|| {
                                "phase 1 candidate is missing promotion metrics".to_string()
                            })?;
                        let decision =
                            evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
                        experiment.promotion_recommendation = Some(
                            match decision.decision {
                                OptimizationPromotionDecisionKind::Promote => "promote",
                                OptimizationPromotionDecisionKind::Discard => "discard",
                                OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                                    "needs_operator_review"
                                }
                            }
                            .to_string(),
                        );
                        experiment.promotion_decision = Some(decision.clone());
                        // Approval only succeeds when the gate says Promote;
                        // the refreshed decision is persisted either way.
                        if decision.decision != OptimizationPromotionDecisionKind::Promote {
                            let _ = self
                                .put_optimization_experiment(experiment)
                                .await
                                .map_err(|e| e.to_string())?;
                            return Err(decision.reason);
                        }
                        campaign.baseline_metrics = Some(candidate_metrics);
                    }
                }
                // Adopt the candidate as the new baseline and reset replay /
                // promotion bookkeeping back to Draft.
                campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
                campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
                campaign.baseline_replays.clear();
                campaign.pending_baseline_run_ids.clear();
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = None;
                experiment.status = OptimizationExperimentStatus::PromotionApproved;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            // Reject a recommended winner and return the campaign to Draft.
            // NOTE(review): `experiment_id` is not checked against
            // `pending_promotion_experiment_id` — confirm any experiment may
            // be rejected here.
            "reject_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
                experiment.status = OptimizationExperimentStatus::PromotionRejected;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            _ => return Err("unsupported optimization action".to_string()),
        }
        // Persist whatever the arm did and return the stored record.
        self.put_optimization_campaign(campaign)
            .await
            .map_err(|e| e.to_string())
    }
4439
4440    pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
4441        let mut rows = self
4442            .automations_v2
4443            .read()
4444            .await
4445            .values()
4446            .cloned()
4447            .collect::<Vec<_>>();
4448        rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
4449        rows
4450    }
4451
4452    pub async fn delete_automation_v2(
4453        &self,
4454        automation_id: &str,
4455    ) -> anyhow::Result<Option<AutomationV2Spec>> {
4456        let removed = self.automations_v2.write().await.remove(automation_id);
4457        let removed_run_count = {
4458            let mut runs = self.automation_v2_runs.write().await;
4459            let before = runs.len();
4460            runs.retain(|_, run| run.automation_id != automation_id);
4461            before.saturating_sub(runs.len())
4462        };
4463        self.persist_automations_v2().await?;
4464        if removed_run_count > 0 {
4465            self.persist_automation_v2_runs().await?;
4466        }
4467        self.verify_automation_v2_persisted(automation_id, false)
4468            .await?;
4469        Ok(removed)
4470    }
4471
4472    pub async fn create_automation_v2_run(
4473        &self,
4474        automation: &AutomationV2Spec,
4475        trigger_type: &str,
4476    ) -> anyhow::Result<AutomationV2RunRecord> {
4477        let now = now_ms();
4478        let runtime_context = self
4479            .automation_v2_effective_runtime_context(
4480                automation,
4481                automation
4482                    .runtime_context_materialization()
4483                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
4484            )
4485            .await?;
4486        let pending_nodes = automation
4487            .flow
4488            .nodes
4489            .iter()
4490            .map(|n| n.node_id.clone())
4491            .collect::<Vec<_>>();
4492        let run = AutomationV2RunRecord {
4493            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
4494            automation_id: automation.automation_id.clone(),
4495            tenant_context: TenantContext::local_implicit(),
4496            trigger_type: trigger_type.to_string(),
4497            status: AutomationRunStatus::Queued,
4498            created_at_ms: now,
4499            updated_at_ms: now,
4500            started_at_ms: None,
4501            finished_at_ms: None,
4502            active_session_ids: Vec::new(),
4503            latest_session_id: None,
4504            active_instance_ids: Vec::new(),
4505            checkpoint: AutomationRunCheckpoint {
4506                completed_nodes: Vec::new(),
4507                pending_nodes,
4508                node_outputs: std::collections::HashMap::new(),
4509                node_attempts: std::collections::HashMap::new(),
4510                blocked_nodes: Vec::new(),
4511                awaiting_gate: None,
4512                gate_history: Vec::new(),
4513                lifecycle_history: Vec::new(),
4514                last_failure: None,
4515            },
4516            runtime_context,
4517            automation_snapshot: Some(automation.clone()),
4518            pause_reason: None,
4519            resume_reason: None,
4520            detail: None,
4521            stop_kind: None,
4522            stop_reason: None,
4523            prompt_tokens: 0,
4524            completion_tokens: 0,
4525            total_tokens: 0,
4526            estimated_cost_usd: 0.0,
4527            scheduler: None,
4528            trigger_reason: None,
4529            consumed_handoff_id: None,
4530        };
4531        self.automation_v2_runs
4532            .write()
4533            .await
4534            .insert(run.run_id.clone(), run.clone());
4535        self.persist_automation_v2_runs().await?;
4536        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
4537            .await
4538            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
4539        Ok(run)
4540    }
4541
4542    pub async fn create_automation_v2_dry_run(
4543        &self,
4544        automation: &AutomationV2Spec,
4545        trigger_type: &str,
4546    ) -> anyhow::Result<AutomationV2RunRecord> {
4547        let now = now_ms();
4548        let runtime_context = self
4549            .automation_v2_effective_runtime_context(
4550                automation,
4551                automation
4552                    .runtime_context_materialization()
4553                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
4554            )
4555            .await?;
4556        let run = AutomationV2RunRecord {
4557            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
4558            automation_id: automation.automation_id.clone(),
4559            tenant_context: TenantContext::local_implicit(),
4560            trigger_type: format!("{trigger_type}_dry_run"),
4561            status: AutomationRunStatus::Completed,
4562            created_at_ms: now,
4563            updated_at_ms: now,
4564            started_at_ms: Some(now),
4565            finished_at_ms: Some(now),
4566            active_session_ids: Vec::new(),
4567            latest_session_id: None,
4568            active_instance_ids: Vec::new(),
4569            checkpoint: AutomationRunCheckpoint {
4570                completed_nodes: Vec::new(),
4571                pending_nodes: Vec::new(),
4572                node_outputs: std::collections::HashMap::new(),
4573                node_attempts: std::collections::HashMap::new(),
4574                blocked_nodes: Vec::new(),
4575                awaiting_gate: None,
4576                gate_history: Vec::new(),
4577                lifecycle_history: Vec::new(),
4578                last_failure: None,
4579            },
4580            runtime_context,
4581            automation_snapshot: Some(automation.clone()),
4582            pause_reason: None,
4583            resume_reason: None,
4584            detail: Some("dry_run".to_string()),
4585            stop_kind: None,
4586            stop_reason: None,
4587            prompt_tokens: 0,
4588            completion_tokens: 0,
4589            total_tokens: 0,
4590            estimated_cost_usd: 0.0,
4591            scheduler: None,
4592            trigger_reason: None,
4593            consumed_handoff_id: None,
4594        };
4595        self.automation_v2_runs
4596            .write()
4597            .await
4598            .insert(run.run_id.clone(), run.clone());
4599        self.persist_automation_v2_runs().await?;
4600        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
4601            .await
4602            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
4603        Ok(run)
4604    }
4605
4606    pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
4607        self.automation_v2_runs.read().await.get(run_id).cloned()
4608    }
4609
4610    pub async fn list_automation_v2_runs(
4611        &self,
4612        automation_id: Option<&str>,
4613        limit: usize,
4614    ) -> Vec<AutomationV2RunRecord> {
4615        let mut rows = self
4616            .automation_v2_runs
4617            .read()
4618            .await
4619            .values()
4620            .filter(|row| {
4621                if let Some(id) = automation_id {
4622                    row.automation_id == id
4623                } else {
4624                    true
4625                }
4626            })
4627            .cloned()
4628            .collect::<Vec<_>>();
4629        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
4630        rows.truncate(limit.clamp(1, 500));
4631        rows
4632    }
4633
4634    async fn automation_v2_run_workspace_root(
4635        &self,
4636        run: &AutomationV2RunRecord,
4637    ) -> Option<String> {
4638        if let Some(root) = run
4639            .automation_snapshot
4640            .as_ref()
4641            .and_then(|automation| automation.workspace_root.as_ref())
4642            .map(|value| value.trim())
4643            .filter(|value| !value.is_empty())
4644        {
4645            return Some(root.to_string());
4646        }
4647        self.get_automation_v2(&run.automation_id)
4648            .await
4649            .and_then(|automation| automation.workspace_root)
4650            .map(|value| value.trim().to_string())
4651            .filter(|value| !value.is_empty())
4652    }
4653
    /// Reconciles the automation scheduler's bookkeeping after a run's
    /// status transition, releasing or (re)acquiring scheduler capacity and
    /// the per-workspace lock as implied by the old and new statuses.
    ///
    /// Whether a status "uses capacity" / "holds the workspace lock" is
    /// derived from `automation_status_uses_scheduler_capacity` and
    /// `automation_status_holds_workspace_lock`; the branch order below is
    /// significant (full release short-circuits partial transitions).
    async fn sync_automation_scheduler_for_run_transition(
        &self,
        previous_status: AutomationRunStatus,
        run: &AutomationV2RunRecord,
    ) {
        let had_capacity = automation_status_uses_scheduler_capacity(&previous_status);
        let has_capacity = automation_status_uses_scheduler_capacity(&run.status);
        let had_lock = automation_status_holds_workspace_lock(&previous_status);
        let has_lock = automation_status_holds_workspace_lock(&run.status);
        // Resolve the workspace root before taking the scheduler lock.
        let workspace_root = self.automation_v2_run_workspace_root(run).await;
        let mut scheduler = self.automation_scheduler.write().await;

        // The run no longer needs anything from the scheduler: release all
        // of its resources at once and stop.
        if (had_capacity || had_lock) && !has_capacity && !has_lock {
            scheduler.release_run(&run.run_id);
            return;
        }
        // Partial releases for whichever single resource the new status
        // dropped.
        if had_capacity && !has_capacity {
            scheduler.release_capacity(&run.run_id);
        }
        if had_lock && !has_lock {
            scheduler.release_workspace(&run.run_id);
        }
        // Newly acquiring the workspace lock: admit the run when it also has
        // capacity, otherwise only reserve the workspace.
        if !had_lock && has_lock {
            if has_capacity {
                scheduler.admit_run(&run.run_id, workspace_root.as_deref());
            } else {
                scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
            }
            return;
        }
        // Newly acquiring capacity while the lock state did not change.
        if !had_capacity && has_capacity {
            scheduler.admit_run(&run.run_id, workspace_root.as_deref());
        }
    }
4688
4689    async fn automation_run_last_activity_at_ms(&self, run: &AutomationV2RunRecord) -> u64 {
4690        let mut last_activity_at_ms = automation::lifecycle::automation_last_activity_at_ms(run);
4691        for session_id in &run.active_session_ids {
4692            if let Some(session) = self.storage.get_session(session_id).await {
4693                last_activity_at_ms = last_activity_at_ms.max(
4694                    session
4695                        .time
4696                        .updated
4697                        .timestamp_millis()
4698                        .max(0)
4699                        .try_into()
4700                        .unwrap_or_default(),
4701                );
4702            }
4703        }
4704        last_activity_at_ms
4705    }
4706
    /// Pauses `Running` runs that have shown no provider activity for at
    /// least `stale_after_ms` milliseconds.
    ///
    /// For each stale run this cancels its sessions and agent-team
    /// instances, annotates in-progress nodes with a repairable error
    /// output, and transitions the run to `Paused` with stop kind
    /// `StaleReaped`. Returns the number of runs paused.
    pub async fn reap_stale_running_automation_runs(&self, stale_after_ms: u64) -> usize {
        let now = now_ms();
        // Snapshot Running runs first so the read lock is not held across
        // the per-run activity checks below.
        let candidate_runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .filter(|run| run.status == AutomationRunStatus::Running)
            .cloned()
            .collect::<Vec<_>>();
        let mut runs = Vec::new();
        for run in candidate_runs {
            let last_activity_at_ms = self.automation_run_last_activity_at_ms(&run).await;
            if now.saturating_sub(last_activity_at_ms) >= stale_after_ms {
                runs.push(run);
            }
        }
        let mut reaped = 0usize;
        for run in runs {
            let run_id = run.run_id.clone();
            let session_ids = run.active_session_ids.clone();
            let instance_ids = run.active_instance_ids.clone();
            let stale_node_ids = automation::lifecycle::automation_in_progress_node_ids(&run);
            let detail = format!(
                "automation run paused after no provider activity for at least {}s",
                stale_after_ms / 1000
            );
            // Best-effort teardown of whatever the run had in flight.
            for session_id in &session_ids {
                let _ = self.cancellations.cancel(session_id).await;
            }
            for instance_id in instance_ids {
                let _ = self
                    .agent_teams
                    .cancel_instance(self, &instance_id, "paused by stale-run reaper")
                    .await;
            }
            self.forget_automation_v2_sessions(&session_ids).await;
            if self
                .update_automation_v2_run(&run_id, |row| {
                    let stale_node_detail = format!(
                        "node execution stalled after no provider activity for at least {}s",
                        stale_after_ms / 1000
                    );
                    let automation_snapshot = row.automation_snapshot.clone();
                    let mut annotated_nodes = Vec::new();
                    if let Some(automation) = automation_snapshot.as_ref() {
                        // Give each stalled node (one with no recorded
                        // output yet) a synthetic error output so it can be
                        // retried or surfaced as terminal.
                        for node_id in &stale_node_ids {
                            if row.checkpoint.node_outputs.contains_key(node_id) {
                                continue;
                            }
                            let Some(node) = automation
                                .flow
                                .nodes
                                .iter()
                                .find(|candidate| &candidate.node_id == node_id)
                            else {
                                continue;
                            };
                            let attempts =
                                row.checkpoint.node_attempts.get(node_id).copied().unwrap_or(1);
                            let max_attempts = automation_node_max_attempts(node);
                            // Terminal once the attempt budget is exhausted.
                            let terminal = attempts >= max_attempts;
                            row.checkpoint.node_outputs.insert(
                                node_id.clone(),
                                crate::automation_v2::executor::build_node_execution_error_output_with_category(
                                    node,
                                    &stale_node_detail,
                                    terminal,
                                    "execution_error",
                                ),
                            );
                            // Only record the first failure; later ones
                            // would overwrite useful context.
                            if row.checkpoint.last_failure.is_none() {
                                row.checkpoint.last_failure = Some(
                                    crate::automation_v2::types::AutomationFailureRecord {
                                        node_id: node_id.clone(),
                                        reason: stale_node_detail.clone(),
                                        failed_at_ms: now_ms(),
                                    },
                                );
                            }
                            annotated_nodes.push(node_id.clone());
                        }
                    }
                    row.status = AutomationRunStatus::Paused;
                    row.pause_reason = Some("stale_no_provider_activity".to_string());
                    row.detail = Some(if annotated_nodes.is_empty() {
                        detail.clone()
                    } else {
                        format!(
                            "{}; repairable node(s): {}",
                            detail,
                            annotated_nodes.join(", ")
                        )
                    });
                    row.stop_kind = Some(AutomationStopKind::StaleReaped);
                    row.stop_reason = Some(detail.clone());
                    // Everything in flight was cancelled above.
                    row.active_session_ids.clear();
                    row.latest_session_id = None;
                    row.active_instance_ids.clear();
                    automation::record_automation_lifecycle_event(
                        row,
                        "run_paused_stale_no_provider_activity",
                        Some(detail.clone()),
                        Some(AutomationStopKind::StaleReaped),
                    );
                    if let Some(automation) = automation_snapshot.as_ref() {
                        automation::refresh_automation_runtime_state(automation, row);
                    }
                })
                .await
                .is_some()
            {
                reaped += 1;
            }
        }
        reaped
    }
4824
    /// Reconciles run state after a server restart.
    ///
    /// `Running` runs are marked `Failed` (they cannot survive a restart);
    /// `Paused` / `Pausing` / `AwaitingApproval` runs get their scheduler
    /// workspace reservation and validated-artifact registry re-seeded from
    /// the persisted checkpoint. Returns how many runs were failed.
    pub async fn recover_in_flight_runs(&self) -> usize {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut recovered = 0usize;
        for run in runs {
            match run.status {
                AutomationRunStatus::Running => {
                    let detail = "automation run interrupted by server restart".to_string();
                    if self
                        .update_automation_v2_run(&run.run_id, |row| {
                            row.status = AutomationRunStatus::Failed;
                            row.detail = Some(detail.clone());
                            row.stop_kind = Some(AutomationStopKind::ServerRestart);
                            row.stop_reason = Some(detail.clone());
                            automation::record_automation_lifecycle_event(
                                row,
                                "run_failed_server_restart",
                                Some(detail.clone()),
                                Some(AutomationStopKind::ServerRestart),
                            );
                        })
                        .await
                        .is_some()
                    {
                        recovered += 1;
                    }
                }
                AutomationRunStatus::Paused
                | AutomationRunStatus::Pausing
                | AutomationRunStatus::AwaitingApproval => {
                    let mut scheduler = self.automation_scheduler.write().await;
                    // Re-take the workspace reservation the run held before
                    // the restart, if its status implies one.
                    if automation_status_holds_workspace_lock(&run.status) {
                        let workspace_root = self.automation_v2_run_workspace_root(&run).await;
                        scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
                    }
                    // Re-register any node outputs that carried validated
                    // artifacts so resumed runs can reuse them.
                    for (node_id, output) in &run.checkpoint.node_outputs {
                        if let Some((path, content_digest)) =
                            automation::node_output::automation_output_validated_artifact(output)
                        {
                            scheduler.preexisting_registry.register_validated(
                                &run.run_id,
                                node_id,
                                automation::scheduler::ValidatedArtifact {
                                    path,
                                    content_digest,
                                },
                            );
                        }
                    }
                }
                _ => {}
            }
        }
        recovered
    }
4885
4886    pub async fn auto_resume_stale_reaped_runs(&self) -> usize {
4887        let candidate_runs = self
4888            .automation_v2_runs
4889            .read()
4890            .await
4891            .values()
4892            .filter(|run| run.status == AutomationRunStatus::Paused)
4893            .filter(|run| run.stop_kind == Some(AutomationStopKind::StaleReaped))
4894            .cloned()
4895            .collect::<Vec<_>>();
4896        let mut resumed = 0usize;
4897        for run in candidate_runs {
4898            let auto_resume_count = run
4899                .checkpoint
4900                .lifecycle_history
4901                .iter()
4902                .filter(|event| event.event == "run_auto_resumed")
4903                .count();
4904            if auto_resume_count >= 2 {
4905                continue;
4906            }
4907            let automation = self.get_automation_v2(&run.automation_id).await;
4908            let automation = match automation.or(run.automation_snapshot.clone()) {
4909                Some(a) => a,
4910                None => continue,
4911            };
4912            let has_repairable_nodes = automation.flow.nodes.iter().any(|node| {
4913                if run.checkpoint.completed_nodes.contains(&node.node_id) {
4914                    return false;
4915                }
4916                if run.checkpoint.node_outputs.contains_key(&node.node_id) {
4917                    let status = run.checkpoint.node_outputs[&node.node_id]
4918                        .get("status")
4919                        .and_then(Value::as_str)
4920                        .unwrap_or_default()
4921                        .to_ascii_lowercase();
4922                    if status != "needs_repair" {
4923                        return false;
4924                    }
4925                } else {
4926                    return false;
4927                }
4928                let attempts = run
4929                    .checkpoint
4930                    .node_attempts
4931                    .get(&node.node_id)
4932                    .copied()
4933                    .unwrap_or(0);
4934                let max_attempts = automation_node_max_attempts(node);
4935                attempts < max_attempts
4936            });
4937            if !has_repairable_nodes {
4938                continue;
4939            }
4940            if self
4941                .update_automation_v2_run(&run.run_id, |row| {
4942                    row.status = AutomationRunStatus::Queued;
4943                    row.pause_reason = None;
4944                    row.detail = None;
4945                    row.stop_kind = None;
4946                    row.stop_reason = None;
4947                    automation::record_automation_lifecycle_event(
4948                        row,
4949                        "run_auto_resumed",
4950                        Some("auto_resume_after_stale_reap".to_string()),
4951                        None,
4952                    );
4953                })
4954                .await
4955                .is_some()
4956            {
4957                resumed += 1;
4958            }
4959        }
4960        resumed
4961    }
4962
4963    pub fn is_automation_scheduler_stopping(&self) -> bool {
4964        self.automation_scheduler_stopping.load(Ordering::Relaxed)
4965    }
4966
4967    pub fn set_automation_scheduler_stopping(&self, stopping: bool) {
4968        self.automation_scheduler_stopping
4969            .store(stopping, Ordering::Relaxed);
4970    }
4971
4972    pub async fn fail_running_automation_runs_for_shutdown(&self) -> usize {
4973        let run_ids = self
4974            .automation_v2_runs
4975            .read()
4976            .await
4977            .values()
4978            .filter(|run| matches!(run.status, AutomationRunStatus::Running))
4979            .map(|run| run.run_id.clone())
4980            .collect::<Vec<_>>();
4981        let mut failed = 0usize;
4982        for run_id in run_ids {
4983            let detail = "automation run stopped during server shutdown".to_string();
4984            if self
4985                .update_automation_v2_run(&run_id, |row| {
4986                    row.status = AutomationRunStatus::Failed;
4987                    row.detail = Some(detail.clone());
4988                    row.stop_kind = Some(AutomationStopKind::Shutdown);
4989                    row.stop_reason = Some(detail.clone());
4990                    automation::record_automation_lifecycle_event(
4991                        row,
4992                        "run_failed_shutdown",
4993                        Some(detail.clone()),
4994                        Some(AutomationStopKind::Shutdown),
4995                    );
4996                })
4997                .await
4998                .is_some()
4999            {
5000                failed += 1;
5001            }
5002        }
5003        failed
5004    }
5005
5006    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
5007        let run_id = self
5008            .automation_v2_runs
5009            .read()
5010            .await
5011            .values()
5012            .filter(|row| row.status == AutomationRunStatus::Queued)
5013            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
5014            .map(|row| row.run_id.clone())?;
5015        self.claim_specific_automation_v2_run(&run_id).await
5016    }
    /// Attempts to claim a specific `Queued` run, transitioning it to
    /// `Running` and returning the claimed record.
    ///
    /// Protocol: (1) check Queued and snapshot the spec under the write
    /// lock, (2) resolve the runtime context with the lock released (it may
    /// await), (3) re-take the lock, re-verify the run is still Queued, and
    /// commit. If a required runtime context cannot be resolved, the run is
    /// failed instead and `None` is returned. Returns `None` whenever the
    /// run is missing or no longer Queued.
    pub async fn claim_specific_automation_v2_run(
        &self,
        run_id: &str,
    ) -> Option<AutomationV2RunRecord> {
        let (automation_snapshot, previous_status) = {
            let mut guard = self.automation_v2_runs.write().await;
            let run = guard.get_mut(run_id)?;
            if run.status != AutomationRunStatus::Queued {
                return None;
            }
            (run.automation_snapshot.clone(), run.status.clone())
        };
        let runtime_context_required = automation_snapshot
            .as_ref()
            .map(crate::automation_v2::types::AutomationV2Spec::requires_runtime_context)
            .unwrap_or(false);
        // Resolve without holding the runs lock; errors collapse to None.
        let runtime_context = match automation_snapshot.as_ref() {
            Some(automation) => self
                .automation_v2_effective_runtime_context(
                    automation,
                    automation
                        .runtime_context_materialization()
                        .or_else(|| automation.approved_plan_runtime_context_materialization()),
                )
                .await
                .ok()
                .flatten(),
            None => None,
        };
        // A required-but-missing runtime context fails the run outright.
        if runtime_context_required && runtime_context.is_none() {
            let mut guard = self.automation_v2_runs.write().await;
            let run = guard.get_mut(run_id)?;
            // Re-check: another claimer may have won while we resolved.
            if run.status != AutomationRunStatus::Queued {
                return None;
            }
            let previous_status = run.status.clone();
            let now = now_ms();
            run.status = AutomationRunStatus::Failed;
            run.updated_at_ms = now;
            run.finished_at_ms.get_or_insert(now);
            run.scheduler = None;
            run.detail = Some("runtime context partition missing for automation run".to_string());
            let claimed = run.clone();
            // Release the lock before the scheduler sync / persistence await.
            drop(guard);
            self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
                .await;
            let _ = self.persist_automation_v2_runs().await;
            return None;
        }

        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        // Re-check: another claimer may have won while we resolved.
        if run.status != AutomationRunStatus::Queued {
            return None;
        }
        let now = now_ms();
        run.runtime_context = runtime_context;
        run.status = AutomationRunStatus::Running;
        run.updated_at_ms = now;
        run.started_at_ms.get_or_insert(now);
        run.scheduler = None;
        let claimed = run.clone();
        // Release the lock before the scheduler sync / persistence await.
        drop(guard);
        self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        Some(claimed)
    }
    /// Applies `update` to a run under the write lock, then normalizes
    /// bookkeeping (clears scheduler metadata off non-Queued runs, bumps
    /// `updated_at_ms`, stamps `finished_at_ms` on terminal statuses),
    /// syncs the scheduler, and persists. Returns the updated record, or
    /// `None` when `run_id` is unknown.
    pub async fn update_automation_v2_run(
        &self,
        run_id: &str,
        update: impl FnOnce(&mut AutomationV2RunRecord),
    ) -> Option<AutomationV2RunRecord> {
        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        let previous_status = run.status.clone();
        update(run);
        // Scheduler metadata only makes sense while a run is queued.
        if run.status != AutomationRunStatus::Queued {
            run.scheduler = None;
        }
        run.updated_at_ms = now_ms();
        // Terminal statuses get a finish timestamp, preserving any
        // already-set value.
        if matches!(
            run.status,
            AutomationRunStatus::Completed
                | AutomationRunStatus::Blocked
                | AutomationRunStatus::Failed
                | AutomationRunStatus::Cancelled
        ) {
            run.finished_at_ms.get_or_insert_with(now_ms);
        }
        let out = run.clone();
        // Release the lock before the scheduler sync / persistence await.
        drop(guard);
        self.sync_automation_scheduler_for_run_transition(previous_status, &out)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        let _ = self.persist_automation_v2_run_status_json(&out).await;
        Some(out)
    }
5115
5116    async fn persist_automation_v2_run_status_json(
5117        &self,
5118        run: &AutomationV2RunRecord,
5119    ) -> anyhow::Result<()> {
5120        let default_workspace = self.workspace_index.snapshot().await.root.clone();
5121        let automation = run.automation_snapshot.as_ref();
5122        let workspace_root = if let Some(ref a) = automation {
5123            if let Some(ref wr) = a.workspace_root {
5124                if !wr.trim().is_empty() {
5125                    wr.trim().to_string()
5126                } else {
5127                    a.metadata
5128                        .as_ref()
5129                        .and_then(|m| m.get("workspace_root"))
5130                        .and_then(Value::as_str)
5131                        .map(str::to_string)
5132                        .unwrap_or_else(|| default_workspace.clone())
5133                }
5134            } else {
5135                a.metadata
5136                    .as_ref()
5137                    .and_then(|m| m.get("workspace_root"))
5138                    .and_then(Value::as_str)
5139                    .map(str::to_string)
5140                    .unwrap_or_else(|| default_workspace.clone())
5141            }
5142        } else {
5143            default_workspace
5144        };
5145        let run_dir = PathBuf::from(&workspace_root)
5146            .join(".tandem")
5147            .join("runs")
5148            .join(&run.run_id);
5149        let status_path = run_dir.join("status.json");
5150        let status_json = json!({
5151            "run_id": run.run_id,
5152            "automation_id": run.automation_id,
5153            "status": run.status,
5154            "detail": run.detail,
5155            "completed_nodes": run.checkpoint.completed_nodes,
5156            "pending_nodes": run.checkpoint.pending_nodes,
5157            "blocked_nodes": run.checkpoint.blocked_nodes,
5158            "node_attempts": run.checkpoint.node_attempts,
5159            "last_failure": run.checkpoint.last_failure,
5160            "updated_at_ms": run.updated_at_ms,
5161        });
5162        fs::create_dir_all(&run_dir).await?;
5163        fs::write(&status_path, serde_json::to_string_pretty(&status_json)?).await?;
5164        Ok(())
5165    }
5166
5167    pub async fn set_automation_v2_run_scheduler_metadata(
5168        &self,
5169        run_id: &str,
5170        meta: automation::SchedulerMetadata,
5171    ) -> Option<AutomationV2RunRecord> {
5172        self.update_automation_v2_run(run_id, |row| {
5173            row.scheduler = Some(meta);
5174        })
5175        .await
5176    }
5177
5178    pub async fn clear_automation_v2_run_scheduler_metadata(
5179        &self,
5180        run_id: &str,
5181    ) -> Option<AutomationV2RunRecord> {
5182        self.update_automation_v2_run(run_id, |row| {
5183            row.scheduler = None;
5184        })
5185        .await
5186    }
5187
5188    pub async fn add_automation_v2_session(
5189        &self,
5190        run_id: &str,
5191        session_id: &str,
5192    ) -> Option<AutomationV2RunRecord> {
5193        let updated = self
5194            .update_automation_v2_run(run_id, |row| {
5195                if !row.active_session_ids.iter().any(|id| id == session_id) {
5196                    row.active_session_ids.push(session_id.to_string());
5197                }
5198                row.latest_session_id = Some(session_id.to_string());
5199            })
5200            .await;
5201        self.automation_v2_session_runs
5202            .write()
5203            .await
5204            .insert(session_id.to_string(), run_id.to_string());
5205        updated
5206    }
5207
5208    pub async fn set_automation_v2_session_mcp_servers(
5209        &self,
5210        session_id: &str,
5211        servers: Vec<String>,
5212    ) {
5213        if servers.is_empty() {
5214            self.automation_v2_session_mcp_servers
5215                .write()
5216                .await
5217                .remove(session_id);
5218        } else {
5219            self.automation_v2_session_mcp_servers
5220                .write()
5221                .await
5222                .insert(session_id.to_string(), servers);
5223        }
5224    }
5225
5226    pub async fn clear_automation_v2_session_mcp_servers(&self, session_id: &str) {
5227        self.automation_v2_session_mcp_servers
5228            .write()
5229            .await
5230            .remove(session_id);
5231    }
5232
5233    pub async fn clear_automation_v2_session(
5234        &self,
5235        run_id: &str,
5236        session_id: &str,
5237    ) -> Option<AutomationV2RunRecord> {
5238        self.automation_v2_session_runs
5239            .write()
5240            .await
5241            .remove(session_id);
5242        self.update_automation_v2_run(run_id, |row| {
5243            row.active_session_ids.retain(|id| id != session_id);
5244        })
5245        .await
5246    }
5247
5248    pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
5249        let mut guard = self.automation_v2_session_runs.write().await;
5250        for session_id in session_ids {
5251            guard.remove(session_id);
5252        }
5253        let mut mcp_guard = self.automation_v2_session_mcp_servers.write().await;
5254        for session_id in session_ids {
5255            mcp_guard.remove(session_id);
5256        }
5257    }
5258
5259    pub async fn add_automation_v2_instance(
5260        &self,
5261        run_id: &str,
5262        instance_id: &str,
5263    ) -> Option<AutomationV2RunRecord> {
5264        self.update_automation_v2_run(run_id, |row| {
5265            if !row.active_instance_ids.iter().any(|id| id == instance_id) {
5266                row.active_instance_ids.push(instance_id.to_string());
5267            }
5268        })
5269        .await
5270    }
5271
5272    pub async fn clear_automation_v2_instance(
5273        &self,
5274        run_id: &str,
5275        instance_id: &str,
5276    ) -> Option<AutomationV2RunRecord> {
5277        self.update_automation_v2_run(run_id, |row| {
5278            row.active_instance_ids.retain(|id| id != instance_id);
5279        })
5280        .await
5281    }
5282
5283    pub async fn apply_provider_usage_to_runs(
5284        &self,
5285        session_id: &str,
5286        prompt_tokens: u64,
5287        completion_tokens: u64,
5288        total_tokens: u64,
5289    ) {
5290        if let Some(policy) = self.routine_session_policy(session_id).await {
5291            let rate = self.token_cost_per_1k_usd.max(0.0);
5292            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
5293            let mut guard = self.routine_runs.write().await;
5294            if let Some(run) = guard.get_mut(&policy.run_id) {
5295                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
5296                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
5297                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
5298                run.estimated_cost_usd += delta_cost;
5299                run.updated_at_ms = now_ms();
5300            }
5301            drop(guard);
5302            let _ = self.persist_routine_runs().await;
5303        }
5304
5305        let maybe_v2_run_id = self
5306            .automation_v2_session_runs
5307            .read()
5308            .await
5309            .get(session_id)
5310            .cloned();
5311        if let Some(run_id) = maybe_v2_run_id {
5312            let rate = self.token_cost_per_1k_usd.max(0.0);
5313            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
5314            let mut guard = self.automation_v2_runs.write().await;
5315            if let Some(run) = guard.get_mut(&run_id) {
5316                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
5317                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
5318                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
5319                run.estimated_cost_usd += delta_cost;
5320                run.updated_at_ms = now_ms();
5321            }
5322            drop(guard);
5323            let _ = self.persist_automation_v2_runs().await;
5324        }
5325    }
5326
    /// Scan active automations for schedules whose `next_fire_at_ms` has
    /// passed, advance the schedule bookkeeping, and return one automation ID
    /// per run owed (an ID can repeat when several fires were missed).
    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
        let mut fired = Vec::new();
        let mut guard = self.automations_v2.write().await;
        for automation in guard.values_mut() {
            if automation.status != AutomationV2Status::Active {
                continue;
            }
            // Lazily seed the next fire time for automations without one;
            // they become eligible on a later tick.
            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
                automation.next_fire_at_ms =
                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // Due: `automation_schedule_due_count` decides how many runs the
            // elapsed window owes for this schedule.
            let run_count =
                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
            automation.next_fire_at_ms = next;
            // NOTE: the fire timestamp is stamped even when run_count is 0.
            automation.last_fired_at_ms = Some(now_ms);
            for _ in 0..run_count {
                fired.push(automation.automation_id.clone());
            }
        }
        drop(guard);
        // Best-effort persistence; scheduling continues even if the write fails.
        let _ = self.persist_automations_v2().await;
        fired
    }
5355
    /// Evaluate watch conditions for all active automations and return the IDs of
    /// automations whose conditions are met, along with a human-readable trigger reason
    /// and the handoff that triggered it (if any).
    ///
    /// An automation is skipped if it already has a `Queued` or `Running` run (dedup).
    pub async fn evaluate_automation_v2_watches(
        &self,
    ) -> Vec<(
        String,
        String,
        Option<crate::automation_v2::types::HandoffArtifact>,
    )> {
        use crate::automation_v2::types::{AutomationRunStatus, WatchCondition};

        // Snapshot of automations that have watch conditions and are Active.
        // Cloning here keeps the read lock short while we do filesystem scans.
        let candidates: Vec<crate::automation_v2::types::AutomationV2Spec> = {
            let guard = self.automations_v2.read().await;
            guard
                .values()
                .filter(|a| {
                    a.status == crate::automation_v2::types::AutomationV2Status::Active
                        && a.has_watch_conditions()
                })
                .cloned()
                .collect()
        };

        // Snapshot active run statuses to implement dedup.
        let active_automation_ids: std::collections::HashSet<String> = {
            let runs = self.automation_v2_runs.read().await;
            runs.values()
                .filter(|r| {
                    matches!(
                        r.status,
                        AutomationRunStatus::Queued | AutomationRunStatus::Running
                    )
                })
                .map(|r| r.automation_id.clone())
                .collect()
        };

        let workspace_root = self.workspace_index.snapshot().await.root;
        let mut results = Vec::new();

        'outer: for automation in candidates {
            // Dedup: skip if already queued or running.
            if active_automation_ids.contains(&automation.automation_id) {
                continue;
            }

            // Handoffs are looked for under the automation's approved dir,
            // resolved relative to the workspace root.
            let handoff_cfg = automation.effective_handoff_config();
            let approved_dir =
                std::path::Path::new(&workspace_root).join(&handoff_cfg.approved_dir);

            for condition in &automation.watch_conditions {
                match condition {
                    WatchCondition::HandoffAvailable {
                        source_automation_id,
                        artifact_type,
                    } => {
                        if let Some(handoff) = find_matching_handoff(
                            &approved_dir,
                            &automation.automation_id,
                            source_automation_id.as_deref(),
                            artifact_type.as_deref(),
                        )
                        .await
                        {
                            let reason = format!(
                                "handoff `{}` of type `{}` from `{}` is available",
                                handoff.handoff_id,
                                handoff.artifact_type,
                                handoff.source_automation_id
                            );
                            // At most one trigger per automation per pass:
                            // the first satisfied condition wins.
                            results.push((automation.automation_id.clone(), reason, Some(handoff)));
                            continue 'outer;
                        }
                    }
                }
            }
        }

        results
    }
5440
    /// Create a run triggered by a watch condition, recording the trigger reason
    /// and the consumed handoff ID (if any).
    ///
    /// The new run starts `Queued` with every flow node pending; it is stored
    /// in the run map, persisted, and mirrored into the context-run blackboard
    /// before being returned.
    pub async fn create_automation_v2_watch_run(
        &self,
        automation: &crate::automation_v2::types::AutomationV2Spec,
        trigger_reason: String,
        consumed_handoff_id: Option<String>,
    ) -> anyhow::Result<crate::automation_v2::types::AutomationV2RunRecord> {
        use crate::automation_v2::types::{
            AutomationRunCheckpoint, AutomationRunStatus, AutomationV2RunRecord,
        };
        let now = now_ms();
        // Resolve the runtime context, preferring the automation's own
        // materialization and falling back to the approved plan's.
        let runtime_context = self
            .automation_v2_effective_runtime_context(
                automation,
                automation
                    .runtime_context_materialization()
                    .or_else(|| automation.approved_plan_runtime_context_materialization()),
            )
            .await?;
        // Every flow node starts out pending.
        let pending_nodes = automation
            .flow
            .nodes
            .iter()
            .map(|n| n.node_id.clone())
            .collect::<Vec<_>>();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            tenant_context: TenantContext::local_implicit(),
            trigger_type: "watch_condition".to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: None,
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes,
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context,
            // Snapshot of the spec at trigger time (used elsewhere, e.g. for
            // status.json workspace resolution).
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: None,
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
            trigger_reason: Some(trigger_reason),
            consumed_handoff_id,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        // Blackboard sync failure is surfaced to the caller rather than
        // silently leaving the two stores divergent.
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
5516
5517    /// Deposit a handoff artifact into the workspace `inbox/` directory.
5518    /// If `auto_approve` is true (Phase 1 default), the file is immediately
5519    /// moved to `approved/` so the downstream watch condition can fire on the next tick.
5520    pub async fn deposit_automation_v2_handoff(
5521        &self,
5522        workspace_root: &str,
5523        handoff: &crate::automation_v2::types::HandoffArtifact,
5524        handoff_cfg: &crate::automation_v2::types::AutomationHandoffConfig,
5525    ) -> anyhow::Result<()> {
5526        use tokio::fs;
5527        let root = std::path::Path::new(workspace_root);
5528        let inbox = root.join(&handoff_cfg.inbox_dir);
5529        fs::create_dir_all(&inbox).await?;
5530
5531        let filename = handoff_filename(&handoff.handoff_id);
5532        let content = serde_json::to_string_pretty(handoff)?;
5533
5534        if handoff_cfg.auto_approve {
5535            // Write directly to approved/ (bypass inbox).
5536            let approved = root.join(&handoff_cfg.approved_dir);
5537            fs::create_dir_all(&approved).await?;
5538            fs::write(approved.join(&filename), &content).await?;
5539            tracing::info!(
5540                handoff_id = %handoff.handoff_id,
5541                target = %handoff.target_automation_id,
5542                artifact_type = %handoff.artifact_type,
5543                "handoff deposited (auto-approved)"
5544            );
5545        } else {
5546            fs::write(inbox.join(&filename), &content).await?;
5547            tracing::info!(
5548                handoff_id = %handoff.handoff_id,
5549                target = %handoff.target_automation_id,
5550                artifact_type = %handoff.artifact_type,
5551                "handoff deposited to inbox (awaiting approval)"
5552            );
5553        }
5554        Ok(())
5555    }
5556
    /// Consume a handoff artifact: write an updated copy (stamped with the
    /// consuming run's metadata for audit) into `archived/`, then delete the
    /// `approved/` copy. Returns the updated artifact. This is idempotent —
    /// if the file is already gone from `approved/`, it returns `None`
    /// (race-safe).
    pub async fn consume_automation_v2_handoff(
        &self,
        workspace_root: &str,
        handoff: &crate::automation_v2::types::HandoffArtifact,
        handoff_cfg: &crate::automation_v2::types::AutomationHandoffConfig,
        consuming_run_id: &str,
        consuming_automation_id: &str,
    ) -> anyhow::Result<Option<crate::automation_v2::types::HandoffArtifact>> {
        use tokio::fs;
        let root = std::path::Path::new(workspace_root);
        let filename = handoff_filename(&handoff.handoff_id);
        let approved_path = root.join(&handoff_cfg.approved_dir).join(&filename);

        // NOTE(review): `Path::exists` is a blocking stat inside an async fn,
        // and the check-then-write sequence is not atomic — two concurrent
        // consumers could both pass it. Confirm callers serialize consumption
        // per handoff.
        if !approved_path.exists() {
            // Already consumed by another run (race).
            tracing::warn!(
                handoff_id = %handoff.handoff_id,
                "handoff already consumed or missing from approved dir"
            );
            return Ok(None);
        }

        let archived_dir = root.join(&handoff_cfg.archived_dir);
        fs::create_dir_all(&archived_dir).await?;

        // Stamp consumer identity and time into the archived envelope.
        let mut archived = handoff.clone();
        archived.consumed_by_run_id = Some(consuming_run_id.to_string());
        archived.consumed_by_automation_id = Some(consuming_automation_id.to_string());
        archived.consumed_at_ms = Some(now_ms());

        // Write the updated envelope to archived/ first, then remove from approved/.
        // This ordering means we never lose the record even if the remove fails.
        let archived_path = archived_dir.join(&filename);
        fs::write(&archived_path, serde_json::to_string_pretty(&archived)?).await?;
        let _ = fs::remove_file(&approved_path).await;

        tracing::info!(
            handoff_id = %handoff.handoff_id,
            run_id = %consuming_run_id,
            "handoff consumed and archived"
        );
        Ok(Some(archived))
    }
5604}
5605
/// Returns the canonical filename for a handoff artifact JSON file.
///
/// Any character outside `[A-Za-z0-9_-]` is replaced with `_` so the ID is
/// safe as a single path component; `.json` is appended.
fn handoff_filename(handoff_id: &str) -> String {
    let mut safe = String::with_capacity(handoff_id.len() + 5);
    for ch in handoff_id.chars() {
        safe.push(match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
            _ => '_',
        });
    }
    safe.push_str(".json");
    safe
}
5621
/// Scan the `approved_dir` for a handoff that targets `target_automation_id` and
/// optionally matches `source_automation_id` and `artifact_type` filters.
/// Returns the first matching handoff (oldest by `created_at_ms`), or `None`.
///
/// Bounds: skips the scan entirely if the directory doesn't exist; caps the scan
/// at 256 entries to prevent scheduler stall on large directories.
async fn find_matching_handoff(
    approved_dir: &std::path::Path,
    target_automation_id: &str,
    source_filter: Option<&str>,
    artifact_type_filter: Option<&str>,
) -> Option<crate::automation_v2::types::HandoffArtifact> {
    use tokio::fs;
    // NOTE(review): `Path::exists` is a synchronous stat inside an async fn;
    // cheap, but confirm it's acceptable on the scheduler path.
    if !approved_dir.exists() {
        return None;
    }

    let mut entries = match fs::read_dir(approved_dir).await {
        Ok(entries) => entries,
        Err(err) => {
            tracing::warn!("handoff watch: failed to read approved dir: {err}");
            return None;
        }
    };

    let mut candidates: Vec<crate::automation_v2::types::HandoffArtifact> = Vec::new();
    let mut scanned = 0usize;

    while let Ok(Some(entry)) = entries.next_entry().await {
        // The cap counts every directory entry, including non-JSON files.
        if scanned >= 256 {
            break;
        }
        scanned += 1;

        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
            continue;
        }

        // Unreadable or malformed files are skipped silently.
        let raw = match fs::read_to_string(&path).await {
            Ok(raw) => raw,
            Err(_) => continue,
        };
        let handoff: crate::automation_v2::types::HandoffArtifact = match serde_json::from_str(&raw)
        {
            Ok(h) => h,
            Err(_) => continue,
        };

        // Check target match (always required).
        if handoff.target_automation_id != target_automation_id {
            continue;
        }
        // Optional source filter.
        if let Some(src) = source_filter {
            if handoff.source_automation_id != src {
                continue;
            }
        }
        // Optional artifact type filter.
        if let Some(kind) = artifact_type_filter {
            if handoff.artifact_type != kind {
                continue;
            }
        }
        // Skip already-consumed handoffs (shouldn't be in approved/ but be defensive).
        if handoff.consumed_by_run_id.is_some() {
            continue;
        }
        candidates.push(handoff);
    }

    // Return the oldest unmatched handoff so we process them in arrival order.
    candidates.into_iter().min_by_key(|h| h.created_at_ms)
}
5697
5698async fn build_channels_config(
5699    state: &AppState,
5700    channels: &ChannelsConfigFile,
5701) -> Option<ChannelsConfig> {
5702    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
5703        return None;
5704    }
5705    Some(ChannelsConfig {
5706        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
5707            bot_token: cfg.bot_token,
5708            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
5709            mention_only: cfg.mention_only,
5710            style_profile: cfg.style_profile,
5711            security_profile: cfg.security_profile,
5712        }),
5713        discord: channels.discord.clone().map(|cfg| DiscordConfig {
5714            bot_token: cfg.bot_token,
5715            guild_id: cfg.guild_id,
5716            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
5717            mention_only: cfg.mention_only,
5718            security_profile: cfg.security_profile,
5719        }),
5720        slack: channels.slack.clone().map(|cfg| SlackConfig {
5721            bot_token: cfg.bot_token,
5722            channel_id: cfg.channel_id,
5723            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
5724            mention_only: cfg.mention_only,
5725            security_profile: cfg.security_profile,
5726        }),
5727        server_base_url: state.server_base_url(),
5728        api_token: state.api_token().await.unwrap_or_default(),
5729        tool_policy: channels.tool_policy.clone(),
5730    })
5731}
5732
5733// channel config normalization moved to crate::config::channels
5734
/// Validates an `owner/repo` GitHub-style slug: exactly two `/`-separated
/// segments, both non-blank, with no leading or trailing slash (outer
/// whitespace is ignored).
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    if slug.is_empty() || slug.starts_with('/') || slug.ends_with('/') {
        return false;
    }
    match slug.split_once('/') {
        Some((owner, repo)) => {
            // A second '/' in `repo` would mean three or more segments.
            !repo.contains('/') && !owner.trim().is_empty() && !repo.trim().is_empty()
        }
        None => false,
    }
}
5749
5750fn legacy_automations_v2_path() -> Option<PathBuf> {
5751    config::paths::resolve_legacy_root_file_path("automations_v2.json")
5752        .filter(|path| path != &config::paths::resolve_automations_v2_path())
5753}
5754
5755fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
5756    let mut candidates = vec![active_path.clone()];
5757    if let Some(legacy_path) = legacy_automations_v2_path() {
5758        if !candidates.contains(&legacy_path) {
5759            candidates.push(legacy_path);
5760        }
5761    }
5762    let default_path = config::paths::default_state_dir().join("automations_v2.json");
5763    if !candidates.contains(&default_path) {
5764        candidates.push(default_path);
5765    }
5766    candidates
5767}
5768
5769async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
5770    let Some(legacy_path) = legacy_automations_v2_path() else {
5771        return Ok(());
5772    };
5773    if legacy_path == *active_path || !legacy_path.exists() {
5774        return Ok(());
5775    }
5776    fs::remove_file(&legacy_path).await?;
5777    tracing::info!(
5778        active_path = active_path.display().to_string(),
5779        removed_path = legacy_path.display().to_string(),
5780        "removed stale legacy automation v2 file after canonical persistence"
5781    );
5782    Ok(())
5783}
5784
5785fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
5786    config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
5787        .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
5788}
5789
5790fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
5791    let mut candidates = vec![active_path.clone()];
5792    if let Some(legacy_path) = legacy_automation_v2_runs_path() {
5793        if !candidates.contains(&legacy_path) {
5794            candidates.push(legacy_path);
5795        }
5796    }
5797    let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
5798    if !candidates.contains(&default_path) {
5799        candidates.push(default_path);
5800    }
5801    candidates
5802}
5803
5804fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
5805    serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
5806        .unwrap_or_default()
5807}
5808
5809fn parse_automation_v2_runs_file(
5810    raw: &str,
5811) -> std::collections::HashMap<String, AutomationV2RunRecord> {
5812    serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
5813        .unwrap_or_default()
5814}
5815
5816fn parse_optimization_campaigns_file(
5817    raw: &str,
5818) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
5819    serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
5820        .unwrap_or_default()
5821}
5822
5823fn parse_optimization_experiments_file(
5824    raw: &str,
5825) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
5826    serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
5827        .unwrap_or_default()
5828}
5829
5830fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
5831    match schedule {
5832        RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
5833        RoutineSchedule::Cron { .. } => None,
5834    }
5835}
5836
5837fn parse_timezone(timezone: &str) -> Option<Tz> {
5838    timezone.trim().parse::<Tz>().ok()
5839}
5840
/// Resolve the next cron fire time strictly after `from_ms`, interpreting the
/// cron expression in `timezone` and returning the result as UTC epoch
/// milliseconds. Returns `None` when the timezone or expression fails to
/// parse, or the schedule has no future occurrence.
fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
    let tz = parse_timezone(timezone)?;
    let schedule = Schedule::from_str(expression).ok()?;
    // `from_ms` is taken as UTC, then shifted into the schedule's timezone so
    // day/hour cron fields match local wall-clock time.
    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
    let local_from = from_dt.with_timezone(&tz);
    let next = schedule.after(&local_from).next()?;
    // Clamp at 0 so a pre-epoch result can't wrap in the u64 conversion.
    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
5849
/// Compute the next fire time (epoch ms) for a routine schedule.
///
/// The timezone string is validated up front even for interval schedules, so
/// an unparseable timezone makes *every* schedule type return `None`.
fn compute_next_schedule_fire_at_ms(
    schedule: &RoutineSchedule,
    timezone: &str,
    from_ms: u64,
) -> Option<u64> {
    // Deliberate: the parsed Tz is discarded, but the `?` still rejects
    // invalid timezones for interval schedules too.
    let _ = parse_timezone(timezone)?;
    match schedule {
        RoutineSchedule::IntervalSeconds { seconds } => {
            Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
        }
        RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
    }
}
5863
/// Determine how many runs are owed and the realigned next fire time for a
/// schedule whose `next_fire_at_ms` has already passed.
///
/// Interval schedules use the arithmetic `compute_misfire_plan`; cron
/// schedules walk actual occurrences so irregular spacing is honored.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            // Next occurrence after `now`; if the cron can't be evaluated,
            // retry in one minute rather than stalling the schedule forever.
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    // Count missed cron occurrences from `next_fire_at_ms` up
                    // to `now_ms`, capped at `max_runs`; bail out if the
                    // schedule stops advancing to avoid an infinite loop.
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
5903
5904fn compute_misfire_plan(
5905    now_ms: u64,
5906    next_fire_at_ms: u64,
5907    interval_ms: u64,
5908    policy: &RoutineMisfirePolicy,
5909) -> (u32, u64) {
5910    if now_ms < next_fire_at_ms || interval_ms == 0 {
5911        return (0, next_fire_at_ms);
5912    }
5913    let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
5914    let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
5915    match policy {
5916        RoutineMisfirePolicy::Skip => (0, aligned_next),
5917        RoutineMisfirePolicy::RunOnce => (1, aligned_next),
5918        RoutineMisfirePolicy::CatchUp { max_runs } => {
5919            let count = missed.min(u64::from(*max_runs)) as u32;
5920            (count, aligned_next)
5921        }
5922    }
5923}
5924
5925fn auto_generated_agent_name(agent_id: &str) -> String {
5926    let names = [
5927        "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
5928    ];
5929    let digest = Sha256::digest(agent_id.as_bytes());
5930    let idx = usize::from(digest[0]) % names.len();
5931    format!("{}-{:02x}", names[idx], digest[1])
5932}
5933
5934fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
5935    match schedule.schedule_type {
5936        AutomationV2ScheduleType::Manual => None,
5937        AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
5938            seconds: schedule.interval_seconds.unwrap_or(60),
5939        }),
5940        AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
5941            expression: schedule.cron_expression.clone().unwrap_or_default(),
5942        }),
5943    }
5944}
5945
5946fn automation_schedule_next_fire_at_ms(
5947    schedule: &AutomationV2Schedule,
5948    from_ms: u64,
5949) -> Option<u64> {
5950    let routine_schedule = schedule_from_automation_v2(schedule)?;
5951    compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
5952}
5953
5954fn automation_schedule_due_count(
5955    schedule: &AutomationV2Schedule,
5956    now_ms: u64,
5957    next_fire_at_ms: u64,
5958) -> u32 {
5959    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
5960        return 0;
5961    };
5962    let (count, _) = compute_misfire_plan_for_schedule(
5963        now_ms,
5964        next_fire_at_ms,
5965        &routine_schedule,
5966        &schedule.timezone,
5967        &schedule.misfire_policy,
5968    );
5969    count.max(1)
5970}
5971
/// Outcome of evaluating a routine against the external-integration
/// execution policy (see `evaluate_routine_execution_policy`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine must not run until manually approved; `reason` explains why.
    RequiresApproval { reason: String },
    /// The routine must not run at all; `reason` explains why.
    Blocked { reason: String },
}
5978
5979pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
5980    let entrypoint = routine.entrypoint.to_ascii_lowercase();
5981    if entrypoint.starts_with("connector.")
5982        || entrypoint.starts_with("integration.")
5983        || entrypoint.contains("external")
5984    {
5985        return true;
5986    }
5987    routine
5988        .args
5989        .get("uses_external_integrations")
5990        .and_then(|v| v.as_bool())
5991        .unwrap_or(false)
5992        || routine
5993            .args
5994            .get("connector_id")
5995            .and_then(|v| v.as_str())
5996            .is_some()
5997}
5998
5999pub fn evaluate_routine_execution_policy(
6000    routine: &RoutineSpec,
6001    trigger_type: &str,
6002) -> RoutineExecutionDecision {
6003    if !routine_uses_external_integrations(routine) {
6004        return RoutineExecutionDecision::Allowed;
6005    }
6006    if !routine.external_integrations_allowed {
6007        return RoutineExecutionDecision::Blocked {
6008            reason: "external integrations are disabled by policy".to_string(),
6009        };
6010    }
6011    if routine.requires_approval {
6012        return RoutineExecutionDecision::RequiresApproval {
6013            reason: format!(
6014                "manual approval required before external side effects ({})",
6015                trigger_type
6016            ),
6017        };
6018    }
6019    RoutineExecutionDecision::Allowed
6020}
6021
/// Validates a shared-resource key (whitespace-trimmed before checking).
///
/// Accepted keys are the special `swarm.active_tasks` key, or keys under
/// one of the `run/`, `mission/`, `project/`, `team/` namespaces that do
/// not contain an empty path segment (`//`). Empty keys are rejected.
fn is_valid_resource_key(key: &str) -> bool {
    let candidate = key.trim();
    match candidate {
        "" => false,
        // Special-cased well-known key outside the namespaced scheme.
        "swarm.active_tasks" => true,
        _ => {
            let namespaced = ["run/", "mission/", "project/", "team/"]
                .iter()
                .any(|prefix| candidate.starts_with(prefix));
            namespaced && !candidate.contains("//")
        }
    }
}
6039
// AppState transparently dereferences to the installed RuntimeState.
// NOTE: this panics if dereferenced before startup installs the runtime
// into the `OnceLock` (see the expect message); early-startup code paths
// must check readiness before going through Deref.
impl Deref for AppState {
    type Target = RuntimeState;

    fn deref(&self) -> &Self::Target {
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
6049
// Prompt-context hook backed by the server's AppState. Cloneable so the
// hook can be moved into the boxed async future it returns.
#[derive(Clone)]
struct ServerPromptContextHook {
    state: AppState,
}
6054
// Helper methods for the server prompt-context hook: opening the shared
// memory stores, rendering context blocks (memory, docs, scope, identity),
// and deciding when memory injection should be skipped.
impl ServerPromptContextHook {
    /// Wraps the shared application state.
    fn new(state: AppState) -> Self {
        Self { state }
    }

    /// Opens the shared memory database; returns `None` on any failure
    /// (path resolution or DB open) so callers can fail open.
    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
        let paths = resolve_shared_paths().ok()?;
        MemoryDatabase::new(&paths.memory_db_path).await.ok()
    }

    /// Opens the higher-level memory manager over the same shared DB path;
    /// `None` on any failure, mirroring `open_memory_db`.
    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
        let paths = resolve_shared_paths().ok()?;
        tandem_memory::MemoryManager::new(&paths.memory_db_path)
            .await
            .ok()
    }

    /// SHA-256 of the input, hex-encoded. Used so telemetry can correlate
    /// queries without logging their plaintext.
    fn hash_query(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Renders global-memory hits into a `<memory_context>` block.
    /// Each hit is truncated to its first 60 whitespace-separated words;
    /// lines stop being added once the running character budget passes
    /// 2200 (the over-budget line itself is dropped).
    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
        let mut out = vec!["<memory_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let text = hit
                .record
                .content
                .split_whitespace()
                .take(60)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (source={}, run={})",
                hit.score, text, hit.record.source_type, hit.record.run_id
            );
            used = used.saturating_add(line.len());
            if used > 2200 {
                break;
            }
            out.push(line);
        }
        out.push("</memory_context>".to_string());
        out.join("\n")
    }

    /// Non-empty, trimmed `source_url` string from the chunk metadata,
    /// if present.
    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
        chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("source_url"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
    }

    /// Relative doc path for a chunk: prefers a non-empty `relative_path`
    /// metadata field, otherwise falls back to the chunk source with any
    /// `guide_docs:` prefix stripped.
    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
        if let Some(path) = chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("relative_path"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
        {
            return path.to_string();
        }
        chunk
            .source
            .strip_prefix("guide_docs:")
            .unwrap_or(chunk.source.as_str())
            .to_string()
    }

    /// Renders embedded-docs hits into a `<docs_context>` block.
    /// Same shape as `build_memory_block` but with a 70-word per-hit
    /// truncation and a larger 2800-character budget.
    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
        let mut out = vec!["<docs_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
            let path = Self::extract_docs_relative_path(&hit.chunk);
            let text = hit
                .chunk
                .content
                .split_whitespace()
                .take(70)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (doc_path={}, source_url={})",
                hit.similarity, text, path, url
            );
            used = used.saturating_add(line.len());
            if used > 2800 {
                break;
            }
            out.push(line);
        }
        out.push("</docs_context>".to_string());
        out.join("\n")
    }

    /// Searches the global memory tier for embedded guide docs.
    /// Over-fetches 3x the requested limit (clamped to 6..=36) because
    /// non-doc results are filtered out afterwards; only chunks whose
    /// source starts with `guide_docs:` are kept, up to `limit`.
    async fn search_embedded_docs(
        &self,
        query: &str,
        limit: usize,
    ) -> Vec<tandem_memory::types::MemorySearchResult> {
        let Some(manager) = self.open_memory_manager().await else {
            return Vec::new();
        };
        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
        manager
            .search(
                query,
                Some(MemoryTier::Global),
                None,
                None,
                Some(search_limit),
            )
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
            .take(limit)
            .collect()
    }

    /// Whether a user query is too trivial to warrant memory injection:
    /// empty/whitespace queries and short social pleasantries are skipped.
    /// (The length check is redundant with the fixed list today but guards
    /// against longer entries being added later.)
    fn should_skip_memory_injection(query: &str) -> bool {
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return true;
        }
        let lower = trimmed.to_ascii_lowercase();
        let social = [
            "hi",
            "hello",
            "hey",
            "thanks",
            "thank you",
            "ok",
            "okay",
            "cool",
            "nice",
            "yo",
            "good morning",
            "good afternoon",
            "good evening",
        ];
        lower.len() <= 32 && social.contains(&lower.as_str())
    }

    /// Maps a personality preset keyword to its style instruction line.
    /// Unknown presets fall back to the balanced style.
    fn personality_preset_text(preset: &str) -> &'static str {
        match preset {
            "concise" => {
                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
            }
            "friendly" => {
                "Default style: friendly and supportive while staying technically rigorous and concrete."
            }
            "mentor" => {
                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
            }
            "critical" => {
                "Default style: critical and risk-first. Surface failure modes and assumptions early."
            }
            _ => {
                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
            }
        }
    }

    /// Builds the identity system-prompt block from the effective config.
    ///
    /// Resolution order for the bot name: `identity.bot.canonical_name`,
    /// then legacy top-level `bot_name`, then the literal "Tandem".
    /// Personality preset: a per-agent override from
    /// `identity.personality.per_agent.<name>` (disabled for the internal
    /// "compaction"/"title"/"summary" agents and when no agent name is
    /// given), falling back to `identity.personality.default.preset`, then
    /// "balanced". Custom instructions fall back per-agent -> default ->
    /// legacy top-level `persona`.
    ///
    /// Always returns `Some` today; the `Option` return leaves room for
    /// callers to treat "no identity" as a valid outcome.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }

    /// Renders the `<memory_scope>` system block: the current session id,
    /// the project id and workspace root when present and non-empty, plus
    /// fixed guidance lines on default memory-search behavior.
    fn build_memory_scope_block(
        session_id: &str,
        project_id: Option<&str>,
        workspace_root: Option<&str>,
    ) -> String {
        let mut lines = vec![
            "<memory_scope>".to_string(),
            format!("- current_session_id: {}", session_id),
        ];
        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
            lines.push(format!("- current_project_id: {}", project_id));
        }
        if let Some(workspace_root) = workspace_root
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            lines.push(format!("- workspace_root: {}", workspace_root));
        }
        lines.push(
            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
                .to_string(),
        );
        lines.push(
            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
                .to_string(),
        );
        lines.push(
            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
                .to_string(),
        );
        lines.push("</memory_scope>".to_string());
        lines.join("\n")
    }
}
6340
// Augments outgoing provider messages with identity, memory-scope,
// embedded-docs, and global-memory context blocks. Fails open at every
// step: if state isn't ready, the run is unknown, the query is trivial,
// or the memory stores can't be opened, the messages pass through
// unchanged.
impl PromptContextHook for ServerPromptContextHook {
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        let this = self.clone();
        Box::pin(async move {
            // Startup can invoke prompt plumbing before RuntimeState is installed.
            // Never panic from context hooks; fail-open and continue without augmentation.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // Only augment sessions that have a registered run.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            // Identity block (bot name + personality), resolved per agent profile.
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            // Memory-scope block with session/project/workspace identifiers.
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The memory query is the most recent user message.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded guide-docs context takes precedence: when any docs
            // hits exist, inject them and return early, skipping the
            // global-memory search entirely.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            // Fall back to global memory search, with telemetry on latency
            // and score distribution (query is hashed, not logged).
            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            // Inject the rendered memory block and emit the injection event.
            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
6471
6472fn extract_event_session_id(properties: &Value) -> Option<String> {
6473    properties
6474        .get("sessionID")
6475        .or_else(|| properties.get("sessionId"))
6476        .or_else(|| properties.get("id"))
6477        .or_else(|| {
6478            properties
6479                .get("part")
6480                .and_then(|part| part.get("sessionID"))
6481        })
6482        .or_else(|| {
6483            properties
6484                .get("part")
6485                .and_then(|part| part.get("sessionId"))
6486        })
6487        .and_then(|v| v.as_str())
6488        .map(|s| s.to_string())
6489}
6490
6491fn extract_event_run_id(properties: &Value) -> Option<String> {
6492    properties
6493        .get("runID")
6494        .or_else(|| properties.get("run_id"))
6495        .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
6496        .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
6497        .and_then(|v| v.as_str())
6498        .map(|s| s.to_string())
6499}
6500
/// Extracts a persistable tool-invocation part from an event payload.
///
/// Returns `(message_id, MessagePart::ToolInvocation)` when `properties.part`
/// is a tool-shaped part worth persisting, `None` otherwise. A part
/// qualifies when its `type` is one of the tool variants (case-insensitive)
/// and it is not a transient "running" delta without a result or error.
/// When the part's `args` are missing/empty, fallbacks from
/// `toolCallDelta.parsedArgsPreview` and then `toolCallDelta.rawArgsPreview`
/// are used, in that order.
pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
    let part = properties.get("part")?;
    // Accept the tool part-type spellings producers emit (kebab and snake).
    let part_type = part
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    if part_type != "tool"
        && part_type != "tool-invocation"
        && part_type != "tool-result"
        && part_type != "tool_invocation"
        && part_type != "tool_result"
    {
        return None;
    }
    let part_state = part
        .get("state")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    let has_result = part.get("result").is_some_and(|value| !value.is_null());
    let has_error = part
        .get("error")
        .and_then(|v| v.as_str())
        .is_some_and(|value| !value.trim().is_empty());
    // Skip transient "running" deltas to avoid persistence storms from streamed
    // tool-argument chunks; keep actionable/final updates.
    if part_state == "running" && !has_result && !has_error {
        return None;
    }
    // Tool name and message id are required; bail out if either is missing.
    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
    let message_id = part
        .get("messageID")
        .or_else(|| part.get("message_id"))
        .and_then(|v| v.as_str())?
        .to_string();
    // Args fallback chain: part.args -> toolCallDelta.parsedArgsPreview
    // -> toolCallDelta.rawArgsPreview (as a raw string).
    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
        if let Some(preview) = properties
            .get("toolCallDelta")
            .and_then(|delta| delta.get("parsedArgsPreview"))
            .cloned()
        {
            // Only adopt the parsed preview when it carries actual content.
            let preview_nonempty = !preview.is_null()
                && !preview.as_object().is_some_and(|value| value.is_empty())
                && !preview
                    .as_str()
                    .map(|value| value.trim().is_empty())
                    .unwrap_or(false);
            if preview_nonempty {
                args = preview;
            }
        }
        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
            if let Some(raw_preview) = properties
                .get("toolCallDelta")
                .and_then(|delta| delta.get("rawArgsPreview"))
                .and_then(|value| value.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                args = Value::String(raw_preview.to_string());
            }
        }
    }
    // Diagnostic breadcrumb for a known problem case: a final "write" tool
    // part that still ended up with no arguments after all fallbacks.
    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
    {
        tracing::info!(
            message_id = %message_id,
            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
            has_result = part.get("result").is_some(),
            has_error = part.get("error").is_some(),
            "persistable write tool part still has empty args"
        );
    }
    let result = part.get("result").cloned().filter(|value| !value.is_null());
    let error = part
        .get("error")
        .and_then(|v| v.as_str())
        .map(|value| value.to_string());
    Some((
        message_id,
        MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        },
    ))
}
6592
/// Derives a status-index update (key `run/<session_id>/status`) from an
/// engine event, or `None` for events that don't affect session status.
///
/// Handled event types: `session.run.started` (state=running),
/// `session.run.finished` (state=finished, with the run result when
/// present), and `message.part.updated` for tool parts (tool activity,
/// tool state/error/call-id, and a truncated args preview).
pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
    let session_id = extract_event_session_id(&event.properties)?;
    let run_id = extract_event_run_id(&event.properties);
    let key = format!("run/{session_id}/status");

    // Fields shared by every update: the session id and, when known, the run id.
    let mut base = serde_json::Map::new();
    base.insert("sessionID".to_string(), Value::String(session_id));
    if let Some(run_id) = run_id {
        base.insert("runID".to_string(), Value::String(run_id));
    }

    match event.event_type.as_str() {
        "session.run.started" => {
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.started".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "session.run.finished" => {
            base.insert("state".to_string(), Value::String("finished".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            // Carry the run result through when the event reports one.
            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
                base.insert("result".to_string(), Value::String(status.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.finished".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "message.part.updated" => {
            let part = event.properties.get("part")?;
            let part_type = part.get("type").and_then(|v| v.as_str())?;
            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
            // Map (part type, state) onto a phase and whether a tool is active;
            // non-tool parts produce no update.
            let (phase, tool_active) = match (part_type, part_state) {
                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
                _ => return None,
            };
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String(phase.to_string()));
            base.insert("toolActive".to_string(), Value::Bool(tool_active));
            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
            }
            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
                base.insert(
                    "toolState".to_string(),
                    Value::String(tool_state.to_string()),
                );
            }
            // Optional extras, only included when present and non-empty.
            if let Some(tool_error) = part
                .get("error")
                .and_then(|v| v.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                base.insert(
                    "toolError".to_string(),
                    Value::String(tool_error.to_string()),
                );
            }
            if let Some(tool_call_id) = part
                .get("id")
                .and_then(|v| v.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                base.insert(
                    "toolCallID".to_string(),
                    Value::String(tool_call_id.to_string()),
                );
            }
            // Args preview capped at 500 characters; skipped when args are
            // null, an empty object, or a blank string.
            if let Some(args_preview) = part
                .get("args")
                .filter(|value| {
                    !value.is_null()
                        && !value.as_object().is_some_and(|map| map.is_empty())
                        && !value
                            .as_str()
                            .map(|text| text.trim().is_empty())
                            .unwrap_or(false)
                })
                .map(|value| truncate_text(&value.to_string(), 500))
            {
                base.insert(
                    "toolArgsPreview".to_string(),
                    Value::String(args_preview.to_string()),
                );
            }
            base.insert(
                "eventType".to_string(),
                Value::String("message.part.updated".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        _ => None,
    }
}
6704
/// Delegates to the background session-part persister loop in `app::tasks`.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
6708
/// Delegates to the background status-indexer loop in `app::tasks`.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
6712
/// Delegates to the background agent-team supervisor loop in `app::tasks`.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
6716
/// Background-task entry point; thin delegate to
/// [`crate::app::tasks::run_bug_monitor`].
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
6720
/// Background-task entry point; thin delegate to
/// [`crate::app::tasks::run_usage_aggregator`].
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
6724
/// Background-task entry point; thin delegate to
/// [`crate::app::tasks::run_optimization_scheduler`].
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
6728
/// Converts a failure [`EngineEvent`] into a persisted `BugMonitorIncidentRecord`
/// and drives it through the incident pipeline:
///
/// 1. Build a submission from the event and check known failure patterns.
/// 2. Upsert the incident keyed by fingerprint (repeats bump `occurrence_count`).
/// 3. If patterns matched, mark the incident `duplicate_suppressed` and stop.
/// 4. Otherwise create a draft, try to queue a triage run, then attempt an
///    automatic GitHub publish of the draft.
///
/// The incident is re-persisted after each transition so partial progress
/// survives later failures, and an event is published on `state.event_bus` for
/// every terminal outcome. Non-fatal pipeline errors (draft creation, triage
/// queuing, GitHub publishing) are recorded in `incident.last_error` and on the
/// incident `status` rather than returned as `Err`.
///
/// # Errors
/// Returns an error when the submission cannot be built, when it carries no
/// fingerprint, or when persisting the incident record fails.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // Check up to 3 known failure-pattern matches before deciding whether to
    // raise a fresh incident.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    // The fingerprint is the incident's dedup key; without one we cannot proceed.
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    // NOTE(review): workspace_root is only applied when a NEW incident is
    // created below; existing incidents keep their original root — confirm
    // this is intended.
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Incidents are deduplicated by fingerprint: a repeat bumps the counter
    // and timestamps instead of creating a new record.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    // Persist the upserted incident before any further pipeline work.
    state.put_bug_monitor_incident(incident.clone()).await?;

    // A failure-pattern match suppresses the incident: record the matches,
    // publish a suppression event, and skip drafting entirely.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Create the bug draft. A failure here ends the pipeline for this event,
    // but the incident is still persisted with the error recorded.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Try to queue a triage run for the new draft; on failure the incident
    // stays at "draft_created" with the error noted.
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    // Attempt an automatic GitHub publish of the draft; a failure is written
    // back onto both the draft and the incident (best-effort, errors ignored).
    if let Some(draft_id) = incident.draft_id.clone() {
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Final persist + "detected" event with the terminal status.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
6930
6931pub fn sha256_hex(parts: &[&str]) -> String {
6932    let mut hasher = Sha256::new();
6933    for part in parts {
6934        hasher.update(part.as_bytes());
6935        hasher.update([0u8]);
6936    }
6937    format!("{:x}", hasher.finalize())
6938}
6939
/// True when a run in this status consumes a scheduler capacity slot; only
/// actively `Running` automations count.
fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
    matches!(status, AutomationRunStatus::Running)
}
6943
6944fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
6945    matches!(
6946        status,
6947        AutomationRunStatus::Running | AutomationRunStatus::Pausing
6948    )
6949}
6950
/// Background-task entry point; thin delegate to
/// [`crate::app::tasks::run_routine_scheduler`].
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
6954
/// Background-task entry point; thin delegate to
/// [`crate::app::tasks::run_routine_executor`].
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
6958
/// Builds the prompt text for a routine run; thin delegate to
/// [`crate::app::routines::build_routine_prompt`].
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
6962
/// Truncates `input` to at most `max_len` bytes of original content, appending
/// an `...<truncated>` marker when anything was cut; inputs that already fit
/// are returned unchanged.
///
/// The cut point is moved back to the nearest UTF-8 character boundary at or
/// below `max_len`, so multi-byte characters (common in arbitrary error
/// strings passed by callers) can never cause a slicing panic. As before, the
/// marker is not counted against `max_len`, so the result may be longer than
/// `max_len`.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Walk back from max_len to a char boundary; slicing at a non-boundary
    // byte index panics for multi-byte UTF-8 input (the bug this fixes).
    let mut cut = max_len;
    while cut > 0 && !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
6971
/// Appends the routine run's configured output artifacts; thin delegate to
/// [`crate::app::routines::append_configured_output_artifacts`].
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
6975
6976pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
6977    let provider_id = config
6978        .get("default_provider")
6979        .and_then(|v| v.as_str())
6980        .map(str::trim)
6981        .filter(|v| !v.is_empty())?;
6982    let model_id = config
6983        .get("providers")
6984        .and_then(|v| v.get(provider_id))
6985        .and_then(|v| v.get("default_model"))
6986        .and_then(|v| v.as_str())
6987        .map(str::trim)
6988        .filter(|v| !v.is_empty())?;
6989    Some(ModelSpec {
6990        provider_id: provider_id.to_string(),
6991        model_id: model_id.to_string(),
6992    })
6993}
6994
/// Resolves the model spec for a routine run; thin delegate to
/// [`crate::app::routines::resolve_routine_model_spec_for_run`].
/// The second tuple element presumably describes how the spec was resolved —
/// confirm against the implementation in `crate::app::routines`.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
7001
/// Trims every entry, drops the ones that end up empty, and removes
/// duplicates while preserving first-occurrence order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|entry| entry.trim().to_string())
        .filter(|entry| !entry.is_empty())
        .filter(|entry| seen.insert(entry.clone()))
        .collect()
}
7016
7017#[cfg(not(feature = "browser"))]
7018impl AppState {
7019    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
7020        0
7021    }
7022
7023    pub async fn close_all_browser_sessions(&self) -> usize {
7024        0
7025    }
7026
7027    pub async fn browser_status(&self) -> serde_json::Value {
7028        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
7029    }
7030
7031    pub async fn browser_smoke_test(
7032        &self,
7033        _url: Option<String>,
7034    ) -> anyhow::Result<serde_json::Value> {
7035        anyhow::bail!("browser feature disabled")
7036    }
7037
7038    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
7039        anyhow::bail!("browser feature disabled")
7040    }
7041
7042    pub async fn browser_health_summary(&self) -> serde_json::Value {
7043        serde_json::json!({ "enabled": false })
7044    }
7045}
7046
7047pub mod automation;
7048pub use automation::*;
7049
7050#[cfg(test)]
7051mod tests;