Skip to main content

tandem_server/app/state/app_state_impl_parts/
part01.rs

/// Serializable container for bug monitor log watcher state.
///
/// Both fields carry `#[serde(default)]`, so input that omits either of them still
/// deserializes, with the missing field taking its `Default` value.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
struct BugMonitorLogWatcherStateFile {
    /// Version number of the serialized layout; defaults to `0` when absent.
    #[serde(default)]
    schema_version: u32,
    /// Per-source watcher state keyed by string.
    /// NOTE(review): keys presumably come from `bug_monitor_log_source_state_key` — confirm
    /// against the code that loads and persists this struct.
    #[serde(default)]
    sources: std::collections::HashMap<String, BugMonitorLogSourceState>,
}
8
/// Builds the lookup key for a log source: the whitespace-trimmed project id and the
/// whitespace-trimmed source id joined by a single `/`.
fn bug_monitor_log_source_state_key(project_id: &str, source_id: &str) -> String {
    let project = project_id.trim();
    let source = source_id.trim();
    let mut key = String::with_capacity(project.len() + source.len() + 1);
    key.push_str(project);
    key.push('/');
    key.push_str(source);
    key
}
12
/// Returns `true` when `value` is non-empty and consists solely of ASCII letters, ASCII
/// digits, `.`, `_` or `-`.
fn is_slug_like(value: &str) -> bool {
    if value.is_empty() {
        return false;
    }
    value
        .chars()
        .all(|ch| matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '.' | '_' | '-'))
}
19
20async fn write_state_file_atomically(path: &PathBuf, payload: String) -> anyhow::Result<()> {
21    let tmp = path.with_extension("tmp");
22    fs::write(&tmp, payload).await?;
23    fs::rename(&tmp, path).await?;
24    Ok(())
25}
26
27async fn validate_bug_monitor_monitored_projects(
28    state: &AppState,
29    config: &mut BugMonitorConfig,
30) -> anyhow::Result<()> {
31    let needs_mcp_validation = config.monitored_projects.iter().any(|project| {
32        project
33            .mcp_server
34            .as_ref()
35            .is_some_and(|value| !value.trim().is_empty())
36    });
37    let servers = if needs_mcp_validation && state.is_ready() {
38        Some(state.mcp.list().await)
39    } else {
40        None
41    };
42    let mut project_ids = std::collections::HashSet::new();
43    for project in &mut config.monitored_projects {
44        project.project_id = project.project_id.trim().to_string();
45        project.name = project.name.trim().to_string();
46        project.repo = project.repo.trim().to_string();
47        project.workspace_root = project.workspace_root.trim().to_string();
48        project.mcp_server = project
49            .mcp_server
50            .as_ref()
51            .map(|value| value.trim().to_string())
52            .filter(|value| !value.is_empty());
53
54        if !is_slug_like(&project.project_id) {
55            anyhow::bail!("monitored project id must be ASCII slug-like");
56        }
57        if !project_ids.insert(project.project_id.clone()) {
58            anyhow::bail!("duplicate monitored project id `{}`", project.project_id);
59        }
60        if project.name.is_empty() {
61            anyhow::bail!(
62                "monitored project `{}` name is required",
63                project.project_id
64            );
65        }
66        if !is_valid_owner_repo_slug(&project.repo) {
67            anyhow::bail!(
68                "monitored project `{}` repo must be in owner/repo format",
69                project.project_id
70            );
71        }
72        crate::normalize_absolute_workspace_root(&project.workspace_root)
73            .map_err(anyhow::Error::msg)?;
74        if let Some(server) = project.mcp_server.as_ref() {
75            if let Some(servers) = servers.as_ref() {
76                if !servers.contains_key(server) {
77                    anyhow::bail!(
78                        "monitored project `{}` references unknown mcp server `{server}`",
79                        project.project_id
80                    );
81                }
82            } else if !state.is_ready() {
83                // Unit tests often validate config before runtime wiring exists.
84                // Runtime-backed MCP validation still runs in normal ready state.
85            } else {
86                anyhow::bail!(
87                    "monitored project `{}` references unknown mcp server `{server}`",
88                    project.project_id
89                );
90            }
91        }
92        if let Some(model_policy) = project.model_policy.as_ref() {
93            crate::http::routines_automations::validate_model_policy(model_policy)
94                .map_err(anyhow::Error::msg)?;
95        }
96
97        let mut source_ids = std::collections::HashSet::new();
98        for source in &mut project.log_sources {
99            source.source_id = source.source_id.trim().to_string();
100            source.path = source.path.trim().to_string();
101            if !is_slug_like(&source.source_id) {
102                anyhow::bail!(
103                    "log source id for monitored project `{}` must be ASCII slug-like",
104                    project.project_id
105                );
106            }
107            if !source_ids.insert(source.source_id.clone()) {
108                anyhow::bail!(
109                    "duplicate log source id `{}` in monitored project `{}`",
110                    source.source_id,
111                    project.project_id
112                );
113            }
114            if source.path.is_empty() {
115                anyhow::bail!(
116                    "log source `{}` in monitored project `{}` path is required",
117                    source.source_id,
118                    project.project_id
119                );
120            }
121            let path_project = BugMonitorMonitoredProject {
122                project_id: project.project_id.clone(),
123                name: project.name.clone(),
124                repo: project.repo.clone(),
125                workspace_root: project.workspace_root.clone(),
126                ..BugMonitorMonitoredProject::default()
127            };
128            crate::bug_monitor::log_watcher::resolve_log_source_path(&path_project, source)?;
129            source.watch_interval_seconds = source.watch_interval_seconds.clamp(1, 86_400);
130            source.max_bytes_per_poll = source.max_bytes_per_poll.clamp(1_024, 10 * 1024 * 1024);
131            source.max_candidates_per_poll = source.max_candidates_per_poll.clamp(1, 200);
132        }
133    }
134    Ok(())
135}
136
137impl AppState {
    /// Builds an `AppState` in the `Starting` startup status with phase `"boot"`.
    ///
    /// The runtime slot starts out empty, every in-memory store starts out empty or at its
    /// default, and storage paths and tunables are resolved through the `config::paths` /
    /// `config::env` helpers.
    ///
    /// * `attempt_id` - identifier recorded in the startup state for this boot attempt.
    /// * `in_process` - initial value of the in-process mode flag (see `mode_label`).
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        // The governance engine implementation is selected at compile time by the
        // `premium-governance` feature.
        #[cfg(feature = "premium-governance")]
        let governance_engine: Arc<
            dyn tandem_enterprise_contract::governance::GovernancePolicyEngine,
        > = Arc::new(tandem_governance_engine::DefaultGovernanceEngine);
        #[cfg(not(feature = "premium-governance"))]
        let governance_engine: Arc<
            dyn tandem_enterprise_contract::governance::GovernancePolicyEngine,
        > = Arc::new(crate::app::state::governance::UnavailableGovernanceEngine);
        Self {
            // Empty until `mark_ready` stores the runtime; `is_ready` checks this slot.
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: config::env::resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            memory_audit_path: config::paths::resolve_memory_audit_path(),
            protected_audit_path: config::paths::resolve_protected_audit_path(),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: config::paths::resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            channel_automation_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_governance: Arc::new(RwLock::new(
                crate::automation_v2::governance::GovernanceState::default(),
            )),
            governance_engine,
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
                config::env::resolve_scheduler_max_concurrent_runs(),
            ))),
            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
            automations_v2_persistence: Arc::new(tokio::sync::Mutex::new(())),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_planner_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_learning_candidates: Arc::new(RwLock::new(std::collections::HashMap::new())),
            context_packs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Seeded from the environment-derived config.
            bug_monitor_config: Arc::new(
                RwLock::new(config::env::resolve_bug_monitor_env_config()),
            ),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_log_watcher_state_path:
                config::paths::resolve_bug_monitor_log_watcher_state_path(),
            bug_monitor_log_source_states: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_log_watcher_status: Arc::new(RwLock::new(
                BugMonitorLogWatcherStatus::default(),
            )),
            bug_monitor_log_evidence_dir: config::paths::resolve_bug_monitor_log_evidence_dir(),
            bug_monitor_intake_keys: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_intake_keys_path: config::paths::resolve_bug_monitor_intake_keys_path(),
            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            provider_oauth_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            mcp_oauth_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_mcp_servers: Arc::new(RwLock::new(
                std::collections::HashMap::new(),
            )),
            routines_path: config::paths::resolve_routines_path(),
            routine_history_path: config::paths::resolve_routine_history_path(),
            routine_runs_path: config::paths::resolve_routine_runs_path(),
            automations_v2_path: config::paths::resolve_automations_v2_path(),
            channel_automation_drafts_path: config::paths::resolve_channel_automation_drafts_path(),
            automation_governance_path: config::paths::resolve_automation_governance_path(),
            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
            automation_v2_runs_archive_path: config::paths::resolve_automation_v2_runs_archive_path(
            ),
            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
            external_actions_path: config::paths::resolve_external_actions_path(),
            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
            workflow_planner_sessions_path: config::paths::resolve_workflow_planner_sessions_path(),
            workflow_learning_candidates_path:
                config::paths::resolve_workflow_learning_candidates_path(),
            context_packs_path: config::paths::resolve_context_packs_path(),
            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
            // Web UI starts disabled. The prefix and base URL literals below are the same
            // values `web_ui_prefix()` and `server_base_url()` fall back to when their lock
            // is poisoned.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            // Used by `host_runtime_context()` until a runtime has been stored.
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                // Second root: the shared canonical root, falling back to `<home>/.tandem`
                // (or `./.tandem` when no home directory is found).
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
262
263    pub fn is_ready(&self) -> bool {
264        self.runtime.get().is_some()
265    }
266
267    pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
268        for _ in 0..attempts {
269            let startup = self.startup_snapshot().await;
270            if matches!(startup.status, StartupStatus::Ready) {
271                return true;
272            }
273            if matches!(startup.status, StartupStatus::Failed) {
274                return false;
275            }
276            tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
277        }
278        matches!(self.startup_snapshot().await.status, StartupStatus::Ready)
279    }
280
281    pub fn mode_label(&self) -> &'static str {
282        if self.in_process_mode.load(Ordering::Relaxed) {
283            "in-process"
284        } else {
285            "sidecar"
286        }
287    }
288
289    pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
290        self.web_ui_enabled.store(enabled, Ordering::Relaxed);
291        if let Ok(mut guard) = self.web_ui_prefix.write() {
292            *guard = config::webui::normalize_web_ui_prefix(&prefix);
293        }
294    }
295
296    pub fn web_ui_enabled(&self) -> bool {
297        self.web_ui_enabled.load(Ordering::Relaxed)
298    }
299
300    pub fn web_ui_prefix(&self) -> String {
301        self.web_ui_prefix
302            .read()
303            .map(|v| v.clone())
304            .unwrap_or_else(|_| "/admin".to_string())
305    }
306
307    pub fn set_server_base_url(&self, base_url: String) {
308        if let Ok(mut guard) = self.server_base_url.write() {
309            *guard = base_url;
310        }
311    }
312
313    pub fn server_base_url(&self) -> String {
314        self.server_base_url
315            .read()
316            .map(|v| v.clone())
317            .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
318    }
319
320    pub async fn api_token(&self) -> Option<String> {
321        self.api_token.read().await.clone()
322    }
323
324    pub async fn set_api_token(&self, token: Option<String>) {
325        *self.api_token.write().await = token;
326    }
327
328    pub async fn startup_snapshot(&self) -> StartupSnapshot {
329        let state = self.startup.read().await.clone();
330        StartupSnapshot {
331            elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
332            status: state.status,
333            phase: state.phase,
334            started_at_ms: state.started_at_ms,
335            attempt_id: state.attempt_id,
336            last_error: state.last_error,
337        }
338    }
339
340    pub fn host_runtime_context(&self) -> HostRuntimeContext {
341        self.runtime
342            .get()
343            .map(|runtime| runtime.host_runtime_context.clone())
344            .unwrap_or_else(|| self.host_runtime_context.clone())
345    }
346
347    pub async fn set_phase(&self, phase: impl Into<String>) {
348        let mut startup = self.startup.write().await;
349        startup.phase = phase.into();
350    }
351
    /// Stores `runtime`, wires up tools and engine hooks, loads persisted state and finally
    /// flips the startup status to `Ready`.
    ///
    /// # Errors
    ///
    /// * `"runtime already initialized"` if a runtime was stored before.
    /// * Any error from `load_routines` or `load_automations_v2`.
    ///
    /// Results of every other load/bootstrap step are discarded with `let _ =`.
    ///
    /// NOTE(review): the runtime is stored first, so `is_ready()` already returns `true`
    /// while the steps below run, and it stays `true` if one of the two propagating loads
    /// fails — in that case this function returns without changing the startup status.
    /// Confirm that callers follow up with `mark_failed`.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        // Register the built-in tools that need a handle to this state.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list".to_string(),
                Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list_catalog".to_string(),
                Arc::new(crate::http::mcp_discovery::McpListCatalogTool::new(
                    self.clone(),
                )),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_request_capability".to_string(),
                Arc::new(crate::http::mcp_discovery::McpRequestCapabilityTool::new(
                    self.clone(),
                )),
            )
            .await;
        // Install the engine loop hooks (agent spawning, tool policy, prompt context).
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Load persisted state. Only routines and automations v2 abort startup on failure;
        // every other result is ignored.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_channel_automation_drafts().await;
        let _ = self.load_automation_governance().await;
        let _ = self.bootstrap_automation_governance().await;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_optimization_campaigns().await;
        let _ = self.load_optimization_experiments().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_bug_monitor_log_watcher_state().await;
        let _ = self.load_bug_monitor_intake_keys().await;
        let _ = self.load_external_actions().await;
        let _ = self.load_workflow_planner_sessions().await;
        let _ = self.load_workflow_learning_candidates().await;
        let _ = self.load_context_packs().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Publish the Ready status and clear any previous error.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        // Release the startup lock before spawning the background task below.
        drop(startup);
        // Browser tools are registered on a separate task; a failure is only logged.
        #[cfg(feature = "browser")]
        {
            let state = self.clone();
            tokio::spawn(async move {
                if let Err(err) = state.register_browser_tools().await {
                    tracing::warn!("browser tool registration skipped: {}", err);
                }
            });
        }
        Ok(())
    }
444
445    pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
446        let mut startup = self.startup.write().await;
447        startup.status = StartupStatus::Failed;
448        startup.phase = phase.into();
449        startup.last_error = Some(error.into());
450    }
451
452    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
453        let runtime = self.channels_runtime.lock().await;
454        let mut status = runtime.statuses.clone();
455        let diagnostics = runtime.diagnostics.read().await;
456        for spec in registered_channels() {
457            let entry = status
458                .entry(spec.name.to_string())
459                .or_insert(ChannelStatus {
460                    enabled: false,
461                    connected: false,
462                    last_error: None,
463                    active_sessions: 0,
464                    meta: json!({}),
465                });
466            let mut meta = entry.meta.as_object().cloned().unwrap_or_default();
467            if let Some(diag) = diagnostics.get(spec.name) {
468                entry.last_error = diag.last_error.clone().or_else(|| entry.last_error.clone());
469                meta.insert("state".to_string(), Value::String(diag.state.to_string()));
470                meta.insert(
471                    "last_error_code".to_string(),
472                    diag.last_error_code
473                        .map(|code| Value::String(code.to_string()))
474                        .unwrap_or(Value::Null),
475                );
476                meta.insert(
477                    "last_reconnect_at".to_string(),
478                    diag.last_reconnect_at
479                        .map(|value| Value::Number(value.into()))
480                        .unwrap_or(Value::Null),
481                );
482                meta.insert(
483                    "listener_start_count".to_string(),
484                    Value::Number(serde_json::Number::from(diag.listener_start_count)),
485                );
486            } else {
487                meta.insert("state".to_string(), Value::String("stopped".to_string()));
488                meta.insert("last_error_code".to_string(), Value::Null);
489                meta.insert("last_reconnect_at".to_string(), Value::Null);
490                meta.insert(
491                    "listener_start_count".to_string(),
492                    Value::Number(0u64.into()),
493                );
494            }
495            entry.meta = Value::Object(meta);
496        }
497        status
498    }
499
500    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
501        let effective = self.config.get_effective_value().await;
502        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
503        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
504
505        let diagnostics = tandem_channels::new_channel_runtime_diagnostics();
506
507        let mut runtime = self.channels_runtime.lock().await;
508        if let Some(listeners) = runtime.listeners.as_mut() {
509            listeners.abort_all();
510        }
511        runtime.listeners = None;
512        runtime.diagnostics = diagnostics.clone();
513        runtime.statuses.clear();
514        let channels_config_value = serde_json::to_value(&parsed.channels)
515            .ok()
516            .and_then(|channels| channels.as_object().cloned());
517
518        let mut status_map = std::collections::HashMap::new();
519        for spec in registered_channels() {
520            let enabled = channels_config_value
521                .as_ref()
522                .and_then(|channels| channels.get(spec.config_key))
523                .and_then(Value::as_object)
524                .is_some();
525            status_map.insert(
526                spec.name.to_string(),
527                ChannelStatus {
528                    enabled,
529                    connected: false,
530                    last_error: None,
531                    active_sessions: 0,
532                    meta: serde_json::json!({}),
533                },
534            );
535        }
536
537        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
538            let listeners = tandem_channels::start_channel_listeners_with_diagnostics(
539                channels_cfg,
540                diagnostics.clone(),
541            )
542            .await;
543            runtime.listeners = Some(listeners);
544            for status in status_map.values_mut() {
545                if status.enabled {
546                    status.connected = true;
547                }
548            }
549        }
550
551        runtime.statuses = status_map.clone();
552        drop(runtime);
553
554        self.event_bus.publish(EngineEvent::new(
555            "channel.status.changed",
556            serde_json::json!({ "channels": status_map }),
557        ));
558        Ok(())
559    }
560
561    pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
562        let Some(raw) =
563            read_state_file_with_legacy(&self.shared_resources_path, "shared_resources.json")
564                .await?
565        else {
566            return Ok(());
567        };
568        let parsed =
569            serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
570                .unwrap_or_default();
571        let mut guard = self.shared_resources.write().await;
572        *guard = parsed;
573        Ok(())
574    }
575
576    pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
577        if let Some(parent) = self.shared_resources_path.parent() {
578            fs::create_dir_all(parent).await?;
579        }
580        let payload = {
581            let guard = self.shared_resources.read().await;
582            serde_json::to_string_pretty(&*guard)?
583        };
584        fs::write(&self.shared_resources_path, payload).await?;
585        Ok(())
586    }
587
588    pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
589        self.shared_resources.read().await.get(key).cloned()
590    }
591
592    pub async fn list_shared_resources(
593        &self,
594        prefix: Option<&str>,
595        limit: usize,
596    ) -> Vec<SharedResourceRecord> {
597        let limit = limit.clamp(1, 500);
598        let mut rows = self
599            .shared_resources
600            .read()
601            .await
602            .values()
603            .filter(|record| {
604                if let Some(prefix) = prefix {
605                    record.key.starts_with(prefix)
606                } else {
607                    true
608                }
609            })
610            .cloned()
611            .collect::<Vec<_>>();
612        rows.sort_by(|a, b| a.key.cmp(&b.key));
613        rows.truncate(limit);
614        rows
615    }
616
617    pub async fn put_shared_resource(
618        &self,
619        key: String,
620        value: Value,
621        if_match_rev: Option<u64>,
622        updated_by: String,
623        ttl_ms: Option<u64>,
624    ) -> Result<SharedResourceRecord, ResourceStoreError> {
625        if !is_valid_resource_key(&key) {
626            return Err(ResourceStoreError::InvalidKey { key });
627        }
628
629        let now = now_ms();
630        let mut guard = self.shared_resources.write().await;
631        let existing = guard.get(&key).cloned();
632
633        if let Some(expected) = if_match_rev {
634            let current = existing.as_ref().map(|row| row.rev);
635            if current != Some(expected) {
636                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
637                    key,
638                    expected_rev: Some(expected),
639                    current_rev: current,
640                }));
641            }
642        }
643
644        let next_rev = existing
645            .as_ref()
646            .map(|row| row.rev.saturating_add(1))
647            .unwrap_or(1);
648
649        let record = SharedResourceRecord {
650            key: key.clone(),
651            value,
652            rev: next_rev,
653            updated_at_ms: now,
654            updated_by,
655            ttl_ms,
656        };
657
658        let previous = guard.insert(key.clone(), record.clone());
659        drop(guard);
660
661        if let Err(error) = self.persist_shared_resources().await {
662            let mut rollback = self.shared_resources.write().await;
663            if let Some(previous) = previous {
664                rollback.insert(key, previous);
665            } else {
666                rollback.remove(&key);
667            }
668            return Err(ResourceStoreError::PersistFailed {
669                message: error.to_string(),
670            });
671        }
672
673        Ok(record)
674    }
675
676    pub async fn delete_shared_resource(
677        &self,
678        key: &str,
679        if_match_rev: Option<u64>,
680    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
681        if !is_valid_resource_key(key) {
682            return Err(ResourceStoreError::InvalidKey {
683                key: key.to_string(),
684            });
685        }
686
687        let mut guard = self.shared_resources.write().await;
688        let current = guard.get(key).cloned();
689        if let Some(expected) = if_match_rev {
690            let current_rev = current.as_ref().map(|row| row.rev);
691            if current_rev != Some(expected) {
692                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
693                    key: key.to_string(),
694                    expected_rev: Some(expected),
695                    current_rev,
696                }));
697            }
698        }
699
700        let removed = guard.remove(key);
701        drop(guard);
702
703        if let Err(error) = self.persist_shared_resources().await {
704            if let Some(record) = removed.clone() {
705                self.shared_resources
706                    .write()
707                    .await
708                    .insert(record.key.clone(), record);
709            }
710            return Err(ResourceStoreError::PersistFailed {
711                message: error.to_string(),
712            });
713        }
714
715        Ok(removed)
716    }
717
718    pub async fn load_routines(&self) -> anyhow::Result<()> {
719        let Some(raw) = read_state_file_with_legacy(&self.routines_path, "routines.json").await?
720        else {
721            return Ok(());
722        };
723        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
724            Ok(parsed) => {
725                let mut guard = self.routines.write().await;
726                *guard = parsed;
727                Ok(())
728            }
729            Err(primary_err) => {
730                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
731                if backup_path.exists() {
732                    let backup_raw = fs::read_to_string(&backup_path).await?;
733                    if let Ok(parsed_backup) = serde_json::from_str::<
734                        std::collections::HashMap<String, RoutineSpec>,
735                    >(&backup_raw)
736                    {
737                        let mut guard = self.routines.write().await;
738                        *guard = parsed_backup;
739                        return Ok(());
740                    }
741                }
742                Err(anyhow::anyhow!(
743                    "failed to parse routines store {}: {primary_err}",
744                    self.routines_path.display()
745                ))
746            }
747        }
748    }
749
750    pub async fn load_routine_history(&self) -> anyhow::Result<()> {
751        if !self.routine_history_path.exists() {
752            return Ok(());
753        }
754        let raw = fs::read_to_string(&self.routine_history_path).await?;
755        let parsed = serde_json::from_str::<
756            std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
757        >(&raw)
758        .unwrap_or_default();
759        let mut guard = self.routine_history.write().await;
760        *guard = parsed;
761        Ok(())
762    }
763
764    pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
765        let Some(raw) =
766            read_state_file_with_legacy(&self.routine_runs_path, "routine_runs.json").await?
767        else {
768            return Ok(());
769        };
770        let parsed =
771            serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
772                .unwrap_or_default();
773        let mut guard = self.routine_runs.write().await;
774        *guard = parsed;
775        Ok(())
776    }
777
778    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
779        if let Some(parent) = self.routines_path.parent() {
780            fs::create_dir_all(parent).await?;
781        }
782        let (payload, is_empty) = {
783            let guard = self.routines.read().await;
784            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
785        };
786        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
787            let existing_raw = fs::read_to_string(&self.routines_path)
788                .await
789                .unwrap_or_default();
790            let existing_has_rows = serde_json::from_str::<
791                std::collections::HashMap<String, RoutineSpec>,
792            >(&existing_raw)
793            .map(|rows| !rows.is_empty())
794            .unwrap_or(true);
795            if existing_has_rows {
796                return Err(anyhow::anyhow!(
797                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
798                    self.routines_path.display()
799                ));
800            }
801        }
802        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
803        if self.routines_path.exists() {
804            let _ = fs::copy(&self.routines_path, &backup_path).await;
805        }
806        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
807        fs::write(&tmp_path, payload).await?;
808        fs::rename(&tmp_path, &self.routines_path).await?;
809        Ok(())
810    }
811
812    pub async fn persist_routines(&self) -> anyhow::Result<()> {
813        self.persist_routines_inner(false).await
814    }
815
816    pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
817        if let Some(parent) = self.routine_history_path.parent() {
818            fs::create_dir_all(parent).await?;
819        }
820        let payload = {
821            let guard = self.routine_history.read().await;
822            serde_json::to_string_pretty(&*guard)?
823        };
824        fs::write(&self.routine_history_path, payload).await?;
825        Ok(())
826    }
827
828    pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
829        if let Some(parent) = self.routine_runs_path.parent() {
830            fs::create_dir_all(parent).await?;
831        }
832        let payload = {
833            let guard = self.routine_runs.read().await;
834            serde_json::to_string_pretty(&*guard)?
835        };
836        fs::write(&self.routine_runs_path, payload).await?;
837        Ok(())
838    }
839
840    pub async fn put_routine(
841        &self,
842        mut routine: RoutineSpec,
843    ) -> Result<RoutineSpec, RoutineStoreError> {
844        if routine.routine_id.trim().is_empty() {
845            return Err(RoutineStoreError::InvalidRoutineId {
846                routine_id: routine.routine_id,
847            });
848        }
849
850        routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
851        routine.output_targets = normalize_non_empty_list(routine.output_targets);
852
853        let now = now_ms();
854        let next_schedule_fire =
855            compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
856                .ok_or_else(|| RoutineStoreError::InvalidSchedule {
857                    detail: "invalid schedule or timezone".to_string(),
858                })?;
859        match routine.schedule {
860            RoutineSchedule::IntervalSeconds { seconds } => {
861                if seconds == 0 {
862                    return Err(RoutineStoreError::InvalidSchedule {
863                        detail: "interval_seconds must be > 0".to_string(),
864                    });
865                }
866                let _ = seconds;
867            }
868            RoutineSchedule::Cron { .. } => {}
869        }
870        if routine.next_fire_at_ms.is_none() {
871            routine.next_fire_at_ms = Some(next_schedule_fire);
872        }
873
874        let mut guard = self.routines.write().await;
875        let previous = guard.insert(routine.routine_id.clone(), routine.clone());
876        drop(guard);
877
878        if let Err(error) = self.persist_routines().await {
879            let mut rollback = self.routines.write().await;
880            if let Some(previous) = previous {
881                rollback.insert(previous.routine_id.clone(), previous);
882            } else {
883                rollback.remove(&routine.routine_id);
884            }
885            return Err(RoutineStoreError::PersistFailed {
886                message: error.to_string(),
887            });
888        }
889
890        Ok(routine)
891    }
892
893    pub async fn list_routines(&self) -> Vec<RoutineSpec> {
894        let mut rows = self
895            .routines
896            .read()
897            .await
898            .values()
899            .cloned()
900            .collect::<Vec<_>>();
901        rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
902        rows
903    }
904
905    pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
906        self.routines.read().await.get(routine_id).cloned()
907    }
908
909    pub async fn delete_routine(
910        &self,
911        routine_id: &str,
912    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
913        let mut guard = self.routines.write().await;
914        let removed = guard.remove(routine_id);
915        drop(guard);
916
917        let allow_empty_overwrite = self.routines.read().await.is_empty();
918        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
919            if let Some(removed) = removed.clone() {
920                self.routines
921                    .write()
922                    .await
923                    .insert(removed.routine_id.clone(), removed);
924            }
925            return Err(RoutineStoreError::PersistFailed {
926                message: error.to_string(),
927            });
928        }
929        Ok(removed)
930    }
931
932    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
933        let mut plans = Vec::new();
934        let mut guard = self.routines.write().await;
935        for routine in guard.values_mut() {
936            if routine.status != RoutineStatus::Active {
937                continue;
938            }
939            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
940                continue;
941            };
942            if now_ms < next_fire_at_ms {
943                continue;
944            }
945            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
946                now_ms,
947                next_fire_at_ms,
948                &routine.schedule,
949                &routine.timezone,
950                &routine.misfire_policy,
951            );
952            routine.next_fire_at_ms = Some(next_fire_at_ms);
953            if run_count == 0 {
954                continue;
955            }
956            plans.push(RoutineTriggerPlan {
957                routine_id: routine.routine_id.clone(),
958                run_count,
959                scheduled_at_ms: now_ms,
960                next_fire_at_ms,
961            });
962        }
963        drop(guard);
964        let _ = self.persist_routines().await;
965        plans
966    }
967
968    pub async fn mark_routine_fired(
969        &self,
970        routine_id: &str,
971        fired_at_ms: u64,
972    ) -> Option<RoutineSpec> {
973        let mut guard = self.routines.write().await;
974        let routine = guard.get_mut(routine_id)?;
975        routine.last_fired_at_ms = Some(fired_at_ms);
976        let updated = routine.clone();
977        drop(guard);
978        let _ = self.persist_routines().await;
979        Some(updated)
980    }
981
982    pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
983        let mut history = self.routine_history.write().await;
984        history
985            .entry(event.routine_id.clone())
986            .or_default()
987            .push(event);
988        drop(history);
989        let _ = self.persist_routine_history().await;
990    }
991
992    pub async fn list_routine_history(
993        &self,
994        routine_id: &str,
995        limit: usize,
996    ) -> Vec<RoutineHistoryEvent> {
997        let limit = limit.clamp(1, 500);
998        let mut rows = self
999            .routine_history
1000            .read()
1001            .await
1002            .get(routine_id)
1003            .cloned()
1004            .unwrap_or_default();
1005        rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1006        rows.truncate(limit);
1007        rows
1008    }
1009
1010    pub async fn create_routine_run(
1011        &self,
1012        routine: &RoutineSpec,
1013        trigger_type: &str,
1014        run_count: u32,
1015        status: RoutineRunStatus,
1016        detail: Option<String>,
1017    ) -> RoutineRunRecord {
1018        let now = now_ms();
1019        let record = RoutineRunRecord {
1020            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
1021            routine_id: routine.routine_id.clone(),
1022            trigger_type: trigger_type.to_string(),
1023            run_count,
1024            status,
1025            created_at_ms: now,
1026            updated_at_ms: now,
1027            fired_at_ms: Some(now),
1028            started_at_ms: None,
1029            finished_at_ms: None,
1030            requires_approval: routine.requires_approval,
1031            approval_reason: None,
1032            denial_reason: None,
1033            paused_reason: None,
1034            detail,
1035            entrypoint: routine.entrypoint.clone(),
1036            args: routine.args.clone(),
1037            allowed_tools: routine.allowed_tools.clone(),
1038            output_targets: routine.output_targets.clone(),
1039            artifacts: Vec::new(),
1040            active_session_ids: Vec::new(),
1041            latest_session_id: None,
1042            prompt_tokens: 0,
1043            completion_tokens: 0,
1044            total_tokens: 0,
1045            estimated_cost_usd: 0.0,
1046        };
1047        self.routine_runs
1048            .write()
1049            .await
1050            .insert(record.run_id.clone(), record.clone());
1051        let _ = self.persist_routine_runs().await;
1052        record
1053    }
1054
1055    pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1056        self.routine_runs.read().await.get(run_id).cloned()
1057    }
1058
1059    pub async fn list_routine_runs(
1060        &self,
1061        routine_id: Option<&str>,
1062        limit: usize,
1063    ) -> Vec<RoutineRunRecord> {
1064        let mut rows = self
1065            .routine_runs
1066            .read()
1067            .await
1068            .values()
1069            .filter(|row| {
1070                if let Some(id) = routine_id {
1071                    row.routine_id == id
1072                } else {
1073                    true
1074                }
1075            })
1076            .cloned()
1077            .collect::<Vec<_>>();
1078        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1079        rows.truncate(limit.clamp(1, 500));
1080        rows
1081    }
1082
1083    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1084        let mut guard = self.routine_runs.write().await;
1085        let next_run_id = guard
1086            .values()
1087            .filter(|row| row.status == RoutineRunStatus::Queued)
1088            .min_by(|a, b| {
1089                a.created_at_ms
1090                    .cmp(&b.created_at_ms)
1091                    .then_with(|| a.run_id.cmp(&b.run_id))
1092            })
1093            .map(|row| row.run_id.clone())?;
1094        let now = now_ms();
1095        let row = guard.get_mut(&next_run_id)?;
1096        row.status = RoutineRunStatus::Running;
1097        row.updated_at_ms = now;
1098        row.started_at_ms = Some(now);
1099        let claimed = row.clone();
1100        drop(guard);
1101        let _ = self.persist_routine_runs().await;
1102        Some(claimed)
1103    }
1104
1105    pub async fn set_routine_session_policy(
1106        &self,
1107        session_id: String,
1108        run_id: String,
1109        routine_id: String,
1110        allowed_tools: Vec<String>,
1111    ) {
1112        let policy = RoutineSessionPolicy {
1113            session_id: session_id.clone(),
1114            run_id,
1115            routine_id,
1116            allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1117        };
1118        self.routine_session_policies
1119            .write()
1120            .await
1121            .insert(session_id, policy);
1122    }
1123
1124    pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1125        self.routine_session_policies
1126            .read()
1127            .await
1128            .get(session_id)
1129            .cloned()
1130    }
1131
1132    pub async fn clear_routine_session_policy(&self, session_id: &str) {
1133        self.routine_session_policies
1134            .write()
1135            .await
1136            .remove(session_id);
1137    }
1138
1139    pub async fn update_routine_run_status(
1140        &self,
1141        run_id: &str,
1142        status: RoutineRunStatus,
1143        reason: Option<String>,
1144    ) -> Option<RoutineRunRecord> {
1145        let mut guard = self.routine_runs.write().await;
1146        let row = guard.get_mut(run_id)?;
1147        row.status = status.clone();
1148        row.updated_at_ms = now_ms();
1149        match status {
1150            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1151            RoutineRunStatus::Running => {
1152                row.started_at_ms.get_or_insert_with(now_ms);
1153                if let Some(detail) = reason {
1154                    row.detail = Some(detail);
1155                }
1156            }
1157            RoutineRunStatus::Denied => row.denial_reason = reason,
1158            RoutineRunStatus::Paused => row.paused_reason = reason,
1159            RoutineRunStatus::Completed
1160            | RoutineRunStatus::Failed
1161            | RoutineRunStatus::Cancelled => {
1162                row.finished_at_ms = Some(now_ms());
1163                if let Some(detail) = reason {
1164                    row.detail = Some(detail);
1165                }
1166            }
1167            _ => {
1168                if let Some(detail) = reason {
1169                    row.detail = Some(detail);
1170                }
1171            }
1172        }
1173        let updated = row.clone();
1174        drop(guard);
1175        let _ = self.persist_routine_runs().await;
1176        Some(updated)
1177    }
1178
1179    pub async fn append_routine_run_artifact(
1180        &self,
1181        run_id: &str,
1182        artifact: RoutineRunArtifact,
1183    ) -> Option<RoutineRunRecord> {
1184        let mut guard = self.routine_runs.write().await;
1185        let row = guard.get_mut(run_id)?;
1186        row.updated_at_ms = now_ms();
1187        row.artifacts.push(artifact);
1188        let updated = row.clone();
1189        drop(guard);
1190        let _ = self.persist_routine_runs().await;
1191        Some(updated)
1192    }
1193
1194    pub async fn add_active_session_id(
1195        &self,
1196        run_id: &str,
1197        session_id: String,
1198    ) -> Option<RoutineRunRecord> {
1199        let mut guard = self.routine_runs.write().await;
1200        let row = guard.get_mut(run_id)?;
1201        if !row.active_session_ids.iter().any(|id| id == &session_id) {
1202            row.active_session_ids.push(session_id);
1203        }
1204        row.latest_session_id = row.active_session_ids.last().cloned();
1205        row.updated_at_ms = now_ms();
1206        let updated = row.clone();
1207        drop(guard);
1208        let _ = self.persist_routine_runs().await;
1209        Some(updated)
1210    }
1211
1212    pub async fn clear_active_session_id(
1213        &self,
1214        run_id: &str,
1215        session_id: &str,
1216    ) -> Option<RoutineRunRecord> {
1217        let mut guard = self.routine_runs.write().await;
1218        let row = guard.get_mut(run_id)?;
1219        row.active_session_ids.retain(|id| id != session_id);
1220        row.updated_at_ms = now_ms();
1221        let updated = row.clone();
1222        drop(guard);
1223        let _ = self.persist_routine_runs().await;
1224        Some(updated)
1225    }
1226
1227    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
1228        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
1229        let mut loaded_from_alternate = false;
1230        let mut migrated = false;
1231        let mut path_counts = Vec::new();
1232        let mut canonical_loaded = false;
1233        if self.automations_v2_path.exists() {
1234            let raw = fs::read_to_string(&self.automations_v2_path).await?;
1235            if raw.trim().is_empty() || raw.trim() == "{}" {
1236                path_counts.push((self.automations_v2_path.clone(), 0usize));
1237            } else {
1238                let parsed = parse_automation_v2_file(&raw);
1239                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
1240                canonical_loaded = !parsed.is_empty();
1241                merged = parsed;
1242            }
1243        } else {
1244            path_counts.push((self.automations_v2_path.clone(), 0usize));
1245        }
1246        if !canonical_loaded {
1247            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1248                if path == self.automations_v2_path {
1249                    continue;
1250                }
1251                if !path.exists() {
1252                    path_counts.push((path, 0usize));
1253                    continue;
1254                }
1255                let raw = fs::read_to_string(&path).await?;
1256                if raw.trim().is_empty() || raw.trim() == "{}" {
1257                    path_counts.push((path, 0usize));
1258                    continue;
1259                }
1260                let parsed = parse_automation_v2_file(&raw);
1261                path_counts.push((path.clone(), parsed.len()));
1262                if !parsed.is_empty() {
1263                    loaded_from_alternate = true;
1264                }
1265                for (automation_id, automation) in parsed {
1266                    match merged.get(&automation_id) {
1267                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
1268                        _ => {
1269                            merged.insert(automation_id, automation);
1270                        }
1271                    }
1272                }
1273            }
1274        } else {
1275            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1276                if path == self.automations_v2_path {
1277                    continue;
1278                }
1279                if !path.exists() {
1280                    path_counts.push((path, 0usize));
1281                    continue;
1282                }
1283                let raw = fs::read_to_string(&path).await?;
1284                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
1285                    0usize
1286                } else {
1287                    parse_automation_v2_file(&raw).len()
1288                };
1289                path_counts.push((path, count));
1290            }
1291        }
1292        let active_path = self.automations_v2_path.display().to_string();
1293        let path_count_summary = path_counts
1294            .iter()
1295            .map(|(path, count)| format!("{}={count}", path.display()))
1296            .collect::<Vec<_>>();
1297        tracing::info!(
1298            active_path,
1299            canonical_loaded,
1300            path_counts = ?path_count_summary,
1301            merged_count = merged.len(),
1302            "loaded automation v2 definitions"
1303        );
1304        for automation in merged.values_mut() {
1305            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
1306            migrated = canonicalize_automation_output_paths(automation) || migrated;
1307            migrated = repair_automation_output_contracts(automation) || migrated;
1308        }
1309        *self.automations_v2.write().await = merged;
1310        if loaded_from_alternate || migrated {
1311            let _ = self.persist_automations_v2().await;
1312        } else if canonical_loaded {
1313            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1314        }
1315        Ok(())
1316    }
1317
1318    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1319        let _guard = self.automations_v2_persistence.lock().await;
1320        self.persist_automations_v2_locked().await
1321    }
1322
1323    async fn persist_automations_v2_locked(&self) -> anyhow::Result<()> {
1324        let payload = {
1325            let guard = self.automations_v2.read().await;
1326            serde_json::to_string_pretty(&*guard)?
1327        };
1328        if let Some(parent) = self.automations_v2_path.parent() {
1329            fs::create_dir_all(parent).await?;
1330        }
1331        write_string_atomic(&self.automations_v2_path, &payload).await?;
1332        let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1333        Ok(())
1334    }
1335
1336    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
1337        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
1338        let mut loaded_from_alternate = false;
1339        let mut canonical_loaded = false;
1340        let mut path_counts = Vec::new();
1341        if self.automation_v2_runs_path.exists() {
1342            let raw = fs::read_to_string(&self.automation_v2_runs_path).await?;
1343            if raw.trim().is_empty() || raw.trim() == "{}" {
1344                path_counts.push((self.automation_v2_runs_path.clone(), 0usize));
1345            } else {
1346                let parsed = parse_automation_v2_runs_file(&raw);
1347                path_counts.push((self.automation_v2_runs_path.clone(), parsed.len()));
1348                canonical_loaded = !parsed.is_empty();
1349                merged = parsed;
1350            }
1351        } else {
1352            path_counts.push((self.automation_v2_runs_path.clone(), 0usize));
1353        }
1354        if !canonical_loaded {
1355            for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
1356                if path == self.automation_v2_runs_path {
1357                    continue;
1358                }
1359                if !path.exists() {
1360                    path_counts.push((path, 0usize));
1361                    continue;
1362                }
1363                let raw = fs::read_to_string(&path).await?;
1364                if raw.trim().is_empty() || raw.trim() == "{}" {
1365                    path_counts.push((path, 0usize));
1366                    continue;
1367                }
1368                let parsed = parse_automation_v2_runs_file(&raw);
1369                path_counts.push((path.clone(), parsed.len()));
1370                if !parsed.is_empty() {
1371                    loaded_from_alternate = true;
1372                }
1373                for (run_id, run) in parsed {
1374                    match merged.get(&run_id) {
1375                        Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
1376                        _ => {
1377                            merged.insert(run_id, run);
1378                        }
1379                    }
1380                }
1381            }
1382        } else {
1383            for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
1384                if path == self.automation_v2_runs_path {
1385                    continue;
1386                }
1387                path_counts.push((path.clone(), usize::from(path.exists())));
1388            }
1389        }
1390        let active_runs_path = self.automation_v2_runs_path.display().to_string();
1391        let run_path_count_summary = path_counts
1392            .iter()
1393            .map(|(path, count)| format!("{}={count}", path.display()))
1394            .collect::<Vec<_>>();
1395        tracing::info!(
1396            active_path = active_runs_path,
1397            canonical_loaded,
1398            path_counts = ?run_path_count_summary,
1399            merged_count = merged.len(),
1400            "loaded automation v2 runs"
1401        );
1402        *self.automation_v2_runs.write().await = merged;
1403        let recovered = self
1404            .recover_automation_definitions_from_run_snapshots()
1405            .await?;
1406        let automation_count = self.automations_v2.read().await.len();
1407        let run_count = self.automation_v2_runs.read().await.len();
1408        if automation_count == 0 && run_count > 0 {
1409            let active_automations_path = self.automations_v2_path.display().to_string();
1410            let active_runs_path = self.automation_v2_runs_path.display().to_string();
1411            tracing::warn!(
1412                active_automations_path,
1413                active_runs_path,
1414                run_count,
1415                "automation v2 definitions are empty while run history exists"
1416            );
1417        }
1418        if loaded_from_alternate || recovered > 0 {
1419            let _ = self.persist_automation_v2_runs().await;
1420        } else if canonical_loaded {
1421            let _ =
1422                cleanup_stale_legacy_automation_v2_runs_file(&self.automation_v2_runs_path).await;
1423        }
1424        Ok(())
1425    }
1426
1427    pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1428        let (runs_snapshot, automations_snapshot) = {
1429            let runs = self.automation_v2_runs.read().await;
1430            let automations = self.automations_v2.read().await;
1431            (runs.clone(), automations.clone())
1432        };
1433        for run in runs_snapshot.values() {
1434            write_automation_v2_run_history_shard(&self.automation_v2_runs_path, run).await?;
1435        }
1436        let mut compacted = runs_snapshot;
1437        compact_automation_v2_runs_for_hot_storage(
1438            &mut compacted,
1439            &automations_snapshot,
1440            automation_v2_hot_cutoff_ms(),
1441        );
1442        let payload = serde_json::to_string_pretty(&compacted)?;
1443        if let Some(parent) = self.automation_v2_runs_path.parent() {
1444            fs::create_dir_all(parent).await?;
1445        }
1446        fs::write(&self.automation_v2_runs_path, &payload).await?;
1447        let _ = cleanup_stale_legacy_automation_v2_runs_file(&self.automation_v2_runs_path).await;
1448        Ok(())
1449    }
1450
1451    // Move old terminal automation runs out of the hot in-memory/index set.
1452    // Full run records are preserved as per-run history shards under
1453    // data/automation-runs/YYYY/MM/.
1454    pub async fn archive_stale_automation_v2_runs(
1455        &self,
1456        retention_days: u64,
1457    ) -> anyhow::Result<usize> {
1458        let cutoff_ms = {
1459            let now = now_ms();
1460            let window = retention_days.saturating_mul(24 * 60 * 60 * 1000);
1461            now.saturating_sub(window)
1462        };
1463        let archived: std::collections::HashMap<String, AutomationV2RunRecord> = {
1464            let mut guard = self.automation_v2_runs.write().await;
1465            let stale_ids: Vec<String> = guard
1466                .iter()
1467                .filter(|(_, run)| {
1468                    matches!(
1469                        run.status,
1470                        AutomationRunStatus::Completed
1471                            | AutomationRunStatus::Failed
1472                            | AutomationRunStatus::Blocked
1473                            | AutomationRunStatus::Cancelled
1474                    ) && run.updated_at_ms <= cutoff_ms
1475                })
1476                .map(|(id, _)| id.clone())
1477                .collect();
1478            let mut archived = std::collections::HashMap::new();
1479            for id in &stale_ids {
1480                if let Some(run) = guard.remove(id) {
1481                    archived.insert(id.clone(), run);
1482                }
1483            }
1484            archived
1485        };
1486        if archived.is_empty() {
1487            return Ok(0);
1488        }
1489        let archived_count = archived.len();
1490        for run in archived.values() {
1491            write_automation_v2_run_history_shard(&self.automation_v2_runs_path, run).await?;
1492        }
1493
1494        // Persist the shrunk hot file so the next startup loads a small map.
1495        self.persist_automation_v2_runs().await?;
1496
1497        tracing::info!(
1498            archived = archived_count,
1499            retention_days,
1500            history_root = %automation_v2_run_history_root(&self.automation_v2_runs_path).display(),
1501            "moved stale automation v2 runs to history shards"
1502        );
1503        Ok(archived_count)
1504    }
1505
1506    pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1507        if !self.optimization_campaigns_path.exists() {
1508            return Ok(());
1509        }
1510        let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1511        let parsed = parse_optimization_campaigns_file(&raw);
1512        *self.optimization_campaigns.write().await = parsed;
1513        Ok(())
1514    }
1515
1516    pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1517        let payload = {
1518            let guard = self.optimization_campaigns.read().await;
1519            serde_json::to_string_pretty(&*guard)?
1520        };
1521        if let Some(parent) = self.optimization_campaigns_path.parent() {
1522            fs::create_dir_all(parent).await?;
1523        }
1524        fs::write(&self.optimization_campaigns_path, payload).await?;
1525        Ok(())
1526    }
1527
1528    pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1529        if !self.optimization_experiments_path.exists() {
1530            return Ok(());
1531        }
1532        let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1533        let parsed = parse_optimization_experiments_file(&raw);
1534        *self.optimization_experiments.write().await = parsed;
1535        Ok(())
1536    }
1537
1538    pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1539        let payload = {
1540            let guard = self.optimization_experiments.read().await;
1541            serde_json::to_string_pretty(&*guard)?
1542        };
1543        if let Some(parent) = self.optimization_experiments_path.parent() {
1544            fs::create_dir_all(parent).await?;
1545        }
1546        fs::write(&self.optimization_experiments_path, payload).await?;
1547        Ok(())
1548    }
1549
1550    async fn verify_automation_v2_persisted_locked(
1551        &self,
1552        automation_id: &str,
1553        expected_present: bool,
1554    ) -> anyhow::Result<()> {
1555        let active_raw = if self.automations_v2_path.exists() {
1556            fs::read_to_string(&self.automations_v2_path).await?
1557        } else {
1558            String::new()
1559        };
1560        let active_parsed = parse_automation_v2_file_strict(&active_raw).map_err(|error| {
1561            anyhow::anyhow!(
1562                "failed to parse automation v2 persistence file `{}` during verification: {}",
1563                self.automations_v2_path.display(),
1564                error
1565            )
1566        })?;
1567        let active_present = active_parsed.contains_key(automation_id);
1568        if active_present != expected_present {
1569            let active_path = self.automations_v2_path.display().to_string();
1570            tracing::error!(
1571                automation_id,
1572                expected_present,
1573                actual_present = active_present,
1574                count = active_parsed.len(),
1575                active_path,
1576                "automation v2 persistence verification failed"
1577            );
1578            anyhow::bail!(
1579                "automation v2 persistence verification failed for `{}`",
1580                automation_id
1581            );
1582        }
1583        let mut alternate_mismatches = Vec::new();
1584        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
1585            if path == self.automations_v2_path {
1586                continue;
1587            }
1588            let raw = if path.exists() {
1589                fs::read_to_string(&path).await?
1590            } else {
1591                String::new()
1592            };
1593            let parsed = match parse_automation_v2_file_strict(&raw) {
1594                Ok(parsed) => parsed,
1595                Err(error) => {
1596                    alternate_mismatches.push(format!(
1597                        "{} expected_present={} parse_error={error}",
1598                        path.display(),
1599                        expected_present
1600                    ));
1601                    continue;
1602                }
1603            };
1604            let present = parsed.contains_key(automation_id);
1605            if present != expected_present {
1606                alternate_mismatches.push(format!(
1607                    "{} expected_present={} actual_present={} count={}",
1608                    path.display(),
1609                    expected_present,
1610                    present,
1611                    parsed.len()
1612                ));
1613            }
1614        }
1615        if !alternate_mismatches.is_empty() {
1616            let active_path = self.automations_v2_path.display().to_string();
1617            tracing::warn!(
1618                automation_id,
1619                expected_present,
1620                mismatches = ?alternate_mismatches,
1621                active_path,
1622                "automation v2 alternate persistence paths are stale"
1623            );
1624        }
1625        Ok(())
1626    }
1627
1628    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
1629        let runs = self
1630            .automation_v2_runs
1631            .read()
1632            .await
1633            .values()
1634            .cloned()
1635            .collect::<Vec<_>>();
1636        let mut guard = self.automations_v2.write().await;
1637        let mut recovered = 0usize;
1638        for run in runs {
1639            let Some(snapshot) = run.automation_snapshot.clone() else {
1640                continue;
1641            };
1642            let should_replace = match guard.get(&run.automation_id) {
1643                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
1644                None => true,
1645            };
1646            if should_replace {
1647                if !guard.contains_key(&run.automation_id) {
1648                    recovered += 1;
1649                }
1650                guard.insert(run.automation_id.clone(), snapshot);
1651            }
1652        }
1653        drop(guard);
1654        if recovered > 0 {
1655            let active_path = self.automations_v2_path.display().to_string();
1656            tracing::warn!(
1657                recovered,
1658                active_path,
1659                "recovered automation v2 definitions from run snapshots"
1660            );
1661            self.persist_automations_v2().await?;
1662        }
1663        Ok(recovered)
1664    }
1665
1666    pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1667        let path = if self.bug_monitor_config_path.exists() {
1668            self.bug_monitor_config_path.clone()
1669        } else if let Some(path) =
1670            config::paths::resolve_legacy_root_file_path("bug_monitor_config.json")
1671        {
1672            if path.exists() {
1673                path
1674            } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1675                .exists()
1676            {
1677                config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1678            } else {
1679                return Ok(());
1680            }
1681        } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1682            .exists()
1683        {
1684            config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1685        } else {
1686            return Ok(());
1687        };
1688        let raw = fs::read_to_string(path).await?;
1689        let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1690            .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1691        *self.bug_monitor_config.write().await = parsed;
1692        Ok(())
1693    }
1694
1695    pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1696        if let Some(parent) = self.bug_monitor_config_path.parent() {
1697            fs::create_dir_all(parent).await?;
1698        }
1699        let payload = {
1700            let guard = self.bug_monitor_config.read().await;
1701            serde_json::to_string_pretty(&*guard)?
1702        };
1703        fs::write(&self.bug_monitor_config_path, payload).await?;
1704        Ok(())
1705    }
1706
1707    pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1708        self.bug_monitor_config.read().await.clone()
1709    }
1710
1711    pub async fn put_bug_monitor_config(
1712        &self,
1713        mut config: BugMonitorConfig,
1714    ) -> anyhow::Result<BugMonitorConfig> {
1715        config.workspace_root = config
1716            .workspace_root
1717            .as_ref()
1718            .map(|v| v.trim().to_string())
1719            .filter(|v| !v.is_empty());
1720        if let Some(repo) = config.repo.as_ref() {
1721            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
1722                anyhow::bail!("repo must be in owner/repo format");
1723            }
1724        }
1725        if let Some(server) = config.mcp_server.as_ref() {
1726            let servers = self.mcp.list().await;
1727            if !servers.contains_key(server) {
1728                anyhow::bail!("unknown mcp server `{server}`");
1729            }
1730        }
1731        if let Some(model_policy) = config.model_policy.as_ref() {
1732            crate::http::routines_automations::validate_model_policy(model_policy)
1733                .map_err(anyhow::Error::msg)?;
1734        }
1735        validate_bug_monitor_monitored_projects(self, &mut config).await?;
1736        config.updated_at_ms = now_ms();
1737        *self.bug_monitor_config.write().await = config.clone();
1738        self.persist_bug_monitor_config().await?;
1739        Ok(config)
1740    }
1741
1742    pub async fn load_bug_monitor_log_watcher_state(&self) -> anyhow::Result<()> {
1743        if !self.bug_monitor_log_watcher_state_path.exists() {
1744            return Ok(());
1745        }
1746        let raw = fs::read_to_string(&self.bug_monitor_log_watcher_state_path).await?;
1747        let parsed =
1748            serde_json::from_str::<BugMonitorLogWatcherStateFile>(&raw).unwrap_or_default();
1749        *self.bug_monitor_log_source_states.write().await = parsed.sources;
1750        Ok(())
1751    }
1752
1753    pub async fn persist_bug_monitor_log_watcher_state(&self) -> anyhow::Result<()> {
1754        if let Some(parent) = self.bug_monitor_log_watcher_state_path.parent() {
1755            fs::create_dir_all(parent).await?;
1756        }
1757        let payload = {
1758            let guard = self.bug_monitor_log_source_states.read().await;
1759            serde_json::to_string_pretty(&BugMonitorLogWatcherStateFile {
1760                schema_version: 1,
1761                sources: guard.clone(),
1762            })?
1763        };
1764        write_state_file_atomically(&self.bug_monitor_log_watcher_state_path, payload).await
1765    }
1766
1767    pub async fn get_bug_monitor_log_source_state(
1768        &self,
1769        project_id: &str,
1770        source_id: &str,
1771    ) -> Option<BugMonitorLogSourceState> {
1772        self.bug_monitor_log_source_states
1773            .read()
1774            .await
1775            .get(&bug_monitor_log_source_state_key(project_id, source_id))
1776            .cloned()
1777    }
1778
1779    pub async fn put_bug_monitor_log_source_state(
1780        &self,
1781        source_state: BugMonitorLogSourceState,
1782    ) -> anyhow::Result<BugMonitorLogSourceState> {
1783        let key =
1784            bug_monitor_log_source_state_key(&source_state.project_id, &source_state.source_id);
1785        self.bug_monitor_log_source_states
1786            .write()
1787            .await
1788            .insert(key, source_state.clone());
1789        self.persist_bug_monitor_log_watcher_state().await?;
1790        Ok(source_state)
1791    }
1792
1793    pub async fn update_bug_monitor_log_watcher_status(
1794        &self,
1795        update: impl FnOnce(&mut BugMonitorLogWatcherStatus),
1796    ) -> BugMonitorLogWatcherStatus {
1797        let mut guard = self.bug_monitor_log_watcher_status.write().await;
1798        update(&mut guard);
1799        guard.clone()
1800    }
1801
1802    pub async fn load_bug_monitor_intake_keys(&self) -> anyhow::Result<()> {
1803        if !self.bug_monitor_intake_keys_path.exists() {
1804            return Ok(());
1805        }
1806        let raw = fs::read_to_string(&self.bug_monitor_intake_keys_path).await?;
1807        let parsed = serde_json::from_str::<
1808            std::collections::HashMap<String, BugMonitorProjectIntakeKey>,
1809        >(&raw)
1810        .unwrap_or_default();
1811        *self.bug_monitor_intake_keys.write().await = parsed;
1812        Ok(())
1813    }
1814
1815    pub async fn persist_bug_monitor_intake_keys(&self) -> anyhow::Result<()> {
1816        if let Some(parent) = self.bug_monitor_intake_keys_path.parent() {
1817            fs::create_dir_all(parent).await?;
1818        }
1819        let payload = {
1820            let guard = self.bug_monitor_intake_keys.read().await;
1821            serde_json::to_string_pretty(&*guard)?
1822        };
1823        write_state_file_atomically(&self.bug_monitor_intake_keys_path, payload).await
1824    }
1825
1826    pub async fn list_bug_monitor_intake_keys(&self) -> Vec<BugMonitorProjectIntakeKey> {
1827        let mut rows = self
1828            .bug_monitor_intake_keys
1829            .read()
1830            .await
1831            .values()
1832            .cloned()
1833            .collect::<Vec<_>>();
1834        rows.sort_by(|a, b| a.project_id.cmp(&b.project_id).then(a.name.cmp(&b.name)));
1835        rows
1836    }
1837
1838    pub async fn put_bug_monitor_intake_key(
1839        &self,
1840        key: BugMonitorProjectIntakeKey,
1841    ) -> anyhow::Result<BugMonitorProjectIntakeKey> {
1842        self.bug_monitor_intake_keys
1843            .write()
1844            .await
1845            .insert(key.key_id.clone(), key.clone());
1846        self.persist_bug_monitor_intake_keys().await?;
1847        Ok(key)
1848    }
1849
1850    pub async fn validate_bug_monitor_intake_key(
1851        &self,
1852        raw_key: &str,
1853        project_id: &str,
1854        required_scope: &str,
1855    ) -> Option<BugMonitorProjectIntakeKey> {
1856        let key_hash = crate::sha256_hex(&[raw_key.trim()]);
1857        let mut matched = {
1858            self.bug_monitor_intake_keys
1859                .read()
1860                .await
1861                .values()
1862                .find(|row| {
1863                    row.enabled
1864                        && row.project_id == project_id
1865                        && row.key_hash == key_hash
1866                        && row.scopes.iter().any(|scope| scope == required_scope)
1867                })
1868                .cloned()
1869        }?;
1870        matched.last_used_at_ms = Some(now_ms());
1871        let _ = self.put_bug_monitor_intake_key(matched.clone()).await;
1872        Some(matched)
1873    }
1874
1875    pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1876        let path = if self.bug_monitor_drafts_path.exists() {
1877            self.bug_monitor_drafts_path.clone()
1878        } else if let Some(path) =
1879            config::paths::resolve_legacy_root_file_path("bug_monitor_drafts.json")
1880        {
1881            if path.exists() {
1882                path
1883            } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1884                .exists()
1885            {
1886                config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1887            } else {
1888                return Ok(());
1889            }
1890        } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1891            .exists()
1892        {
1893            config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1894        } else {
1895            return Ok(());
1896        };
1897        let raw = fs::read_to_string(path).await?;
1898        let parsed =
1899            serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1900                .unwrap_or_default();
1901        *self.bug_monitor_drafts.write().await = parsed;
1902        Ok(())
1903    }
1904
1905    pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1906        if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1907            fs::create_dir_all(parent).await?;
1908        }
1909        let payload = {
1910            let guard = self.bug_monitor_drafts.read().await;
1911            serde_json::to_string_pretty(&*guard)?
1912        };
1913        fs::write(&self.bug_monitor_drafts_path, payload).await?;
1914        Ok(())
1915    }
1916
1917    pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1918        let path = if self.bug_monitor_incidents_path.exists() {
1919            self.bug_monitor_incidents_path.clone()
1920        } else if let Some(path) =
1921            config::paths::resolve_legacy_root_file_path("bug_monitor_incidents.json")
1922        {
1923            if path.exists() {
1924                path
1925            } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1926                .exists()
1927            {
1928                config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1929            } else {
1930                return Ok(());
1931            }
1932        } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1933            .exists()
1934        {
1935            config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1936        } else {
1937            return Ok(());
1938        };
1939        let raw = fs::read_to_string(path).await?;
1940        let parsed = serde_json::from_str::<
1941            std::collections::HashMap<String, BugMonitorIncidentRecord>,
1942        >(&raw)
1943        .unwrap_or_default();
1944        *self.bug_monitor_incidents.write().await = parsed;
1945        Ok(())
1946    }
1947
1948    pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1949        if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1950            fs::create_dir_all(parent).await?;
1951        }
1952        let payload = {
1953            let guard = self.bug_monitor_incidents.read().await;
1954            serde_json::to_string_pretty(&*guard)?
1955        };
1956        fs::write(&self.bug_monitor_incidents_path, payload).await?;
1957        Ok(())
1958    }
1959
1960    pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1961        let path = if self.bug_monitor_posts_path.exists() {
1962            self.bug_monitor_posts_path.clone()
1963        } else if let Some(path) =
1964            config::paths::resolve_legacy_root_file_path("bug_monitor_posts.json")
1965        {
1966            if path.exists() {
1967                path
1968            } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1969                .exists()
1970            {
1971                config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1972            } else {
1973                return Ok(());
1974            }
1975        } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1976            .exists()
1977        {
1978            config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1979        } else {
1980            return Ok(());
1981        };
1982        let raw = fs::read_to_string(path).await?;
1983        let parsed =
1984            serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1985                .unwrap_or_default();
1986        *self.bug_monitor_posts.write().await = parsed;
1987        Ok(())
1988    }
1989
1990    pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1991        if let Some(parent) = self.bug_monitor_posts_path.parent() {
1992            fs::create_dir_all(parent).await?;
1993        }
1994        let payload = {
1995            let guard = self.bug_monitor_posts.read().await;
1996            serde_json::to_string_pretty(&*guard)?
1997        };
1998        fs::write(&self.bug_monitor_posts_path, payload).await?;
1999        Ok(())
2000    }
2001
2002    pub async fn load_external_actions(&self) -> anyhow::Result<()> {
2003        let Some(raw) =
2004            read_state_file_with_legacy(&self.external_actions_path, "external_actions.json")
2005                .await?
2006        else {
2007            return Ok(());
2008        };
2009        let parsed =
2010            serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
2011                .unwrap_or_default();
2012        *self.external_actions.write().await = parsed;
2013        Ok(())
2014    }
2015
2016    pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
2017        if let Some(parent) = self.external_actions_path.parent() {
2018            fs::create_dir_all(parent).await?;
2019        }
2020        let payload = {
2021            let guard = self.external_actions.read().await;
2022            serde_json::to_string_pretty(&*guard)?
2023        };
2024        fs::write(&self.external_actions_path, payload).await?;
2025        Ok(())
2026    }
2027
2028    pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
2029        let mut rows = self
2030            .bug_monitor_incidents
2031            .read()
2032            .await
2033            .values()
2034            .cloned()
2035            .collect::<Vec<_>>();
2036        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2037        rows.truncate(limit.clamp(1, 200));
2038        rows
2039    }
2040
2041    pub async fn get_bug_monitor_incident(
2042        &self,
2043        incident_id: &str,
2044    ) -> Option<BugMonitorIncidentRecord> {
2045        self.bug_monitor_incidents
2046            .read()
2047            .await
2048            .get(incident_id)
2049            .cloned()
2050    }
2051
2052    pub async fn put_bug_monitor_incident(
2053        &self,
2054        incident: BugMonitorIncidentRecord,
2055    ) -> anyhow::Result<BugMonitorIncidentRecord> {
2056        self.bug_monitor_incidents
2057            .write()
2058            .await
2059            .insert(incident.incident_id.clone(), incident.clone());
2060        self.persist_bug_monitor_incidents().await?;
2061        Ok(incident)
2062    }
2063
2064    pub async fn delete_bug_monitor_incidents(&self, ids: &[String]) -> anyhow::Result<usize> {
2065        let mut removed = 0usize;
2066        {
2067            let mut guard = self.bug_monitor_incidents.write().await;
2068            for id in ids {
2069                if guard.remove(id).is_some() {
2070                    removed += 1;
2071                }
2072            }
2073        }
2074        if removed > 0 {
2075            self.persist_bug_monitor_incidents().await?;
2076        }
2077        Ok(removed)
2078    }
2079
2080    pub async fn clear_bug_monitor_incidents(&self) -> anyhow::Result<usize> {
2081        let removed = {
2082            let mut guard = self.bug_monitor_incidents.write().await;
2083            let count = guard.len();
2084            guard.clear();
2085            count
2086        };
2087        if removed > 0 {
2088            self.persist_bug_monitor_incidents().await?;
2089        }
2090        Ok(removed)
2091    }
2092
2093    pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
2094        let mut rows = self
2095            .bug_monitor_posts
2096            .read()
2097            .await
2098            .values()
2099            .cloned()
2100            .collect::<Vec<_>>();
2101        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2102        rows.truncate(limit.clamp(1, 200));
2103        rows
2104    }
2105
2106    pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
2107        self.bug_monitor_posts.read().await.get(post_id).cloned()
2108    }
2109
2110    pub async fn put_bug_monitor_post(
2111        &self,
2112        post: BugMonitorPostRecord,
2113    ) -> anyhow::Result<BugMonitorPostRecord> {
2114        self.bug_monitor_posts
2115            .write()
2116            .await
2117            .insert(post.post_id.clone(), post.clone());
2118        self.persist_bug_monitor_posts().await?;
2119        Ok(post)
2120    }
2121
2122    pub async fn delete_bug_monitor_posts(&self, ids: &[String]) -> anyhow::Result<usize> {
2123        let mut removed = 0usize;
2124        {
2125            let mut guard = self.bug_monitor_posts.write().await;
2126            for id in ids {
2127                if guard.remove(id).is_some() {
2128                    removed += 1;
2129                }
2130            }
2131        }
2132        if removed > 0 {
2133            self.persist_bug_monitor_posts().await?;
2134        }
2135        Ok(removed)
2136    }
2137
2138    pub async fn clear_bug_monitor_posts(&self) -> anyhow::Result<usize> {
2139        let removed = {
2140            let mut guard = self.bug_monitor_posts.write().await;
2141            let count = guard.len();
2142            guard.clear();
2143            count
2144        };
2145        if removed > 0 {
2146            self.persist_bug_monitor_posts().await?;
2147        }
2148        Ok(removed)
2149    }
2150
2151    pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
2152        let mut rows = self
2153            .external_actions
2154            .read()
2155            .await
2156            .values()
2157            .cloned()
2158            .collect::<Vec<_>>();
2159        rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2160        rows.truncate(limit.clamp(1, 200));
2161        rows
2162    }
2163
2164    pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
2165        self.external_actions.read().await.get(action_id).cloned()
2166    }
2167
2168    pub async fn get_external_action_by_idempotency_key(
2169        &self,
2170        idempotency_key: &str,
2171    ) -> Option<ExternalActionRecord> {
2172        let normalized = idempotency_key.trim();
2173        if normalized.is_empty() {
2174            return None;
2175        }
2176        self.external_actions
2177            .read()
2178            .await
2179            .values()
2180            .find(|action| {
2181                action
2182                    .idempotency_key
2183                    .as_deref()
2184                    .map(str::trim)
2185                    .filter(|value| !value.is_empty())
2186                    == Some(normalized)
2187            })
2188            .cloned()
2189    }
2190
2191    pub async fn put_external_action(
2192        &self,
2193        action: ExternalActionRecord,
2194    ) -> anyhow::Result<ExternalActionRecord> {
2195        self.external_actions
2196            .write()
2197            .await
2198            .insert(action.action_id.clone(), action.clone());
2199        self.persist_external_actions().await?;
2200        Ok(action)
2201    }
2202
2203    pub async fn record_external_action(
2204        &self,
2205        action: ExternalActionRecord,
2206    ) -> anyhow::Result<ExternalActionRecord> {
2207        let action = {
2208            let mut guard = self.external_actions.write().await;
2209            if let Some(idempotency_key) = action
2210                .idempotency_key
2211                .as_deref()
2212                .map(str::trim)
2213                .filter(|value| !value.is_empty())
2214            {
2215                if let Some(existing) = guard
2216                    .values()
2217                    .find(|existing| {
2218                        existing
2219                            .idempotency_key
2220                            .as_deref()
2221                            .map(str::trim)
2222                            .filter(|value| !value.is_empty())
2223                            == Some(idempotency_key)
2224                    })
2225                    .cloned()
2226                {
2227                    return Ok(existing);
2228                }
2229            }
2230            guard.insert(action.action_id.clone(), action.clone());
2231            action
2232        };
2233        self.persist_external_actions().await?;
2234        if let Some(run_id) = action.routine_run_id.as_deref() {
2235            let artifact = RoutineRunArtifact {
2236                artifact_id: format!("external-action-{}", action.action_id),
2237                uri: format!("external-action://{}", action.action_id),
2238                kind: "external_action_receipt".to_string(),
2239                label: Some(format!("external action receipt: {}", action.operation)),
2240                created_at_ms: action.updated_at_ms,
2241                metadata: Some(json!({
2242                    "actionID": action.action_id,
2243                    "operation": action.operation,
2244                    "status": action.status,
2245                    "sourceKind": action.source_kind,
2246                    "sourceID": action.source_id,
2247                    "capabilityID": action.capability_id,
2248                    "target": action.target,
2249                })),
2250            };
2251            let _ = self
2252                .append_routine_run_artifact(run_id, artifact.clone())
2253                .await;
2254            if let Some(runtime) = self.runtime.get() {
2255                runtime.event_bus.publish(EngineEvent::new(
2256                    "routine.run.artifact_added",
2257                    json!({
2258                        "runID": run_id,
2259                        "artifact": artifact,
2260                    }),
2261                ));
2262            }
2263        }
2264        if let Some(context_run_id) = action.context_run_id.as_deref() {
2265            let payload = serde_json::to_value(&action)?;
2266            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
2267                self,
2268                context_run_id,
2269                &format!("external-action-{}", action.action_id),
2270                "external_action_receipt",
2271                &format!("external-actions/{}.json", action.action_id),
2272                &payload,
2273            )
2274            .await
2275            {
2276                tracing::warn!(
2277                    "failed to append external action artifact {} to context run {}: {}",
2278                    action.action_id,
2279                    context_run_id,
2280                    error
2281                );
2282            }
2283        }
2284        Ok(action)
2285    }
2286
2287    pub async fn update_bug_monitor_runtime_status(
2288        &self,
2289        update: impl FnOnce(&mut BugMonitorRuntimeStatus),
2290    ) -> BugMonitorRuntimeStatus {
2291        let mut guard = self.bug_monitor_runtime_status.write().await;
2292        update(&mut guard);
2293        guard.clone()
2294    }
2295
2296    pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
2297        let mut rows = self
2298            .bug_monitor_drafts
2299            .read()
2300            .await
2301            .values()
2302            .cloned()
2303            .collect::<Vec<_>>();
2304        rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2305        rows.truncate(limit.clamp(1, 200));
2306        rows
2307    }
2308
2309    pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
2310        self.bug_monitor_drafts.read().await.get(draft_id).cloned()
2311    }
2312
2313    pub async fn put_bug_monitor_draft(
2314        &self,
2315        draft: BugMonitorDraftRecord,
2316    ) -> anyhow::Result<BugMonitorDraftRecord> {
2317        self.bug_monitor_drafts
2318            .write()
2319            .await
2320            .insert(draft.draft_id.clone(), draft.clone());
2321        self.persist_bug_monitor_drafts().await?;
2322        Ok(draft)
2323    }
2324
2325    pub async fn delete_bug_monitor_drafts(&self, ids: &[String]) -> anyhow::Result<usize> {
2326        let mut removed = 0usize;
2327        {
2328            let mut guard = self.bug_monitor_drafts.write().await;
2329            for id in ids {
2330                if guard.remove(id).is_some() {
2331                    removed += 1;
2332                }
2333            }
2334        }
2335        if removed > 0 {
2336            self.persist_bug_monitor_drafts().await?;
2337        }
2338        Ok(removed)
2339    }
2340
2341    pub async fn clear_bug_monitor_drafts(&self) -> anyhow::Result<usize> {
2342        let removed = {
2343            let mut guard = self.bug_monitor_drafts.write().await;
2344            let count = guard.len();
2345            guard.clear();
2346            count
2347        };
2348        if removed > 0 {
2349            self.persist_bug_monitor_drafts().await?;
2350        }
2351        Ok(removed)
2352    }
2353}