/// On-disk snapshot of the bug-monitor log watcher's progress, serialized as
/// JSON. Both fields carry `#[serde(default)]` so older or partially written
/// files still deserialize instead of failing the whole load.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
struct BugMonitorLogWatcherStateFile {
    // Schema version of this file; defaults to 0 for legacy files so future
    // migrations can branch on it.
    #[serde(default)]
    schema_version: u32,
    // Per-source watcher state. NOTE(review): keys are presumably the
    // `project_id/source_id` composite produced by
    // `bug_monitor_log_source_state_key` — confirm against the writer.
    #[serde(default)]
    sources: std::collections::HashMap<String, BugMonitorLogSourceState>,
}
8
/// Builds the composite state key `<project_id>/<source_id>`, trimming
/// surrounding whitespace from both parts.
fn bug_monitor_log_source_state_key(project_id: &str, source_id: &str) -> String {
    let project = project_id.trim();
    let source = source_id.trim();
    let mut key = String::with_capacity(project.len() + source.len() + 1);
    key.push_str(project);
    key.push('/');
    key.push_str(source);
    key
}
12
/// Returns `true` when `value` is non-empty and consists solely of ASCII
/// alphanumerics plus `.`, `_`, or `-` (i.e. it is "slug-like").
fn is_slug_like(value: &str) -> bool {
    if value.is_empty() {
        return false;
    }
    value
        .chars()
        .all(|ch| matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '.' | '_' | '-'))
}
19
20async fn write_state_file_atomically(path: &PathBuf, payload: String) -> anyhow::Result<()> {
21 let tmp = path.with_extension("tmp");
22 fs::write(&tmp, payload).await?;
23 fs::rename(&tmp, path).await?;
24 Ok(())
25}
26
/// Normalizes and validates `config.monitored_projects` in place.
///
/// Trims every user-supplied string field, enforces slug-like/unique IDs,
/// required names, `owner/repo` slugs, absolute workspace roots, known MCP
/// server references (when the runtime is ready), valid model policies, and
/// resolvable log-source paths; log-source tunables are clamped into safe
/// ranges rather than rejected.
///
/// # Errors
/// Bails with a descriptive message on the first validation failure.
async fn validate_bug_monitor_monitored_projects(
    state: &AppState,
    config: &mut BugMonitorConfig,
) -> anyhow::Result<()> {
    // Only fetch the MCP server list when some project actually references a
    // server AND the runtime is ready enough to answer the query.
    let needs_mcp_validation = config.monitored_projects.iter().any(|project| {
        project
            .mcp_server
            .as_ref()
            .is_some_and(|value| !value.trim().is_empty())
    });
    let servers = if needs_mcp_validation && state.is_ready() {
        Some(state.mcp.list().await)
    } else {
        None
    };
    let mut project_ids = std::collections::HashSet::new();
    for project in &mut config.monitored_projects {
        // Normalize all user-supplied strings before validating them.
        project.project_id = project.project_id.trim().to_string();
        project.name = project.name.trim().to_string();
        project.repo = project.repo.trim().to_string();
        project.workspace_root = project.workspace_root.trim().to_string();
        // A whitespace-only mcp_server collapses to None.
        project.mcp_server = project
            .mcp_server
            .as_ref()
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty());

        if !is_slug_like(&project.project_id) {
            anyhow::bail!("monitored project id must be ASCII slug-like");
        }
        // `HashSet::insert` returns false for duplicates.
        if !project_ids.insert(project.project_id.clone()) {
            anyhow::bail!("duplicate monitored project id `{}`", project.project_id);
        }
        if project.name.is_empty() {
            anyhow::bail!(
                "monitored project `{}` name is required",
                project.project_id
            );
        }
        if !is_valid_owner_repo_slug(&project.repo) {
            anyhow::bail!(
                "monitored project `{}` repo must be in owner/repo format",
                project.project_id
            );
        }
        crate::normalize_absolute_workspace_root(&project.workspace_root)
            .map_err(anyhow::Error::msg)?;
        if let Some(server) = project.mcp_server.as_ref() {
            if let Some(servers) = servers.as_ref() {
                // Server list was fetched: the referenced server must exist.
                if !servers.contains_key(server) {
                    anyhow::bail!(
                        "monitored project `{}` references unknown mcp server `{server}`",
                        project.project_id
                    );
                }
            } else if !state.is_ready() {
                // Runtime not ready: the server list is unavailable, so the
                // reference cannot be checked — deliberately accept it.
            } else {
                // Ready but no list fetched. NOTE(review): this branch looks
                // unreachable — `servers` is fetched whenever any project
                // references an MCP server and the state is ready — but it is
                // kept as a defensive failure path.
                anyhow::bail!(
                    "monitored project `{}` references unknown mcp server `{server}`",
                    project.project_id
                );
            }
        }
        if let Some(model_policy) = project.model_policy.as_ref() {
            crate::http::routines_automations::validate_model_policy(model_policy)
                .map_err(anyhow::Error::msg)?;
        }

        // Log-source IDs only need to be unique within their project.
        let mut source_ids = std::collections::HashSet::new();
        for source in &mut project.log_sources {
            source.source_id = source.source_id.trim().to_string();
            source.path = source.path.trim().to_string();
            if !is_slug_like(&source.source_id) {
                anyhow::bail!(
                    "log source id for monitored project `{}` must be ASCII slug-like",
                    project.project_id
                );
            }
            if !source_ids.insert(source.source_id.clone()) {
                anyhow::bail!(
                    "duplicate log source id `{}` in monitored project `{}`",
                    source.source_id,
                    project.project_id
                );
            }
            if source.path.is_empty() {
                anyhow::bail!(
                    "log source `{}` in monitored project `{}` path is required",
                    source.source_id,
                    project.project_id
                );
            }
            // Build a minimal project carrying only the already-normalized
            // fields that path resolution needs.
            let path_project = BugMonitorMonitoredProject {
                project_id: project.project_id.clone(),
                name: project.name.clone(),
                repo: project.repo.clone(),
                workspace_root: project.workspace_root.clone(),
                ..BugMonitorMonitoredProject::default()
            };
            crate::bug_monitor::log_watcher::resolve_log_source_path(&path_project, source)?;
            // Clamp tunables into operational ranges instead of rejecting.
            source.watch_interval_seconds = source.watch_interval_seconds.clamp(1, 86_400);
            source.max_bytes_per_poll = source.max_bytes_per_poll.clamp(1_024, 10 * 1024 * 1024);
            source.max_candidates_per_poll = source.max_candidates_per_poll.clamp(1, 200);
        }
    }
    Ok(())
}
136
137impl AppState {
    /// Constructs an `AppState` in the `Starting` phase, before the runtime
    /// exists. All in-memory stores start empty; the persisted ones are
    /// loaded later by `mark_ready`.
    ///
    /// * `attempt_id` — identifier for this startup attempt, surfaced via
    ///   `startup_snapshot`.
    /// * `in_process` — whether the engine runs in-process (vs. sidecar);
    ///   only affects `mode_label` reporting.
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        // Select the governance engine at compile time: premium builds get
        // the real engine, all others a stub that reports "unavailable".
        #[cfg(feature = "premium-governance")]
        let governance_engine: Arc<
            dyn tandem_enterprise_contract::governance::GovernancePolicyEngine,
        > = Arc::new(tandem_governance_engine::DefaultGovernanceEngine);
        #[cfg(not(feature = "premium-governance"))]
        let governance_engine: Arc<
            dyn tandem_enterprise_contract::governance::GovernancePolicyEngine,
        > = Arc::new(crate::app::state::governance::UnavailableGovernanceEngine);
        Self {
            // Installed exactly once by `mark_ready`.
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: config::env::resolve_run_stale_ms(),
            // Memory / audit stores.
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            memory_audit_path: config::paths::resolve_memory_audit_path(),
            protected_audit_path: config::paths::resolve_protected_audit_path(),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: config::paths::resolve_shared_resources_path(),
            // Routines / automations stores.
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            channel_automation_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_governance: Arc::new(RwLock::new(
                crate::automation_v2::governance::GovernanceState::default(),
            )),
            governance_engine,
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
                config::env::resolve_scheduler_max_concurrent_runs(),
            ))),
            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
            // Serializes automations-v2 persistence to disk.
            automations_v2_persistence: Arc::new(tokio::sync::Mutex::new(())),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_planner_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_learning_candidates: Arc::new(RwLock::new(std::collections::HashMap::new())),
            context_packs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Bug-monitor stores; config is seeded from the environment.
            bug_monitor_config: Arc::new(
                RwLock::new(config::env::resolve_bug_monitor_env_config()),
            ),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_log_watcher_state_path:
                config::paths::resolve_bug_monitor_log_watcher_state_path(),
            bug_monitor_log_source_states: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_log_watcher_status: Arc::new(RwLock::new(
                BugMonitorLogWatcherStatus::default(),
            )),
            bug_monitor_log_evidence_dir: config::paths::resolve_bug_monitor_log_evidence_dir(),
            bug_monitor_intake_keys: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_intake_keys_path: config::paths::resolve_bug_monitor_intake_keys_path(),
            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            provider_oauth_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            mcp_oauth_sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_mcp_servers: Arc::new(RwLock::new(
                std::collections::HashMap::new(),
            )),
            // Filesystem locations for each persisted store.
            routines_path: config::paths::resolve_routines_path(),
            routine_history_path: config::paths::resolve_routine_history_path(),
            routine_runs_path: config::paths::resolve_routine_runs_path(),
            automations_v2_path: config::paths::resolve_automations_v2_path(),
            channel_automation_drafts_path: config::paths::resolve_channel_automation_drafts_path(),
            automation_governance_path: config::paths::resolve_automation_governance_path(),
            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
            automation_v2_runs_archive_path: config::paths::resolve_automation_v2_runs_archive_path(
            ),
            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
            external_actions_path: config::paths::resolve_external_actions_path(),
            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
            workflow_planner_sessions_path: config::paths::resolve_workflow_planner_sessions_path(),
            workflow_learning_candidates_path:
                config::paths::resolve_workflow_learning_candidates_path(),
            context_packs_path: config::paths::resolve_context_packs_path(),
            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
            // Web-UI / server presentation defaults; overwritten from config later.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                // Shared canonical root, falling back to `~/.tandem` (or
                // `./.tandem` when no home directory can be resolved).
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
262
    /// True once `mark_ready` has installed the runtime.
    pub fn is_ready(&self) -> bool {
        self.runtime.get().is_some()
    }
266
267 pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
268 for _ in 0..attempts {
269 let startup = self.startup_snapshot().await;
270 if matches!(startup.status, StartupStatus::Ready) {
271 return true;
272 }
273 if matches!(startup.status, StartupStatus::Failed) {
274 return false;
275 }
276 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
277 }
278 matches!(self.startup_snapshot().await.status, StartupStatus::Ready)
279 }
280
281 pub fn mode_label(&self) -> &'static str {
282 if self.in_process_mode.load(Ordering::Relaxed) {
283 "in-process"
284 } else {
285 "sidecar"
286 }
287 }
288
289 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
290 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
291 if let Ok(mut guard) = self.web_ui_prefix.write() {
292 *guard = config::webui::normalize_web_ui_prefix(&prefix);
293 }
294 }
295
    /// Whether the web UI is currently enabled (set via `configure_web_ui`).
    pub fn web_ui_enabled(&self) -> bool {
        self.web_ui_enabled.load(Ordering::Relaxed)
    }
299
300 pub fn web_ui_prefix(&self) -> String {
301 self.web_ui_prefix
302 .read()
303 .map(|v| v.clone())
304 .unwrap_or_else(|_| "/admin".to_string())
305 }
306
307 pub fn set_server_base_url(&self, base_url: String) {
308 if let Ok(mut guard) = self.server_base_url.write() {
309 *guard = base_url;
310 }
311 }
312
313 pub fn server_base_url(&self) -> String {
314 self.server_base_url
315 .read()
316 .map(|v| v.clone())
317 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
318 }
319
320 pub async fn api_token(&self) -> Option<String> {
321 self.api_token.read().await.clone()
322 }
323
324 pub async fn set_api_token(&self, token: Option<String>) {
325 *self.api_token.write().await = token;
326 }
327
328 pub async fn startup_snapshot(&self) -> StartupSnapshot {
329 let state = self.startup.read().await.clone();
330 StartupSnapshot {
331 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
332 status: state.status,
333 phase: state.phase,
334 started_at_ms: state.started_at_ms,
335 attempt_id: state.attempt_id,
336 last_error: state.last_error,
337 }
338 }
339
340 pub fn host_runtime_context(&self) -> HostRuntimeContext {
341 self.runtime
342 .get()
343 .map(|runtime| runtime.host_runtime_context.clone())
344 .unwrap_or_else(|| self.host_runtime_context.clone())
345 }
346
347 pub async fn set_phase(&self, phase: impl Into<String>) {
348 let mut startup = self.startup.write().await;
349 startup.phase = phase.into();
350 }
351
    /// Completes startup: installs the runtime, registers built-in tools and
    /// engine-loop hooks, loads all persisted stores, then flips the startup
    /// status to `Ready`.
    ///
    /// Most store loads are best-effort (`let _ = …`); routines and
    /// automations-v2 are required and abort startup on failure.
    ///
    /// # Errors
    /// Fails if the runtime was already initialized, or if loading routines
    /// or automations-v2 fails.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        // The runtime may only be installed once per process.
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        // Built-in tools that need a clone of the full AppState.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list".to_string(),
                Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list_catalog".to_string(),
                Arc::new(crate::http::mcp_discovery::McpListCatalogTool::new(
                    self.clone(),
                )),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_request_capability".to_string(),
                Arc::new(crate::http::mcp_discovery::McpRequestCapabilityTool::new(
                    self.clone(),
                )),
            )
            .await;
        // Engine-loop hooks: agent spawning, tool policy, prompt context.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Load persisted stores. Routines and automations-v2 propagate
        // errors; every other load is best-effort.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_channel_automation_drafts().await;
        let _ = self.load_automation_governance().await;
        let _ = self.bootstrap_automation_governance().await;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_optimization_campaigns().await;
        let _ = self.load_optimization_experiments().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_bug_monitor_log_watcher_state().await;
        let _ = self.load_bug_monitor_intake_keys().await;
        let _ = self.load_external_actions().await;
        let _ = self.load_workflow_planner_sessions().await;
        let _ = self.load_workflow_learning_candidates().await;
        let _ = self.load_context_packs().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        // Agent teams are scoped to the current workspace root.
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        drop(startup);
        // Browser tools register in the background so a slow or failing
        // browser setup cannot block readiness.
        #[cfg(feature = "browser")]
        {
            let state = self.clone();
            tokio::spawn(async move {
                if let Err(err) = state.register_browser_tools().await {
                    tracing::warn!("browser tool registration skipped: {}", err);
                }
            });
        }
        Ok(())
    }
444
445 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
446 let mut startup = self.startup.write().await;
447 startup.status = StartupStatus::Failed;
448 startup.phase = phase.into();
449 startup.last_error = Some(error.into());
450 }
451
    /// Returns the status of every registered channel, merging the runtime's
    /// stored statuses with live diagnostics.
    ///
    /// Channels the runtime never recorded get a default disabled/
    /// disconnected entry; channels without diagnostics get
    /// `state: "stopped"` and null/zero diagnostic fields in `meta`.
    pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
        let runtime = self.channels_runtime.lock().await;
        let mut status = runtime.statuses.clone();
        let diagnostics = runtime.diagnostics.read().await;
        for spec in registered_channels() {
            // Ensure every registered channel appears in the result.
            let entry = status
                .entry(spec.name.to_string())
                .or_insert(ChannelStatus {
                    enabled: false,
                    connected: false,
                    last_error: None,
                    active_sessions: 0,
                    meta: json!({}),
                });
            let mut meta = entry.meta.as_object().cloned().unwrap_or_default();
            if let Some(diag) = diagnostics.get(spec.name) {
                // A fresh diagnostic error wins; otherwise keep the stored one.
                entry.last_error = diag.last_error.clone().or_else(|| entry.last_error.clone());
                meta.insert("state".to_string(), Value::String(diag.state.to_string()));
                meta.insert(
                    "last_error_code".to_string(),
                    diag.last_error_code
                        .map(|code| Value::String(code.to_string()))
                        .unwrap_or(Value::Null),
                );
                meta.insert(
                    "last_reconnect_at".to_string(),
                    diag.last_reconnect_at
                        .map(|value| Value::Number(value.into()))
                        .unwrap_or(Value::Null),
                );
                meta.insert(
                    "listener_start_count".to_string(),
                    Value::Number(serde_json::Number::from(diag.listener_start_count)),
                );
            } else {
                // No diagnostics recorded: report the channel as stopped.
                meta.insert("state".to_string(), Value::String("stopped".to_string()));
                meta.insert("last_error_code".to_string(), Value::Null);
                meta.insert("last_reconnect_at".to_string(), Value::Null);
                meta.insert(
                    "listener_start_count".to_string(),
                    Value::Number(0u64.into()),
                );
            }
            entry.meta = Value::Object(meta);
        }
        status
    }
499
    /// Tears down all channel listeners and restarts them from the current
    /// effective configuration.
    ///
    /// Also re-applies the web-UI settings from config, resets per-channel
    /// diagnostics and statuses, and publishes a `channel.status.changed`
    /// event carrying the rebuilt status map.
    pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
        let effective = self.config.get_effective_value().await;
        // Fall back to defaults if the effective config fails to deserialize.
        let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
        self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());

        let diagnostics = tandem_channels::new_channel_runtime_diagnostics();

        let mut runtime = self.channels_runtime.lock().await;
        // Abort listeners from the previous configuration before rebuilding.
        if let Some(listeners) = runtime.listeners.as_mut() {
            listeners.abort_all();
        }
        runtime.listeners = None;
        runtime.diagnostics = diagnostics.clone();
        runtime.statuses.clear();
        let channels_config_value = serde_json::to_value(&parsed.channels)
            .ok()
            .and_then(|channels| channels.as_object().cloned());

        // A channel counts as "enabled" when its config key is present as an
        // object in the channels config.
        let mut status_map = std::collections::HashMap::new();
        for spec in registered_channels() {
            let enabled = channels_config_value
                .as_ref()
                .and_then(|channels| channels.get(spec.config_key))
                .and_then(Value::as_object)
                .is_some();
            status_map.insert(
                spec.name.to_string(),
                ChannelStatus {
                    enabled,
                    connected: false,
                    last_error: None,
                    active_sessions: 0,
                    meta: serde_json::json!({}),
                },
            );
        }

        // Only start listeners when a usable channels config could be built;
        // enabled channels are then optimistically marked connected.
        if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
            let listeners = tandem_channels::start_channel_listeners_with_diagnostics(
                channels_cfg,
                diagnostics.clone(),
            )
            .await;
            runtime.listeners = Some(listeners);
            for status in status_map.values_mut() {
                if status.enabled {
                    status.connected = true;
                }
            }
        }

        runtime.statuses = status_map.clone();
        // Release the runtime lock before publishing the event.
        drop(runtime);

        self.event_bus.publish(EngineEvent::new(
            "channel.status.changed",
            serde_json::json!({ "channels": status_map }),
        ));
        Ok(())
    }
560
561 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
562 let Some(raw) =
563 read_state_file_with_legacy(&self.shared_resources_path, "shared_resources.json")
564 .await?
565 else {
566 return Ok(());
567 };
568 let parsed =
569 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
570 .unwrap_or_default();
571 let mut guard = self.shared_resources.write().await;
572 *guard = parsed;
573 Ok(())
574 }
575
576 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
577 if let Some(parent) = self.shared_resources_path.parent() {
578 fs::create_dir_all(parent).await?;
579 }
580 let payload = {
581 let guard = self.shared_resources.read().await;
582 serde_json::to_string_pretty(&*guard)?
583 };
584 fs::write(&self.shared_resources_path, payload).await?;
585 Ok(())
586 }
587
588 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
589 self.shared_resources.read().await.get(key).cloned()
590 }
591
592 pub async fn list_shared_resources(
593 &self,
594 prefix: Option<&str>,
595 limit: usize,
596 ) -> Vec<SharedResourceRecord> {
597 let limit = limit.clamp(1, 500);
598 let mut rows = self
599 .shared_resources
600 .read()
601 .await
602 .values()
603 .filter(|record| {
604 if let Some(prefix) = prefix {
605 record.key.starts_with(prefix)
606 } else {
607 true
608 }
609 })
610 .cloned()
611 .collect::<Vec<_>>();
612 rows.sort_by(|a, b| a.key.cmp(&b.key));
613 rows.truncate(limit);
614 rows
615 }
616
    /// Inserts or replaces a shared resource record.
    ///
    /// * `if_match_rev` — optional optimistic-concurrency guard: the write is
    ///   rejected with `RevisionConflict` unless the stored revision matches.
    ///
    /// Revisions start at 1 and increment (saturating) on each update. The
    /// in-memory write is rolled back if persisting to disk fails.
    ///
    /// # Errors
    /// `InvalidKey`, `RevisionConflict`, or `PersistFailed`.
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        // Optimistic concurrency: a missing record never matches a concrete
        // expected revision.
        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        // Remember the displaced record so the insert can be undone below.
        let previous = guard.insert(key.clone(), record.clone());
        drop(guard);

        // Persist outside the lock; on failure restore the previous state so
        // memory and disk stay consistent.
        if let Err(error) = self.persist_shared_resources().await {
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
675
    /// Removes a shared resource record, returning the removed record (or
    /// `None` when the key was absent).
    ///
    /// * `if_match_rev` — optional optimistic-concurrency guard, as in
    ///   `put_shared_resource`.
    ///
    /// The in-memory removal is rolled back if persisting to disk fails.
    ///
    /// # Errors
    /// `InvalidKey`, `RevisionConflict`, or `PersistFailed`.
    pub async fn delete_shared_resource(
        &self,
        key: &str,
        if_match_rev: Option<u64>,
    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
        if !is_valid_resource_key(key) {
            return Err(ResourceStoreError::InvalidKey {
                key: key.to_string(),
            });
        }

        let mut guard = self.shared_resources.write().await;
        let current = guard.get(key).cloned();
        // Optimistic concurrency: a missing record never matches a concrete
        // expected revision.
        if let Some(expected) = if_match_rev {
            let current_rev = current.as_ref().map(|row| row.rev);
            if current_rev != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key: key.to_string(),
                    expected_rev: Some(expected),
                    current_rev,
                }));
            }
        }

        let removed = guard.remove(key);
        drop(guard);

        // Persist outside the lock; restore the record on failure so memory
        // and disk stay consistent.
        if let Err(error) = self.persist_shared_resources().await {
            if let Some(record) = removed.clone() {
                self.shared_resources
                    .write()
                    .await
                    .insert(record.key.clone(), record);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(removed)
    }
717
    /// Loads the routines store from disk into memory.
    ///
    /// Unlike most loaders here, a corrupt primary file is NOT silently
    /// replaced with an empty map: the sibling backup (written by
    /// `persist_routines_inner`) is tried first, and only if that also fails
    /// is the original parse error propagated.
    pub async fn load_routines(&self) -> anyhow::Result<()> {
        // Missing file (and missing legacy fallback) means a fresh install.
        let Some(raw) = read_state_file_with_legacy(&self.routines_path, "routines.json").await?
        else {
            return Ok(());
        };
        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
            Ok(parsed) => {
                let mut guard = self.routines.write().await;
                *guard = parsed;
                Ok(())
            }
            Err(primary_err) => {
                // Primary store is corrupt — fall back to the backup copy
                // before giving up.
                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
                if backup_path.exists() {
                    let backup_raw = fs::read_to_string(&backup_path).await?;
                    if let Ok(parsed_backup) = serde_json::from_str::<
                        std::collections::HashMap<String, RoutineSpec>,
                    >(&backup_raw)
                    {
                        let mut guard = self.routines.write().await;
                        *guard = parsed_backup;
                        return Ok(());
                    }
                }
                Err(anyhow::anyhow!(
                    "failed to parse routines store {}: {primary_err}",
                    self.routines_path.display()
                ))
            }
        }
    }
749
750 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
751 if !self.routine_history_path.exists() {
752 return Ok(());
753 }
754 let raw = fs::read_to_string(&self.routine_history_path).await?;
755 let parsed = serde_json::from_str::<
756 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
757 >(&raw)
758 .unwrap_or_default();
759 let mut guard = self.routine_history.write().await;
760 *guard = parsed;
761 Ok(())
762 }
763
764 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
765 let Some(raw) =
766 read_state_file_with_legacy(&self.routine_runs_path, "routine_runs.json").await?
767 else {
768 return Ok(());
769 };
770 let parsed =
771 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
772 .unwrap_or_default();
773 let mut guard = self.routine_runs.write().await;
774 *guard = parsed;
775 Ok(())
776 }
777
    /// Writes the routines map to disk with three safety measures:
    /// 1. unless `allow_empty_overwrite`, refuses to clobber a non-empty
    ///    on-disk store with an empty in-memory map (guards against wiping
    ///    data after a failed load);
    /// 2. copies the current file to a sibling backup (best-effort);
    /// 3. writes to a sibling temp file and renames it into place so readers
    ///    never observe a partial write.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            // Unreadable/corrupt existing content is treated as "has rows"
            // (`unwrap_or(true)`) so we err on the side of not overwriting.
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        // Best-effort backup of the previous file; failure is ignored.
        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        // Atomic replace: write the temp file, then rename over the target.
        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
811
    /// Persists the routines map, refusing to overwrite a non-empty on-disk
    /// store with an empty in-memory one (see `persist_routines_inner`).
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
815
816 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
817 if let Some(parent) = self.routine_history_path.parent() {
818 fs::create_dir_all(parent).await?;
819 }
820 let payload = {
821 let guard = self.routine_history.read().await;
822 serde_json::to_string_pretty(&*guard)?
823 };
824 fs::write(&self.routine_history_path, payload).await?;
825 Ok(())
826 }
827
828 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
829 if let Some(parent) = self.routine_runs_path.parent() {
830 fs::create_dir_all(parent).await?;
831 }
832 let payload = {
833 let guard = self.routine_runs.read().await;
834 serde_json::to_string_pretty(&*guard)?
835 };
836 fs::write(&self.routine_runs_path, payload).await?;
837 Ok(())
838 }
839
840 pub async fn put_routine(
841 &self,
842 mut routine: RoutineSpec,
843 ) -> Result<RoutineSpec, RoutineStoreError> {
844 if routine.routine_id.trim().is_empty() {
845 return Err(RoutineStoreError::InvalidRoutineId {
846 routine_id: routine.routine_id,
847 });
848 }
849
850 routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
851 routine.output_targets = normalize_non_empty_list(routine.output_targets);
852
853 let now = now_ms();
854 let next_schedule_fire =
855 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
856 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
857 detail: "invalid schedule or timezone".to_string(),
858 })?;
859 match routine.schedule {
860 RoutineSchedule::IntervalSeconds { seconds } => {
861 if seconds == 0 {
862 return Err(RoutineStoreError::InvalidSchedule {
863 detail: "interval_seconds must be > 0".to_string(),
864 });
865 }
866 let _ = seconds;
867 }
868 RoutineSchedule::Cron { .. } => {}
869 }
870 if routine.next_fire_at_ms.is_none() {
871 routine.next_fire_at_ms = Some(next_schedule_fire);
872 }
873
874 let mut guard = self.routines.write().await;
875 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
876 drop(guard);
877
878 if let Err(error) = self.persist_routines().await {
879 let mut rollback = self.routines.write().await;
880 if let Some(previous) = previous {
881 rollback.insert(previous.routine_id.clone(), previous);
882 } else {
883 rollback.remove(&routine.routine_id);
884 }
885 return Err(RoutineStoreError::PersistFailed {
886 message: error.to_string(),
887 });
888 }
889
890 Ok(routine)
891 }
892
893 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
894 let mut rows = self
895 .routines
896 .read()
897 .await
898 .values()
899 .cloned()
900 .collect::<Vec<_>>();
901 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
902 rows
903 }
904
905 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
906 self.routines.read().await.get(routine_id).cloned()
907 }
908
909 pub async fn delete_routine(
910 &self,
911 routine_id: &str,
912 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
913 let mut guard = self.routines.write().await;
914 let removed = guard.remove(routine_id);
915 drop(guard);
916
917 let allow_empty_overwrite = self.routines.read().await.is_empty();
918 if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
919 if let Some(removed) = removed.clone() {
920 self.routines
921 .write()
922 .await
923 .insert(removed.routine_id.clone(), removed);
924 }
925 return Err(RoutineStoreError::PersistFailed {
926 message: error.to_string(),
927 });
928 }
929 Ok(removed)
930 }
931
    /// Scans all active routines whose `next_fire_at_ms` has passed, lets the
    /// misfire policy decide how many catch-up runs to schedule, advances the
    /// next fire time, and returns one trigger plan per routine that should run.
    ///
    /// Persistence of the advanced fire times is best-effort: the error from
    /// `persist_routines` is deliberately ignored.
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            // Only active routines participate in scheduling.
            if routine.status != RoutineStatus::Active {
                continue;
            }
            // Routines with no computed next fire time are skipped outright.
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            // Not yet due.
            if now_ms < next_fire_at_ms {
                continue;
            }
            // The policy decides both the catch-up run count and the new fire time.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            // run_count == 0 means the policy chose to skip the missed runs;
            // the fire time was still advanced above.
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        let _ = self.persist_routines().await;
        plans
    }
967
968 pub async fn mark_routine_fired(
969 &self,
970 routine_id: &str,
971 fired_at_ms: u64,
972 ) -> Option<RoutineSpec> {
973 let mut guard = self.routines.write().await;
974 let routine = guard.get_mut(routine_id)?;
975 routine.last_fired_at_ms = Some(fired_at_ms);
976 let updated = routine.clone();
977 drop(guard);
978 let _ = self.persist_routines().await;
979 Some(updated)
980 }
981
982 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
983 let mut history = self.routine_history.write().await;
984 history
985 .entry(event.routine_id.clone())
986 .or_default()
987 .push(event);
988 drop(history);
989 let _ = self.persist_routine_history().await;
990 }
991
992 pub async fn list_routine_history(
993 &self,
994 routine_id: &str,
995 limit: usize,
996 ) -> Vec<RoutineHistoryEvent> {
997 let limit = limit.clamp(1, 500);
998 let mut rows = self
999 .routine_history
1000 .read()
1001 .await
1002 .get(routine_id)
1003 .cloned()
1004 .unwrap_or_default();
1005 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
1006 rows.truncate(limit);
1007 rows
1008 }
1009
    /// Builds and stores a new run record for `routine`, snapshotting the
    /// routine's execution settings (entrypoint, args, allowed tools, output
    /// targets) so later edits to the routine do not affect this run.
    ///
    /// The record is inserted into the in-memory map and persisted
    /// best-effort; it is returned regardless of persistence outcome.
    pub async fn create_routine_run(
        &self,
        routine: &RoutineSpec,
        trigger_type: &str,
        run_count: u32,
        status: RoutineRunStatus,
        detail: Option<String>,
    ) -> RoutineRunRecord {
        let now = now_ms();
        let record = RoutineRunRecord {
            // Random, globally unique run id.
            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
            routine_id: routine.routine_id.clone(),
            trigger_type: trigger_type.to_string(),
            run_count,
            status,
            created_at_ms: now,
            updated_at_ms: now,
            fired_at_ms: Some(now),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: routine.requires_approval,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail,
            // Snapshot of the routine's execution configuration at creation time.
            entrypoint: routine.entrypoint.clone(),
            args: routine.args.clone(),
            allowed_tools: routine.allowed_tools.clone(),
            output_targets: routine.output_targets.clone(),
            artifacts: Vec::new(),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            // Usage accounting starts at zero.
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };
        self.routine_runs
            .write()
            .await
            .insert(record.run_id.clone(), record.clone());
        let _ = self.persist_routine_runs().await;
        record
    }
1054
1055 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
1056 self.routine_runs.read().await.get(run_id).cloned()
1057 }
1058
1059 pub async fn list_routine_runs(
1060 &self,
1061 routine_id: Option<&str>,
1062 limit: usize,
1063 ) -> Vec<RoutineRunRecord> {
1064 let mut rows = self
1065 .routine_runs
1066 .read()
1067 .await
1068 .values()
1069 .filter(|row| {
1070 if let Some(id) = routine_id {
1071 row.routine_id == id
1072 } else {
1073 true
1074 }
1075 })
1076 .cloned()
1077 .collect::<Vec<_>>();
1078 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1079 rows.truncate(limit.clamp(1, 500));
1080 rows
1081 }
1082
1083 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
1084 let mut guard = self.routine_runs.write().await;
1085 let next_run_id = guard
1086 .values()
1087 .filter(|row| row.status == RoutineRunStatus::Queued)
1088 .min_by(|a, b| {
1089 a.created_at_ms
1090 .cmp(&b.created_at_ms)
1091 .then_with(|| a.run_id.cmp(&b.run_id))
1092 })
1093 .map(|row| row.run_id.clone())?;
1094 let now = now_ms();
1095 let row = guard.get_mut(&next_run_id)?;
1096 row.status = RoutineRunStatus::Running;
1097 row.updated_at_ms = now;
1098 row.started_at_ms = Some(now);
1099 let claimed = row.clone();
1100 drop(guard);
1101 let _ = self.persist_routine_runs().await;
1102 Some(claimed)
1103 }
1104
1105 pub async fn set_routine_session_policy(
1106 &self,
1107 session_id: String,
1108 run_id: String,
1109 routine_id: String,
1110 allowed_tools: Vec<String>,
1111 ) {
1112 let policy = RoutineSessionPolicy {
1113 session_id: session_id.clone(),
1114 run_id,
1115 routine_id,
1116 allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1117 };
1118 self.routine_session_policies
1119 .write()
1120 .await
1121 .insert(session_id, policy);
1122 }
1123
1124 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1125 self.routine_session_policies
1126 .read()
1127 .await
1128 .get(session_id)
1129 .cloned()
1130 }
1131
1132 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1133 self.routine_session_policies
1134 .write()
1135 .await
1136 .remove(session_id);
1137 }
1138
1139 pub async fn update_routine_run_status(
1140 &self,
1141 run_id: &str,
1142 status: RoutineRunStatus,
1143 reason: Option<String>,
1144 ) -> Option<RoutineRunRecord> {
1145 let mut guard = self.routine_runs.write().await;
1146 let row = guard.get_mut(run_id)?;
1147 row.status = status.clone();
1148 row.updated_at_ms = now_ms();
1149 match status {
1150 RoutineRunStatus::PendingApproval => row.approval_reason = reason,
1151 RoutineRunStatus::Running => {
1152 row.started_at_ms.get_or_insert_with(now_ms);
1153 if let Some(detail) = reason {
1154 row.detail = Some(detail);
1155 }
1156 }
1157 RoutineRunStatus::Denied => row.denial_reason = reason,
1158 RoutineRunStatus::Paused => row.paused_reason = reason,
1159 RoutineRunStatus::Completed
1160 | RoutineRunStatus::Failed
1161 | RoutineRunStatus::Cancelled => {
1162 row.finished_at_ms = Some(now_ms());
1163 if let Some(detail) = reason {
1164 row.detail = Some(detail);
1165 }
1166 }
1167 _ => {
1168 if let Some(detail) = reason {
1169 row.detail = Some(detail);
1170 }
1171 }
1172 }
1173 let updated = row.clone();
1174 drop(guard);
1175 let _ = self.persist_routine_runs().await;
1176 Some(updated)
1177 }
1178
1179 pub async fn append_routine_run_artifact(
1180 &self,
1181 run_id: &str,
1182 artifact: RoutineRunArtifact,
1183 ) -> Option<RoutineRunRecord> {
1184 let mut guard = self.routine_runs.write().await;
1185 let row = guard.get_mut(run_id)?;
1186 row.updated_at_ms = now_ms();
1187 row.artifacts.push(artifact);
1188 let updated = row.clone();
1189 drop(guard);
1190 let _ = self.persist_routine_runs().await;
1191 Some(updated)
1192 }
1193
1194 pub async fn add_active_session_id(
1195 &self,
1196 run_id: &str,
1197 session_id: String,
1198 ) -> Option<RoutineRunRecord> {
1199 let mut guard = self.routine_runs.write().await;
1200 let row = guard.get_mut(run_id)?;
1201 if !row.active_session_ids.iter().any(|id| id == &session_id) {
1202 row.active_session_ids.push(session_id);
1203 }
1204 row.latest_session_id = row.active_session_ids.last().cloned();
1205 row.updated_at_ms = now_ms();
1206 let updated = row.clone();
1207 drop(guard);
1208 let _ = self.persist_routine_runs().await;
1209 Some(updated)
1210 }
1211
    /// Removes `session_id` from a run's active session list and bumps the
    /// update timestamp. Persistence is best-effort.
    ///
    /// NOTE(review): unlike `add_active_session_id`, this does not
    /// recalculate `latest_session_id`, so it may keep pointing at the
    /// session just removed — confirm that retaining the last-seen session
    /// id is intentional.
    pub async fn clear_active_session_id(
        &self,
        run_id: &str,
        session_id: &str,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.active_session_ids.retain(|id| id != session_id);
        row.updated_at_ms = now_ms();
        let updated = row.clone();
        drop(guard);
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
1226
    /// Loads automation v2 definitions from disk.
    ///
    /// The canonical path is preferred; if it yields no definitions, all
    /// alternate/legacy candidate paths are merged in (newest `updated_at_ms`
    /// wins per automation id). In-place migrations are then applied, and the
    /// result is written back when anything was merged or migrated.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        let mut migrated = false;
        // (path, definition count) pairs gathered for the diagnostic log below.
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            // Empty or `{}` files count as "no definitions".
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file was empty/missing: merge from alternates,
            // keeping the newest copy of each automation id.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    match merged.get(&automation_id) {
                        // Existing copy is strictly newer: keep it.
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical file won: alternates are only inspected for the log.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        // Apply in-place migrations; each returns whether it changed anything.
        for automation in merged.values_mut() {
            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
            migrated = canonicalize_automation_output_paths(automation) || migrated;
            migrated = repair_automation_output_contracts(automation) || migrated;
        }
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate || migrated {
            // Write the merged/migrated result back (best-effort).
            let _ = self.persist_automations_v2().await;
        } else if canonical_loaded {
            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
        }
        Ok(())
    }
1317
    /// Serializes automation definitions to disk. The dedicated persistence
    /// mutex is held for the duration so concurrent persist calls cannot
    /// interleave their writes.
    pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
        let _guard = self.automations_v2_persistence.lock().await;
        self.persist_automations_v2_locked().await
    }
1322
1323 async fn persist_automations_v2_locked(&self) -> anyhow::Result<()> {
1324 let payload = {
1325 let guard = self.automations_v2.read().await;
1326 serde_json::to_string_pretty(&*guard)?
1327 };
1328 if let Some(parent) = self.automations_v2_path.parent() {
1329 fs::create_dir_all(parent).await?;
1330 }
1331 write_string_atomic(&self.automations_v2_path, &payload).await?;
1332 let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1333 Ok(())
1334 }
1335
    /// Loads automation v2 run records from disk.
    ///
    /// Mirrors `load_automations_v2`: the canonical path wins; if it has no
    /// runs, alternates are merged with newest `updated_at_ms` winning per
    /// run id. Afterwards, automation definitions are recovered from run
    /// snapshots if the definition store came up empty.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        let mut canonical_loaded = false;
        // (path, count) pairs for the diagnostic log below.
        let mut path_counts = Vec::new();
        if self.automation_v2_runs_path.exists() {
            let raw = fs::read_to_string(&self.automation_v2_runs_path).await?;
            // Empty or `{}` files count as "no runs".
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automation_v2_runs_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_runs_file(&raw);
                path_counts.push((self.automation_v2_runs_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automation_v2_runs_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file empty/missing: merge runs from alternate paths.
            for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
                if path == self.automation_v2_runs_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_runs_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (run_id, run) in parsed {
                    match merged.get(&run_id) {
                        // Existing copy is strictly newer: keep it.
                        Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                        _ => {
                            merged.insert(run_id, run);
                        }
                    }
                }
            }
        } else {
            for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
                if path == self.automation_v2_runs_path {
                    continue;
                }
                // NOTE(review): this branch logs 1/0 for mere file existence,
                // while the sibling branch above logs parsed run counts —
                // confirm the asymmetry in the log summary is intentional.
                path_counts.push((path.clone(), usize::from(path.exists())));
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            canonical_loaded,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Rebuild missing automation definitions from run snapshots, if any.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            // Suspicious state worth surfacing: history without definitions.
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            let _ = self.persist_automation_v2_runs().await;
        } else if canonical_loaded {
            let _ =
                cleanup_stale_legacy_automation_v2_runs_file(&self.automation_v2_runs_path).await;
        }
        Ok(())
    }
1426
1427 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1428 let (runs_snapshot, automations_snapshot) = {
1429 let runs = self.automation_v2_runs.read().await;
1430 let automations = self.automations_v2.read().await;
1431 (runs.clone(), automations.clone())
1432 };
1433 for run in runs_snapshot.values() {
1434 write_automation_v2_run_history_shard(&self.automation_v2_runs_path, run).await?;
1435 }
1436 let mut compacted = runs_snapshot;
1437 compact_automation_v2_runs_for_hot_storage(
1438 &mut compacted,
1439 &automations_snapshot,
1440 automation_v2_hot_cutoff_ms(),
1441 );
1442 let payload = serde_json::to_string_pretty(&compacted)?;
1443 if let Some(parent) = self.automation_v2_runs_path.parent() {
1444 fs::create_dir_all(parent).await?;
1445 }
1446 fs::write(&self.automation_v2_runs_path, &payload).await?;
1447 let _ = cleanup_stale_legacy_automation_v2_runs_file(&self.automation_v2_runs_path).await;
1448 Ok(())
1449 }
1450
    /// Moves terminal runs older than `retention_days` out of the hot store.
    ///
    /// Stale runs are removed from the in-memory map, written to their
    /// history shards, and the hot file is re-persisted. Returns the number
    /// of runs archived.
    pub async fn archive_stale_automation_v2_runs(
        &self,
        retention_days: u64,
    ) -> anyhow::Result<usize> {
        // Everything updated at or before this timestamp is stale.
        let cutoff_ms = {
            let now = now_ms();
            let window = retention_days.saturating_mul(24 * 60 * 60 * 1000);
            now.saturating_sub(window)
        };
        let archived: std::collections::HashMap<String, AutomationV2RunRecord> = {
            let mut guard = self.automation_v2_runs.write().await;
            // Only terminal statuses are eligible; in-flight runs are kept.
            let stale_ids: Vec<String> = guard
                .iter()
                .filter(|(_, run)| {
                    matches!(
                        run.status,
                        AutomationRunStatus::Completed
                            | AutomationRunStatus::Failed
                            | AutomationRunStatus::Blocked
                            | AutomationRunStatus::Cancelled
                    ) && run.updated_at_ms <= cutoff_ms
                })
                .map(|(id, _)| id.clone())
                .collect();
            let mut archived = std::collections::HashMap::new();
            for id in &stale_ids {
                if let Some(run) = guard.remove(id) {
                    archived.insert(id.clone(), run);
                }
            }
            archived
        };
        if archived.is_empty() {
            return Ok(0);
        }
        let archived_count = archived.len();
        // Write each archived run to its history shard before rewriting the
        // hot file, so a failure below cannot lose the records.
        for run in archived.values() {
            write_automation_v2_run_history_shard(&self.automation_v2_runs_path, run).await?;
        }

        self.persist_automation_v2_runs().await?;

        tracing::info!(
            archived = archived_count,
            retention_days,
            history_root = %automation_v2_run_history_root(&self.automation_v2_runs_path).display(),
            "moved stale automation v2 runs to history shards"
        );
        Ok(archived_count)
    }
1505
1506 pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1507 if !self.optimization_campaigns_path.exists() {
1508 return Ok(());
1509 }
1510 let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1511 let parsed = parse_optimization_campaigns_file(&raw);
1512 *self.optimization_campaigns.write().await = parsed;
1513 Ok(())
1514 }
1515
1516 pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1517 let payload = {
1518 let guard = self.optimization_campaigns.read().await;
1519 serde_json::to_string_pretty(&*guard)?
1520 };
1521 if let Some(parent) = self.optimization_campaigns_path.parent() {
1522 fs::create_dir_all(parent).await?;
1523 }
1524 fs::write(&self.optimization_campaigns_path, payload).await?;
1525 Ok(())
1526 }
1527
1528 pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1529 if !self.optimization_experiments_path.exists() {
1530 return Ok(());
1531 }
1532 let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1533 let parsed = parse_optimization_experiments_file(&raw);
1534 *self.optimization_experiments.write().await = parsed;
1535 Ok(())
1536 }
1537
1538 pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1539 let payload = {
1540 let guard = self.optimization_experiments.read().await;
1541 serde_json::to_string_pretty(&*guard)?
1542 };
1543 if let Some(parent) = self.optimization_experiments_path.parent() {
1544 fs::create_dir_all(parent).await?;
1545 }
1546 fs::write(&self.optimization_experiments_path, payload).await?;
1547 Ok(())
1548 }
1549
    /// Verifies that `automation_id` is (or is not, per `expected_present`)
    /// stored in the canonical automation file after a persist.
    ///
    /// The canonical path is checked strictly and a mismatch is an error;
    /// alternate paths are checked too but only produce a warning, since
    /// they may legitimately lag behind.
    async fn verify_automation_v2_persisted_locked(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        // A missing canonical file is treated as an empty (valid) store.
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        // Strict parsing: a corrupt canonical file fails verification outright.
        let active_parsed = parse_automation_v2_file_strict(&active_raw).map_err(|error| {
            anyhow::anyhow!(
                "failed to parse automation v2 persistence file `{}` during verification: {}",
                self.automations_v2_path.display(),
                error
            )
        })?;
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Alternate paths: collect mismatches (including parse errors) and
        // report them as a single warning rather than failing.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = match parse_automation_v2_file_strict(&raw) {
                Ok(parsed) => parsed,
                Err(error) => {
                    alternate_mismatches.push(format!(
                        "{} expected_present={} parse_error={error}",
                        path.display(),
                        expected_present
                    ));
                    continue;
                }
            };
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
1627
    /// Rebuilds automation definitions from the snapshots embedded in run
    /// records, used when the definition store was lost or is stale.
    ///
    /// Returns the number of definitions recovered from nothing. A snapshot
    /// also replaces an existing definition when it is strictly newer.
    ///
    /// NOTE(review): `recovered` counts only brand-new insertions, so a pass
    /// that merely replaced existing definitions with newer snapshots skips
    /// the persist below — confirm those replacements are expected to reach
    /// disk via a later persist.
    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut guard = self.automations_v2.write().await;
        let mut recovered = 0usize;
        for run in runs {
            // Runs without an embedded snapshot cannot contribute.
            let Some(snapshot) = run.automation_snapshot.clone() else {
                continue;
            };
            let should_replace = match guard.get(&run.automation_id) {
                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
                None => true,
            };
            if should_replace {
                if !guard.contains_key(&run.automation_id) {
                    recovered += 1;
                }
                guard.insert(run.automation_id.clone(), snapshot);
            }
        }
        drop(guard);
        if recovered > 0 {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                recovered,
                active_path,
                "recovered automation v2 definitions from run snapshots"
            );
            self.persist_automations_v2().await?;
        }
        Ok(recovered)
    }
1665
1666 pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1667 let path = if self.bug_monitor_config_path.exists() {
1668 self.bug_monitor_config_path.clone()
1669 } else if let Some(path) =
1670 config::paths::resolve_legacy_root_file_path("bug_monitor_config.json")
1671 {
1672 if path.exists() {
1673 path
1674 } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1675 .exists()
1676 {
1677 config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1678 } else {
1679 return Ok(());
1680 }
1681 } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1682 .exists()
1683 {
1684 config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1685 } else {
1686 return Ok(());
1687 };
1688 let raw = fs::read_to_string(path).await?;
1689 let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1690 .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1691 *self.bug_monitor_config.write().await = parsed;
1692 Ok(())
1693 }
1694
1695 pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1696 if let Some(parent) = self.bug_monitor_config_path.parent() {
1697 fs::create_dir_all(parent).await?;
1698 }
1699 let payload = {
1700 let guard = self.bug_monitor_config.read().await;
1701 serde_json::to_string_pretty(&*guard)?
1702 };
1703 fs::write(&self.bug_monitor_config_path, payload).await?;
1704 Ok(())
1705 }
1706
1707 pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1708 self.bug_monitor_config.read().await.clone()
1709 }
1710
    /// Validates, normalizes, stores, and persists a new bug monitor config.
    ///
    /// Validation covers: `repo` must be `owner/repo` shaped when non-empty,
    /// `mcp_server` must name a known MCP server, the model policy must pass
    /// the shared validator, and each monitored project is normalized and
    /// checked. Returns the stored config with a fresh `updated_at_ms`.
    pub async fn put_bug_monitor_config(
        &self,
        mut config: BugMonitorConfig,
    ) -> anyhow::Result<BugMonitorConfig> {
        // Normalize workspace_root: trim, and treat blank as absent.
        config.workspace_root = config
            .workspace_root
            .as_ref()
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty());
        if let Some(repo) = config.repo.as_ref() {
            // An empty repo string is allowed; a non-empty one must be a slug.
            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
                anyhow::bail!("repo must be in owner/repo format");
            }
        }
        if let Some(server) = config.mcp_server.as_ref() {
            let servers = self.mcp.list().await;
            if !servers.contains_key(server) {
                anyhow::bail!("unknown mcp server `{server}`");
            }
        }
        if let Some(model_policy) = config.model_policy.as_ref() {
            crate::http::routines_automations::validate_model_policy(model_policy)
                .map_err(anyhow::Error::msg)?;
        }
        // Normalizes project fields in place and rejects invalid entries.
        validate_bug_monitor_monitored_projects(self, &mut config).await?;
        config.updated_at_ms = now_ms();
        *self.bug_monitor_config.write().await = config.clone();
        self.persist_bug_monitor_config().await?;
        Ok(config)
    }
1741
1742 pub async fn load_bug_monitor_log_watcher_state(&self) -> anyhow::Result<()> {
1743 if !self.bug_monitor_log_watcher_state_path.exists() {
1744 return Ok(());
1745 }
1746 let raw = fs::read_to_string(&self.bug_monitor_log_watcher_state_path).await?;
1747 let parsed =
1748 serde_json::from_str::<BugMonitorLogWatcherStateFile>(&raw).unwrap_or_default();
1749 *self.bug_monitor_log_source_states.write().await = parsed.sources;
1750 Ok(())
1751 }
1752
1753 pub async fn persist_bug_monitor_log_watcher_state(&self) -> anyhow::Result<()> {
1754 if let Some(parent) = self.bug_monitor_log_watcher_state_path.parent() {
1755 fs::create_dir_all(parent).await?;
1756 }
1757 let payload = {
1758 let guard = self.bug_monitor_log_source_states.read().await;
1759 serde_json::to_string_pretty(&BugMonitorLogWatcherStateFile {
1760 schema_version: 1,
1761 sources: guard.clone(),
1762 })?
1763 };
1764 write_state_file_atomically(&self.bug_monitor_log_watcher_state_path, payload).await
1765 }
1766
1767 pub async fn get_bug_monitor_log_source_state(
1768 &self,
1769 project_id: &str,
1770 source_id: &str,
1771 ) -> Option<BugMonitorLogSourceState> {
1772 self.bug_monitor_log_source_states
1773 .read()
1774 .await
1775 .get(&bug_monitor_log_source_state_key(project_id, source_id))
1776 .cloned()
1777 }
1778
1779 pub async fn put_bug_monitor_log_source_state(
1780 &self,
1781 source_state: BugMonitorLogSourceState,
1782 ) -> anyhow::Result<BugMonitorLogSourceState> {
1783 let key =
1784 bug_monitor_log_source_state_key(&source_state.project_id, &source_state.source_id);
1785 self.bug_monitor_log_source_states
1786 .write()
1787 .await
1788 .insert(key, source_state.clone());
1789 self.persist_bug_monitor_log_watcher_state().await?;
1790 Ok(source_state)
1791 }
1792
1793 pub async fn update_bug_monitor_log_watcher_status(
1794 &self,
1795 update: impl FnOnce(&mut BugMonitorLogWatcherStatus),
1796 ) -> BugMonitorLogWatcherStatus {
1797 let mut guard = self.bug_monitor_log_watcher_status.write().await;
1798 update(&mut guard);
1799 guard.clone()
1800 }
1801
1802 pub async fn load_bug_monitor_intake_keys(&self) -> anyhow::Result<()> {
1803 if !self.bug_monitor_intake_keys_path.exists() {
1804 return Ok(());
1805 }
1806 let raw = fs::read_to_string(&self.bug_monitor_intake_keys_path).await?;
1807 let parsed = serde_json::from_str::<
1808 std::collections::HashMap<String, BugMonitorProjectIntakeKey>,
1809 >(&raw)
1810 .unwrap_or_default();
1811 *self.bug_monitor_intake_keys.write().await = parsed;
1812 Ok(())
1813 }
1814
1815 pub async fn persist_bug_monitor_intake_keys(&self) -> anyhow::Result<()> {
1816 if let Some(parent) = self.bug_monitor_intake_keys_path.parent() {
1817 fs::create_dir_all(parent).await?;
1818 }
1819 let payload = {
1820 let guard = self.bug_monitor_intake_keys.read().await;
1821 serde_json::to_string_pretty(&*guard)?
1822 };
1823 write_state_file_atomically(&self.bug_monitor_intake_keys_path, payload).await
1824 }
1825
1826 pub async fn list_bug_monitor_intake_keys(&self) -> Vec<BugMonitorProjectIntakeKey> {
1827 let mut rows = self
1828 .bug_monitor_intake_keys
1829 .read()
1830 .await
1831 .values()
1832 .cloned()
1833 .collect::<Vec<_>>();
1834 rows.sort_by(|a, b| a.project_id.cmp(&b.project_id).then(a.name.cmp(&b.name)));
1835 rows
1836 }
1837
1838 pub async fn put_bug_monitor_intake_key(
1839 &self,
1840 key: BugMonitorProjectIntakeKey,
1841 ) -> anyhow::Result<BugMonitorProjectIntakeKey> {
1842 self.bug_monitor_intake_keys
1843 .write()
1844 .await
1845 .insert(key.key_id.clone(), key.clone());
1846 self.persist_bug_monitor_intake_keys().await?;
1847 Ok(key)
1848 }
1849
1850 pub async fn validate_bug_monitor_intake_key(
1851 &self,
1852 raw_key: &str,
1853 project_id: &str,
1854 required_scope: &str,
1855 ) -> Option<BugMonitorProjectIntakeKey> {
1856 let key_hash = crate::sha256_hex(&[raw_key.trim()]);
1857 let mut matched = {
1858 self.bug_monitor_intake_keys
1859 .read()
1860 .await
1861 .values()
1862 .find(|row| {
1863 row.enabled
1864 && row.project_id == project_id
1865 && row.key_hash == key_hash
1866 && row.scopes.iter().any(|scope| scope == required_scope)
1867 })
1868 .cloned()
1869 }?;
1870 matched.last_used_at_ms = Some(now_ms());
1871 let _ = self.put_bug_monitor_intake_key(matched.clone()).await;
1872 Some(matched)
1873 }
1874
1875 pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1876 let path = if self.bug_monitor_drafts_path.exists() {
1877 self.bug_monitor_drafts_path.clone()
1878 } else if let Some(path) =
1879 config::paths::resolve_legacy_root_file_path("bug_monitor_drafts.json")
1880 {
1881 if path.exists() {
1882 path
1883 } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1884 .exists()
1885 {
1886 config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1887 } else {
1888 return Ok(());
1889 }
1890 } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1891 .exists()
1892 {
1893 config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1894 } else {
1895 return Ok(());
1896 };
1897 let raw = fs::read_to_string(path).await?;
1898 let parsed =
1899 serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1900 .unwrap_or_default();
1901 *self.bug_monitor_drafts.write().await = parsed;
1902 Ok(())
1903 }
1904
1905 pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1906 if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1907 fs::create_dir_all(parent).await?;
1908 }
1909 let payload = {
1910 let guard = self.bug_monitor_drafts.read().await;
1911 serde_json::to_string_pretty(&*guard)?
1912 };
1913 fs::write(&self.bug_monitor_drafts_path, payload).await?;
1914 Ok(())
1915 }
1916
1917 pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1918 let path = if self.bug_monitor_incidents_path.exists() {
1919 self.bug_monitor_incidents_path.clone()
1920 } else if let Some(path) =
1921 config::paths::resolve_legacy_root_file_path("bug_monitor_incidents.json")
1922 {
1923 if path.exists() {
1924 path
1925 } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1926 .exists()
1927 {
1928 config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1929 } else {
1930 return Ok(());
1931 }
1932 } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1933 .exists()
1934 {
1935 config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1936 } else {
1937 return Ok(());
1938 };
1939 let raw = fs::read_to_string(path).await?;
1940 let parsed = serde_json::from_str::<
1941 std::collections::HashMap<String, BugMonitorIncidentRecord>,
1942 >(&raw)
1943 .unwrap_or_default();
1944 *self.bug_monitor_incidents.write().await = parsed;
1945 Ok(())
1946 }
1947
1948 pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1949 if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1950 fs::create_dir_all(parent).await?;
1951 }
1952 let payload = {
1953 let guard = self.bug_monitor_incidents.read().await;
1954 serde_json::to_string_pretty(&*guard)?
1955 };
1956 fs::write(&self.bug_monitor_incidents_path, payload).await?;
1957 Ok(())
1958 }
1959
1960 pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1961 let path = if self.bug_monitor_posts_path.exists() {
1962 self.bug_monitor_posts_path.clone()
1963 } else if let Some(path) =
1964 config::paths::resolve_legacy_root_file_path("bug_monitor_posts.json")
1965 {
1966 if path.exists() {
1967 path
1968 } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1969 .exists()
1970 {
1971 config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1972 } else {
1973 return Ok(());
1974 }
1975 } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1976 .exists()
1977 {
1978 config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1979 } else {
1980 return Ok(());
1981 };
1982 let raw = fs::read_to_string(path).await?;
1983 let parsed =
1984 serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1985 .unwrap_or_default();
1986 *self.bug_monitor_posts.write().await = parsed;
1987 Ok(())
1988 }
1989
1990 pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1991 if let Some(parent) = self.bug_monitor_posts_path.parent() {
1992 fs::create_dir_all(parent).await?;
1993 }
1994 let payload = {
1995 let guard = self.bug_monitor_posts.read().await;
1996 serde_json::to_string_pretty(&*guard)?
1997 };
1998 fs::write(&self.bug_monitor_posts_path, payload).await?;
1999 Ok(())
2000 }
2001
2002 pub async fn load_external_actions(&self) -> anyhow::Result<()> {
2003 let Some(raw) =
2004 read_state_file_with_legacy(&self.external_actions_path, "external_actions.json")
2005 .await?
2006 else {
2007 return Ok(());
2008 };
2009 let parsed =
2010 serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
2011 .unwrap_or_default();
2012 *self.external_actions.write().await = parsed;
2013 Ok(())
2014 }
2015
2016 pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
2017 if let Some(parent) = self.external_actions_path.parent() {
2018 fs::create_dir_all(parent).await?;
2019 }
2020 let payload = {
2021 let guard = self.external_actions.read().await;
2022 serde_json::to_string_pretty(&*guard)?
2023 };
2024 fs::write(&self.external_actions_path, payload).await?;
2025 Ok(())
2026 }
2027
2028 pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
2029 let mut rows = self
2030 .bug_monitor_incidents
2031 .read()
2032 .await
2033 .values()
2034 .cloned()
2035 .collect::<Vec<_>>();
2036 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2037 rows.truncate(limit.clamp(1, 200));
2038 rows
2039 }
2040
2041 pub async fn get_bug_monitor_incident(
2042 &self,
2043 incident_id: &str,
2044 ) -> Option<BugMonitorIncidentRecord> {
2045 self.bug_monitor_incidents
2046 .read()
2047 .await
2048 .get(incident_id)
2049 .cloned()
2050 }
2051
2052 pub async fn put_bug_monitor_incident(
2053 &self,
2054 incident: BugMonitorIncidentRecord,
2055 ) -> anyhow::Result<BugMonitorIncidentRecord> {
2056 self.bug_monitor_incidents
2057 .write()
2058 .await
2059 .insert(incident.incident_id.clone(), incident.clone());
2060 self.persist_bug_monitor_incidents().await?;
2061 Ok(incident)
2062 }
2063
2064 pub async fn delete_bug_monitor_incidents(&self, ids: &[String]) -> anyhow::Result<usize> {
2065 let mut removed = 0usize;
2066 {
2067 let mut guard = self.bug_monitor_incidents.write().await;
2068 for id in ids {
2069 if guard.remove(id).is_some() {
2070 removed += 1;
2071 }
2072 }
2073 }
2074 if removed > 0 {
2075 self.persist_bug_monitor_incidents().await?;
2076 }
2077 Ok(removed)
2078 }
2079
2080 pub async fn clear_bug_monitor_incidents(&self) -> anyhow::Result<usize> {
2081 let removed = {
2082 let mut guard = self.bug_monitor_incidents.write().await;
2083 let count = guard.len();
2084 guard.clear();
2085 count
2086 };
2087 if removed > 0 {
2088 self.persist_bug_monitor_incidents().await?;
2089 }
2090 Ok(removed)
2091 }
2092
2093 pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
2094 let mut rows = self
2095 .bug_monitor_posts
2096 .read()
2097 .await
2098 .values()
2099 .cloned()
2100 .collect::<Vec<_>>();
2101 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2102 rows.truncate(limit.clamp(1, 200));
2103 rows
2104 }
2105
2106 pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
2107 self.bug_monitor_posts.read().await.get(post_id).cloned()
2108 }
2109
2110 pub async fn put_bug_monitor_post(
2111 &self,
2112 post: BugMonitorPostRecord,
2113 ) -> anyhow::Result<BugMonitorPostRecord> {
2114 self.bug_monitor_posts
2115 .write()
2116 .await
2117 .insert(post.post_id.clone(), post.clone());
2118 self.persist_bug_monitor_posts().await?;
2119 Ok(post)
2120 }
2121
2122 pub async fn delete_bug_monitor_posts(&self, ids: &[String]) -> anyhow::Result<usize> {
2123 let mut removed = 0usize;
2124 {
2125 let mut guard = self.bug_monitor_posts.write().await;
2126 for id in ids {
2127 if guard.remove(id).is_some() {
2128 removed += 1;
2129 }
2130 }
2131 }
2132 if removed > 0 {
2133 self.persist_bug_monitor_posts().await?;
2134 }
2135 Ok(removed)
2136 }
2137
2138 pub async fn clear_bug_monitor_posts(&self) -> anyhow::Result<usize> {
2139 let removed = {
2140 let mut guard = self.bug_monitor_posts.write().await;
2141 let count = guard.len();
2142 guard.clear();
2143 count
2144 };
2145 if removed > 0 {
2146 self.persist_bug_monitor_posts().await?;
2147 }
2148 Ok(removed)
2149 }
2150
2151 pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
2152 let mut rows = self
2153 .external_actions
2154 .read()
2155 .await
2156 .values()
2157 .cloned()
2158 .collect::<Vec<_>>();
2159 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2160 rows.truncate(limit.clamp(1, 200));
2161 rows
2162 }
2163
2164 pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
2165 self.external_actions.read().await.get(action_id).cloned()
2166 }
2167
2168 pub async fn get_external_action_by_idempotency_key(
2169 &self,
2170 idempotency_key: &str,
2171 ) -> Option<ExternalActionRecord> {
2172 let normalized = idempotency_key.trim();
2173 if normalized.is_empty() {
2174 return None;
2175 }
2176 self.external_actions
2177 .read()
2178 .await
2179 .values()
2180 .find(|action| {
2181 action
2182 .idempotency_key
2183 .as_deref()
2184 .map(str::trim)
2185 .filter(|value| !value.is_empty())
2186 == Some(normalized)
2187 })
2188 .cloned()
2189 }
2190
2191 pub async fn put_external_action(
2192 &self,
2193 action: ExternalActionRecord,
2194 ) -> anyhow::Result<ExternalActionRecord> {
2195 self.external_actions
2196 .write()
2197 .await
2198 .insert(action.action_id.clone(), action.clone());
2199 self.persist_external_actions().await?;
2200 Ok(action)
2201 }
2202
    /// Records an external action with idempotency-key deduplication, then
    /// fans the receipt out to any associated routine run and context run.
    ///
    /// Behavior:
    /// - If `action.idempotency_key` (trimmed, non-empty) matches an existing
    ///   record's key, the existing record is returned unchanged and nothing
    ///   is inserted or persisted.
    /// - Otherwise the action is inserted (keyed by `action_id`) and the map
    ///   is persisted to disk.
    /// - If `routine_run_id` is set, an `external_action_receipt` artifact is
    ///   appended to that run (best-effort) and a `routine.run.artifact_added`
    ///   event is published when the runtime is initialized.
    /// - If `context_run_id` is set, the full action is appended to that
    ///   context run as a JSON artifact; failures there are logged, not
    ///   propagated.
    pub async fn record_external_action(
        &self,
        action: ExternalActionRecord,
    ) -> anyhow::Result<ExternalActionRecord> {
        let action = {
            // Hold the write lock across the dedup check + insert so two
            // concurrent calls with the same key cannot both insert.
            let mut guard = self.external_actions.write().await;
            if let Some(idempotency_key) = action
                .idempotency_key
                .as_deref()
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                if let Some(existing) = guard
                    .values()
                    .find(|existing| {
                        existing
                            .idempotency_key
                            .as_deref()
                            .map(str::trim)
                            .filter(|value| !value.is_empty())
                            == Some(idempotency_key)
                    })
                    .cloned()
                {
                    // Duplicate submission: return the prior record without
                    // inserting, persisting, or emitting artifacts/events.
                    return Ok(existing);
                }
            }
            guard.insert(action.action_id.clone(), action.clone());
            action
        };
        self.persist_external_actions().await?;
        if let Some(run_id) = action.routine_run_id.as_deref() {
            let artifact = RoutineRunArtifact {
                artifact_id: format!("external-action-{}", action.action_id),
                uri: format!("external-action://{}", action.action_id),
                kind: "external_action_receipt".to_string(),
                label: Some(format!("external action receipt: {}", action.operation)),
                created_at_ms: action.updated_at_ms,
                metadata: Some(json!({
                    "actionID": action.action_id,
                    "operation": action.operation,
                    "status": action.status,
                    "sourceKind": action.source_kind,
                    "sourceID": action.source_id,
                    "capabilityID": action.capability_id,
                    "target": action.target,
                })),
            };
            // Best-effort: a failed artifact append must not fail the record.
            let _ = self
                .append_routine_run_artifact(run_id, artifact.clone())
                .await;
            if let Some(runtime) = self.runtime.get() {
                runtime.event_bus.publish(EngineEvent::new(
                    "routine.run.artifact_added",
                    json!({
                        "runID": run_id,
                        "artifact": artifact,
                    }),
                ));
            }
        }
        if let Some(context_run_id) = action.context_run_id.as_deref() {
            let payload = serde_json::to_value(&action)?;
            // Context-run artifact append is also best-effort: log and continue.
            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
                self,
                context_run_id,
                &format!("external-action-{}", action.action_id),
                "external_action_receipt",
                &format!("external-actions/{}.json", action.action_id),
                &payload,
            )
            .await
            {
                tracing::warn!(
                    "failed to append external action artifact {} to context run {}: {}",
                    action.action_id,
                    context_run_id,
                    error
                );
            }
        }
        Ok(action)
    }
2286
2287 pub async fn update_bug_monitor_runtime_status(
2288 &self,
2289 update: impl FnOnce(&mut BugMonitorRuntimeStatus),
2290 ) -> BugMonitorRuntimeStatus {
2291 let mut guard = self.bug_monitor_runtime_status.write().await;
2292 update(&mut guard);
2293 guard.clone()
2294 }
2295
2296 pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
2297 let mut rows = self
2298 .bug_monitor_drafts
2299 .read()
2300 .await
2301 .values()
2302 .cloned()
2303 .collect::<Vec<_>>();
2304 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2305 rows.truncate(limit.clamp(1, 200));
2306 rows
2307 }
2308
2309 pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
2310 self.bug_monitor_drafts.read().await.get(draft_id).cloned()
2311 }
2312
2313 pub async fn put_bug_monitor_draft(
2314 &self,
2315 draft: BugMonitorDraftRecord,
2316 ) -> anyhow::Result<BugMonitorDraftRecord> {
2317 self.bug_monitor_drafts
2318 .write()
2319 .await
2320 .insert(draft.draft_id.clone(), draft.clone());
2321 self.persist_bug_monitor_drafts().await?;
2322 Ok(draft)
2323 }
2324
2325 pub async fn delete_bug_monitor_drafts(&self, ids: &[String]) -> anyhow::Result<usize> {
2326 let mut removed = 0usize;
2327 {
2328 let mut guard = self.bug_monitor_drafts.write().await;
2329 for id in ids {
2330 if guard.remove(id).is_some() {
2331 removed += 1;
2332 }
2333 }
2334 }
2335 if removed > 0 {
2336 self.persist_bug_monitor_drafts().await?;
2337 }
2338 Ok(removed)
2339 }
2340
2341 pub async fn clear_bug_monitor_drafts(&self) -> anyhow::Result<usize> {
2342 let removed = {
2343 let mut guard = self.bug_monitor_drafts.write().await;
2344 let count = guard.len();
2345 guard.clear();
2346 count
2347 };
2348 if removed > 0 {
2349 self.persist_bug_monitor_drafts().await?;
2350 }
2351 Ok(removed)
2352 }
2353}