1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
22use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
23use tandem_memory::db::MemoryDatabase;
24use tandem_providers::ChatMessage;
25use tandem_workflows::{
26 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
27 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
28 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
29};
30
31use crate::agent_teams::AgentTeamRuntime;
32use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
33use crate::automation_v2::types::*;
34use crate::bug_monitor::types::*;
35use crate::capability_resolver::CapabilityResolver;
36use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
37use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
38use crate::pack_manager::PackManager;
39use crate::preset_registry::PresetRegistry;
40use crate::routines::{errors::RoutineStoreError, types::*};
41use crate::runtime::{
42 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
43};
44use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
45use crate::util::{host::detect_host_runtime_context, time::now_ms};
46use crate::{
47 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
48 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
49 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
50 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
51 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
52 OptimizationPromotionDecisionKind,
53};
54
/// Shared, cheaply clonable application state for the server.
///
/// Every field is either `Copy` or an `Arc`-backed handle, so cloning an
/// `AppState` hands out another reference to the same underlying state
/// rather than copying it.
#[derive(Clone)]
pub struct AppState {
    // Engine runtime, initialized exactly once by `mark_ready`.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    // Startup lifecycle (status/phase/error), snapshotted by `startup_snapshot`.
    pub startup: Arc<RwLock<StartupState>>,
    // True when running in-process rather than as a sidecar (see `mode_label`).
    pub in_process_mode: Arc<AtomicBool>,
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    // In-memory shared resource store; persisted to `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    // Routine specs / history / run records, each persisted to its own path below.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    pub token_cost_per_1k_usd: f64,
    // On-disk locations for the persisted stores above, resolved at construction.
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI toggle + prefix use std (sync) locks: they are read from
    // non-async contexts and never held across awaits.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    // Fallback context used until the engine runtime provides its own
    // (see `host_runtime_context()`).
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Per-channel (telegram/discord/slack) runtime status, published on the
/// `channel.status.changed` event and returned by `channel_statuses`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    // Channel has configuration present in the effective config.
    pub enabled: bool,
    // Listener was started for this channel during the last restart.
    pub connected: bool,
    pub last_error: Option<String>,
    pub active_sessions: u64,
    // Free-form channel metadata; `{}` when none.
    pub meta: Value,
}
/// Subset of the effective app config that this module consumes; every
/// section is `#[serde(default)]` so a partial or missing config value
/// still deserializes (see `restart_channel_listeners`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
140
/// Live channel-listener state, guarded by `AppState::channels_runtime`.
#[derive(Default)]
pub struct ChannelRuntime {
    // JoinSet of listener tasks; `None` when listeners are stopped.
    pub listeners: Option<tokio::task::JoinSet<()>>,
    // Last published status per channel name.
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
146
/// A single key/value update destined for a status index.
// NOTE(review): no consumer is visible in this chunk — confirm usage
// elsewhere in the file before documenting semantics further.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    pub value: Value,
}
152
153impl AppState {
    /// Build an `AppState` in the `Starting` phase, before the engine
    /// runtime exists.
    ///
    /// All in-memory stores begin empty; on-disk paths are resolved eagerly
    /// so later load/persist calls need no configuration access.
    /// `attempt_id` tags this startup attempt in snapshots; `in_process`
    /// selects the runtime mode reported by `mode_label`.
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        Self {
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: config::env::resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: config::paths::resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Bug-monitor config is seeded from environment variables.
            bug_monitor_config: Arc::new(
                RwLock::new(config::env::resolve_bug_monitor_env_config()),
            ),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routines_path: config::paths::resolve_routines_path(),
            routine_history_path: config::paths::resolve_routine_history_path(),
            routine_runs_path: config::paths::resolve_routine_runs_path(),
            automations_v2_path: config::paths::resolve_automations_v2_path(),
            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
            external_actions_path: config::paths::resolve_external_actions_path(),
            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
            // Web UI stays off until `configure_web_ui` is called.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            // Preset registry falls back to `<home>/.tandem` (or `./.tandem`)
            // when the shared paths cannot be resolved.
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
233
234 pub fn is_ready(&self) -> bool {
235 self.runtime.get().is_some()
236 }
237
238 pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
239 for _ in 0..attempts {
240 if self.is_ready() {
241 return true;
242 }
243 let startup = self.startup_snapshot().await;
244 if matches!(startup.status, StartupStatus::Failed) {
245 return false;
246 }
247 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
248 }
249 self.is_ready()
250 }
251
252 pub fn mode_label(&self) -> &'static str {
253 if self.in_process_mode.load(Ordering::Relaxed) {
254 "in-process"
255 } else {
256 "sidecar"
257 }
258 }
259
260 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
261 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
262 if let Ok(mut guard) = self.web_ui_prefix.write() {
263 *guard = config::webui::normalize_web_ui_prefix(&prefix);
264 }
265 }
266
    /// Whether the embedded web UI is currently enabled.
    pub fn web_ui_enabled(&self) -> bool {
        self.web_ui_enabled.load(Ordering::Relaxed)
    }
270
271 pub fn web_ui_prefix(&self) -> String {
272 self.web_ui_prefix
273 .read()
274 .map(|v| v.clone())
275 .unwrap_or_else(|_| "/admin".to_string())
276 }
277
278 pub fn set_server_base_url(&self, base_url: String) {
279 if let Ok(mut guard) = self.server_base_url.write() {
280 *guard = base_url;
281 }
282 }
283
284 pub fn server_base_url(&self) -> String {
285 self.server_base_url
286 .read()
287 .map(|v| v.clone())
288 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
289 }
290
291 pub async fn api_token(&self) -> Option<String> {
292 self.api_token.read().await.clone()
293 }
294
295 pub async fn set_api_token(&self, token: Option<String>) {
296 *self.api_token.write().await = token;
297 }
298
299 pub async fn startup_snapshot(&self) -> StartupSnapshot {
300 let state = self.startup.read().await.clone();
301 StartupSnapshot {
302 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
303 status: state.status,
304 phase: state.phase,
305 started_at_ms: state.started_at_ms,
306 attempt_id: state.attempt_id,
307 last_error: state.last_error,
308 }
309 }
310
311 pub fn host_runtime_context(&self) -> HostRuntimeContext {
312 self.runtime
313 .get()
314 .map(|runtime| runtime.host_runtime_context.clone())
315 .unwrap_or_else(|| self.host_runtime_context.clone())
316 }
317
318 pub async fn set_phase(&self, phase: impl Into<String>) {
319 let mut startup = self.startup.write().await;
320 startup.phase = phase.into();
321 }
322
    /// Transition the server into the `Ready` state.
    ///
    /// Installs the engine runtime (exactly once), registers built-in tools
    /// and hooks, loads every persisted store from disk, and finally flips
    /// the startup status to `Ready`. The hook/tool registrations happen
    /// before the stores load; the status flip is last so `is_ready`
    /// callers never observe a half-initialized state.
    ///
    /// # Errors
    /// Fails if the runtime was already initialized, or if loading the
    /// routines or automations stores fails. Most other loads are
    /// deliberately best-effort (`let _ =`).
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        #[cfg(feature = "browser")]
        self.register_browser_tools().await?;
        // NOTE(review): `self.tools`, `self.engine_loop`, `self.config`,
        // `self.event_bus`, and `self.workspace_index` are not AppState
        // fields; they appear to resolve through a `Deref` impl (note the
        // `Deref` import) once `runtime` is set above — confirm.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Hydrate persisted stores. Routines and automations are required;
        // the rest tolerate missing/corrupt files.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_optimization_campaigns().await;
        let _ = self.load_optimization_experiments().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_external_actions().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // Flip status last; clear any stale error from a prior attempt.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
377
378 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
379 let mut startup = self.startup.write().await;
380 startup.status = StartupStatus::Failed;
381 startup.phase = phase.into();
382 startup.last_error = Some(error.into());
383 }
384
385 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
386 let runtime = self.channels_runtime.lock().await;
387 runtime.statuses.clone()
388 }
389
390 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
391 let effective = self.config.get_effective_value().await;
392 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
393 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
394
395 let mut runtime = self.channels_runtime.lock().await;
396 if let Some(listeners) = runtime.listeners.as_mut() {
397 listeners.abort_all();
398 }
399 runtime.listeners = None;
400 runtime.statuses.clear();
401
402 let mut status_map = std::collections::HashMap::new();
403 status_map.insert(
404 "telegram".to_string(),
405 ChannelStatus {
406 enabled: parsed.channels.telegram.is_some(),
407 connected: false,
408 last_error: None,
409 active_sessions: 0,
410 meta: serde_json::json!({}),
411 },
412 );
413 status_map.insert(
414 "discord".to_string(),
415 ChannelStatus {
416 enabled: parsed.channels.discord.is_some(),
417 connected: false,
418 last_error: None,
419 active_sessions: 0,
420 meta: serde_json::json!({}),
421 },
422 );
423 status_map.insert(
424 "slack".to_string(),
425 ChannelStatus {
426 enabled: parsed.channels.slack.is_some(),
427 connected: false,
428 last_error: None,
429 active_sessions: 0,
430 meta: serde_json::json!({}),
431 },
432 );
433
434 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
435 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
436 runtime.listeners = Some(listeners);
437 for status in status_map.values_mut() {
438 if status.enabled {
439 status.connected = true;
440 }
441 }
442 }
443
444 runtime.statuses = status_map.clone();
445 drop(runtime);
446
447 self.event_bus.publish(EngineEvent::new(
448 "channel.status.changed",
449 serde_json::json!({ "channels": status_map }),
450 ));
451 Ok(())
452 }
453
454 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
455 if !self.shared_resources_path.exists() {
456 return Ok(());
457 }
458 let raw = fs::read_to_string(&self.shared_resources_path).await?;
459 let parsed =
460 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
461 .unwrap_or_default();
462 let mut guard = self.shared_resources.write().await;
463 *guard = parsed;
464 Ok(())
465 }
466
467 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
468 if let Some(parent) = self.shared_resources_path.parent() {
469 fs::create_dir_all(parent).await?;
470 }
471 let payload = {
472 let guard = self.shared_resources.read().await;
473 serde_json::to_string_pretty(&*guard)?
474 };
475 fs::write(&self.shared_resources_path, payload).await?;
476 Ok(())
477 }
478
479 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
480 self.shared_resources.read().await.get(key).cloned()
481 }
482
483 pub async fn list_shared_resources(
484 &self,
485 prefix: Option<&str>,
486 limit: usize,
487 ) -> Vec<SharedResourceRecord> {
488 let limit = limit.clamp(1, 500);
489 let mut rows = self
490 .shared_resources
491 .read()
492 .await
493 .values()
494 .filter(|record| {
495 if let Some(prefix) = prefix {
496 record.key.starts_with(prefix)
497 } else {
498 true
499 }
500 })
501 .cloned()
502 .collect::<Vec<_>>();
503 rows.sort_by(|a, b| a.key.cmp(&b.key));
504 rows.truncate(limit);
505 rows
506 }
507
    /// Insert or update a shared resource with optimistic concurrency.
    ///
    /// When `if_match_rev` is given it must equal the current revision or
    /// the call fails with `RevisionConflict`. New records start at rev 1;
    /// updates bump the stored revision. If persisting to disk fails, the
    /// in-memory change is rolled back so memory and disk stay consistent.
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        // Optimistic-concurrency check against the caller's expected rev.
        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        // Saturating add: revision counter can never overflow-panic.
        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        // Keep the displaced value for rollback, and release the lock
        // before the async persist call.
        let previous = guard.insert(key.clone(), record.clone());
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Undo the insert so the in-memory map matches disk again.
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
566
    /// Delete a shared resource, with the same optimistic-concurrency and
    /// rollback-on-persist-failure behavior as `put_shared_resource`.
    ///
    /// Returns the removed record, or `None` if the key was absent.
    pub async fn delete_shared_resource(
        &self,
        key: &str,
        if_match_rev: Option<u64>,
    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
        if !is_valid_resource_key(key) {
            return Err(ResourceStoreError::InvalidKey {
                key: key.to_string(),
            });
        }

        let mut guard = self.shared_resources.write().await;
        let current = guard.get(key).cloned();
        // Reject if the caller's expected revision is stale (or the key is
        // gone entirely).
        if let Some(expected) = if_match_rev {
            let current_rev = current.as_ref().map(|row| row.rev);
            if current_rev != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key: key.to_string(),
                    expected_rev: Some(expected),
                    current_rev,
                }));
            }
        }

        // Release the lock before the async persist call.
        let removed = guard.remove(key);
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            // Re-insert the removed record so memory matches disk.
            if let Some(record) = removed.clone() {
                self.shared_resources
                    .write()
                    .await
                    .insert(record.key.clone(), record);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(removed)
    }
608
    /// Load the routines store from disk, falling back to the sibling
    /// backup file when the primary file fails to parse.
    ///
    /// A missing file is not an error (fresh install). A parse failure
    /// with no usable backup is reported with the primary parse error so
    /// callers can surface the corruption instead of silently losing data.
    pub async fn load_routines(&self) -> anyhow::Result<()> {
        if !self.routines_path.exists() {
            return Ok(());
        }
        let raw = fs::read_to_string(&self.routines_path).await?;
        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
            Ok(parsed) => {
                let mut guard = self.routines.write().await;
                *guard = parsed;
                Ok(())
            }
            Err(primary_err) => {
                // Primary file is corrupt — try the `.bak` copy written by
                // `persist_routines_inner`.
                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
                if backup_path.exists() {
                    let backup_raw = fs::read_to_string(&backup_path).await?;
                    if let Ok(parsed_backup) = serde_json::from_str::<
                        std::collections::HashMap<String, RoutineSpec>,
                    >(&backup_raw)
                    {
                        let mut guard = self.routines.write().await;
                        *guard = parsed_backup;
                        return Ok(());
                    }
                }
                Err(anyhow::anyhow!(
                    "failed to parse routines store {}: {primary_err}",
                    self.routines_path.display()
                ))
            }
        }
    }
640
641 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
642 if !self.routine_history_path.exists() {
643 return Ok(());
644 }
645 let raw = fs::read_to_string(&self.routine_history_path).await?;
646 let parsed = serde_json::from_str::<
647 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
648 >(&raw)
649 .unwrap_or_default();
650 let mut guard = self.routine_history.write().await;
651 *guard = parsed;
652 Ok(())
653 }
654
655 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
656 if !self.routine_runs_path.exists() {
657 return Ok(());
658 }
659 let raw = fs::read_to_string(&self.routine_runs_path).await?;
660 let parsed =
661 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
662 .unwrap_or_default();
663 let mut guard = self.routine_runs.write().await;
664 *guard = parsed;
665 Ok(())
666 }
667
    /// Write the routines map to disk atomically (tmp file + rename).
    ///
    /// When `allow_empty_overwrite` is false, refuses to replace a
    /// non-empty on-disk store with an empty in-memory map — this guards
    /// against wiping user data if persistence runs before a successful
    /// `load_routines`. A best-effort `.bak` copy of the previous file is
    /// kept for the recovery path in `load_routines`.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        // Serialize under a short-lived read lock; don't hold it across I/O.
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            // An unreadable existing file counts as "has rows" so we err on
            // the side of not overwriting it.
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        // Best-effort backup of the previous file before replacing it.
        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        // Atomic replace: write to a sibling tmp file, then rename.
        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
701
    /// Persist routines with the empty-overwrite guard enabled (refuses to
    /// clobber a non-empty on-disk store with an empty in-memory map).
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
705
706 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
707 if let Some(parent) = self.routine_history_path.parent() {
708 fs::create_dir_all(parent).await?;
709 }
710 let payload = {
711 let guard = self.routine_history.read().await;
712 serde_json::to_string_pretty(&*guard)?
713 };
714 fs::write(&self.routine_history_path, payload).await?;
715 Ok(())
716 }
717
718 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
719 if let Some(parent) = self.routine_runs_path.parent() {
720 fs::create_dir_all(parent).await?;
721 }
722 let payload = {
723 let guard = self.routine_runs.read().await;
724 serde_json::to_string_pretty(&*guard)?
725 };
726 fs::write(&self.routine_runs_path, payload).await?;
727 Ok(())
728 }
729
730 pub async fn put_routine(
731 &self,
732 mut routine: RoutineSpec,
733 ) -> Result<RoutineSpec, RoutineStoreError> {
734 if routine.routine_id.trim().is_empty() {
735 return Err(RoutineStoreError::InvalidRoutineId {
736 routine_id: routine.routine_id,
737 });
738 }
739
740 routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
741 routine.output_targets = normalize_non_empty_list(routine.output_targets);
742
743 let now = now_ms();
744 let next_schedule_fire =
745 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
746 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
747 detail: "invalid schedule or timezone".to_string(),
748 })?;
749 match routine.schedule {
750 RoutineSchedule::IntervalSeconds { seconds } => {
751 if seconds == 0 {
752 return Err(RoutineStoreError::InvalidSchedule {
753 detail: "interval_seconds must be > 0".to_string(),
754 });
755 }
756 let _ = seconds;
757 }
758 RoutineSchedule::Cron { .. } => {}
759 }
760 if routine.next_fire_at_ms.is_none() {
761 routine.next_fire_at_ms = Some(next_schedule_fire);
762 }
763
764 let mut guard = self.routines.write().await;
765 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
766 drop(guard);
767
768 if let Err(error) = self.persist_routines().await {
769 let mut rollback = self.routines.write().await;
770 if let Some(previous) = previous {
771 rollback.insert(previous.routine_id.clone(), previous);
772 } else {
773 rollback.remove(&routine.routine_id);
774 }
775 return Err(RoutineStoreError::PersistFailed {
776 message: error.to_string(),
777 });
778 }
779
780 Ok(routine)
781 }
782
783 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
784 let mut rows = self
785 .routines
786 .read()
787 .await
788 .values()
789 .cloned()
790 .collect::<Vec<_>>();
791 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
792 rows
793 }
794
795 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
796 self.routines.read().await.get(routine_id).cloned()
797 }
798
799 pub async fn delete_routine(
800 &self,
801 routine_id: &str,
802 ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
803 let mut guard = self.routines.write().await;
804 let removed = guard.remove(routine_id);
805 drop(guard);
806
807 let allow_empty_overwrite = self.routines.read().await.is_empty();
808 if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
809 if let Some(removed) = removed.clone() {
810 self.routines
811 .write()
812 .await
813 .insert(removed.routine_id.clone(), removed);
814 }
815 return Err(RoutineStoreError::PersistFailed {
816 message: error.to_string(),
817 });
818 }
819 Ok(removed)
820 }
821
    /// Scan active routines whose `next_fire_at_ms` has passed and build
    /// trigger plans for them, advancing each routine's next fire time.
    ///
    /// The per-routine run count and the new fire time come from
    /// `compute_misfire_plan_for_schedule`, which applies the routine's
    /// misfire policy. Routines whose policy yields zero runs are still
    /// rescheduled but produce no plan. Changes are persisted best-effort
    /// after the scan.
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            if routine.status != RoutineStatus::Active {
                continue;
            }
            // Routines without a scheduled fire time are skipped entirely.
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            // NOTE: `next_fire_at_ms` is shadowed here — the new binding is
            // the routine's *next* fire time after applying the misfire policy.
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        // Best-effort persist: a failed write does not invalidate the plans.
        let _ = self.persist_routines().await;
        plans
    }
857
858 pub async fn mark_routine_fired(
859 &self,
860 routine_id: &str,
861 fired_at_ms: u64,
862 ) -> Option<RoutineSpec> {
863 let mut guard = self.routines.write().await;
864 let routine = guard.get_mut(routine_id)?;
865 routine.last_fired_at_ms = Some(fired_at_ms);
866 let updated = routine.clone();
867 drop(guard);
868 let _ = self.persist_routines().await;
869 Some(updated)
870 }
871
872 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
873 let mut history = self.routine_history.write().await;
874 history
875 .entry(event.routine_id.clone())
876 .or_default()
877 .push(event);
878 drop(history);
879 let _ = self.persist_routine_history().await;
880 }
881
882 pub async fn list_routine_history(
883 &self,
884 routine_id: &str,
885 limit: usize,
886 ) -> Vec<RoutineHistoryEvent> {
887 let limit = limit.clamp(1, 500);
888 let mut rows = self
889 .routine_history
890 .read()
891 .await
892 .get(routine_id)
893 .cloned()
894 .unwrap_or_default();
895 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
896 rows.truncate(limit);
897 rows
898 }
899
    /// Create and persist (best-effort) a new run record for `routine`.
    ///
    /// The record snapshots the routine's entrypoint, args, allowed tools,
    /// and output targets at creation time so later edits to the routine
    /// don't affect this run. Token/cost counters start at zero and
    /// `fired_at_ms` is stamped now.
    pub async fn create_routine_run(
        &self,
        routine: &RoutineSpec,
        trigger_type: &str,
        run_count: u32,
        status: RoutineRunStatus,
        detail: Option<String>,
    ) -> RoutineRunRecord {
        let now = now_ms();
        let record = RoutineRunRecord {
            // Random UUID guarantees uniqueness across restarts.
            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
            routine_id: routine.routine_id.clone(),
            trigger_type: trigger_type.to_string(),
            run_count,
            status,
            created_at_ms: now,
            updated_at_ms: now,
            fired_at_ms: Some(now),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: routine.requires_approval,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail,
            entrypoint: routine.entrypoint.clone(),
            args: routine.args.clone(),
            allowed_tools: routine.allowed_tools.clone(),
            output_targets: routine.output_targets.clone(),
            artifacts: Vec::new(),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };
        self.routine_runs
            .write()
            .await
            .insert(record.run_id.clone(), record.clone());
        // Best-effort persist; the record is returned regardless.
        let _ = self.persist_routine_runs().await;
        record
    }
944
945 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
946 self.routine_runs.read().await.get(run_id).cloned()
947 }
948
949 pub async fn list_routine_runs(
950 &self,
951 routine_id: Option<&str>,
952 limit: usize,
953 ) -> Vec<RoutineRunRecord> {
954 let mut rows = self
955 .routine_runs
956 .read()
957 .await
958 .values()
959 .filter(|row| {
960 if let Some(id) = routine_id {
961 row.routine_id == id
962 } else {
963 true
964 }
965 })
966 .cloned()
967 .collect::<Vec<_>>();
968 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
969 rows.truncate(limit.clamp(1, 500));
970 rows
971 }
972
    /// Atomically claims the oldest queued routine run under a single write
    /// lock: flips it to `Running`, stamps start/update times, persists, and
    /// returns the claimed record. Returns `None` when nothing is queued.
    pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        // Deterministic pick: oldest creation time, ties broken by run id.
        let next_run_id = guard
            .values()
            .filter(|row| row.status == RoutineRunStatus::Queued)
            .min_by(|a, b| {
                a.created_at_ms
                    .cmp(&b.created_at_ms)
                    .then_with(|| a.run_id.cmp(&b.run_id))
            })
            .map(|row| row.run_id.clone())?;
        let now = now_ms();
        // The id was taken from this same guarded map, so the lookup should
        // succeed; `?` keeps it panic-free regardless.
        let row = guard.get_mut(&next_run_id)?;
        row.status = RoutineRunStatus::Running;
        row.updated_at_ms = now;
        row.started_at_ms = Some(now);
        let claimed = row.clone();
        // Drop the lock before the best-effort persist (the persist helpers
        // in this file re-acquire a lock on the map they serialize).
        drop(guard);
        let _ = self.persist_routine_runs().await;
        Some(claimed)
    }
994
995 pub async fn set_routine_session_policy(
996 &self,
997 session_id: String,
998 run_id: String,
999 routine_id: String,
1000 allowed_tools: Vec<String>,
1001 ) {
1002 let policy = RoutineSessionPolicy {
1003 session_id: session_id.clone(),
1004 run_id,
1005 routine_id,
1006 allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1007 };
1008 self.routine_session_policies
1009 .write()
1010 .await
1011 .insert(session_id, policy);
1012 }
1013
1014 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1015 self.routine_session_policies
1016 .read()
1017 .await
1018 .get(session_id)
1019 .cloned()
1020 }
1021
1022 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1023 self.routine_session_policies
1024 .write()
1025 .await
1026 .remove(session_id);
1027 }
1028
    /// Transitions a routine run to `status`, recording `reason` in the
    /// status-appropriate field, then persists the run table (best-effort).
    /// Returns the updated record, or `None` when the run id is unknown.
    pub async fn update_routine_run_status(
        &self,
        run_id: &str,
        status: RoutineRunStatus,
        reason: Option<String>,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.status = status.clone();
        row.updated_at_ms = now_ms();
        // Route `reason` into the field that matches the new status.
        match status {
            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
            RoutineRunStatus::Running => {
                // Stamp the start time only on the first Running transition.
                row.started_at_ms.get_or_insert_with(now_ms);
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            RoutineRunStatus::Denied => row.denial_reason = reason,
            RoutineRunStatus::Paused => row.paused_reason = reason,
            // Terminal states additionally stamp the finish time.
            RoutineRunStatus::Completed
            | RoutineRunStatus::Failed
            | RoutineRunStatus::Cancelled => {
                row.finished_at_ms = Some(now_ms());
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            // Any other status keeps `reason` as free-form detail.
            _ => {
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
        }
        let updated = row.clone();
        // Drop the lock before the best-effort persist, which presumably
        // re-locks the same map (matches the sibling persist_* helpers).
        drop(guard);
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
1068
1069 pub async fn append_routine_run_artifact(
1070 &self,
1071 run_id: &str,
1072 artifact: RoutineRunArtifact,
1073 ) -> Option<RoutineRunRecord> {
1074 let mut guard = self.routine_runs.write().await;
1075 let row = guard.get_mut(run_id)?;
1076 row.updated_at_ms = now_ms();
1077 row.artifacts.push(artifact);
1078 let updated = row.clone();
1079 drop(guard);
1080 let _ = self.persist_routine_runs().await;
1081 Some(updated)
1082 }
1083
1084 pub async fn add_active_session_id(
1085 &self,
1086 run_id: &str,
1087 session_id: String,
1088 ) -> Option<RoutineRunRecord> {
1089 let mut guard = self.routine_runs.write().await;
1090 let row = guard.get_mut(run_id)?;
1091 if !row.active_session_ids.iter().any(|id| id == &session_id) {
1092 row.active_session_ids.push(session_id);
1093 }
1094 row.latest_session_id = row.active_session_ids.last().cloned();
1095 row.updated_at_ms = now_ms();
1096 let updated = row.clone();
1097 drop(guard);
1098 let _ = self.persist_routine_runs().await;
1099 Some(updated)
1100 }
1101
1102 pub async fn clear_active_session_id(
1103 &self,
1104 run_id: &str,
1105 session_id: &str,
1106 ) -> Option<RoutineRunRecord> {
1107 let mut guard = self.routine_runs.write().await;
1108 let row = guard.get_mut(run_id)?;
1109 row.active_session_ids.retain(|id| id != session_id);
1110 row.updated_at_ms = now_ms();
1111 let updated = row.clone();
1112 drop(guard);
1113 let _ = self.persist_routine_runs().await;
1114 Some(updated)
1115 }
1116
    /// Loads automation v2 definitions from disk into memory.
    ///
    /// Reads the canonical path first; when it yields no definitions, merges
    /// all alternate candidate paths instead, keeping the copy of each
    /// automation with the newest `updated_at_ms`. Applies the bundled
    /// studio-research split migration, then re-persists if anything was
    /// recovered from an alternate path or migrated; otherwise cleans up any
    /// stale legacy file.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        let mut migrated = false;
        // (path, parsed-count) pairs collected only for the log line below.
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            // An empty file or a bare "{}" counts as zero definitions.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file missing/empty: scan alternates and merge,
            // preferring the most recently updated copy of each automation.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    match merged.get(&automation_id) {
                        // Keep the already-merged copy when it is strictly newer.
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical file was authoritative: alternates are only counted
            // for logging, never merged.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        // Apply the in-place migration; `migrated` stays true once any
        // automation changed (note the order of the `||` keeps the call first).
        for automation in merged.values_mut() {
            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
        }
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate || migrated {
            // Re-persist so the canonical file reflects the merged/migrated state.
            let _ = self.persist_automations_v2().await;
        } else if canonical_loaded {
            // Canonical data is current; drop any stale legacy file.
            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
        }
        Ok(())
    }
1205
1206 pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1207 let payload = {
1208 let guard = self.automations_v2.read().await;
1209 serde_json::to_string_pretty(&*guard)?
1210 };
1211 if let Some(parent) = self.automations_v2_path.parent() {
1212 fs::create_dir_all(parent).await?;
1213 }
1214 fs::write(&self.automations_v2_path, &payload).await?;
1215 let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1216 Ok(())
1217 }
1218
    /// Loads automation v2 run records by merging every candidate path,
    /// keeping the newest copy of each run by `updated_at_ms`, then recovers
    /// automation definitions from run snapshots and re-persists when
    /// anything came from an alternate path or was recovered.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        // (path, parsed-count) pairs collected only for the log line below.
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // An empty file or a bare "{}" counts as zero runs.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    // Keep the already-merged copy when it is strictly newer.
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        // Runs embed automation snapshots; use them to restore definitions
        // that are missing or stale in the definitions map.
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            // Suspicious state worth surfacing: history without definitions.
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
1279
1280 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1281 let payload = {
1282 let guard = self.automation_v2_runs.read().await;
1283 serde_json::to_string_pretty(&*guard)?
1284 };
1285 if let Some(parent) = self.automation_v2_runs_path.parent() {
1286 fs::create_dir_all(parent).await?;
1287 }
1288 fs::write(&self.automation_v2_runs_path, &payload).await?;
1289 Ok(())
1290 }
1291
1292 pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1293 if !self.optimization_campaigns_path.exists() {
1294 return Ok(());
1295 }
1296 let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1297 let parsed = parse_optimization_campaigns_file(&raw);
1298 *self.optimization_campaigns.write().await = parsed;
1299 Ok(())
1300 }
1301
1302 pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1303 let payload = {
1304 let guard = self.optimization_campaigns.read().await;
1305 serde_json::to_string_pretty(&*guard)?
1306 };
1307 if let Some(parent) = self.optimization_campaigns_path.parent() {
1308 fs::create_dir_all(parent).await?;
1309 }
1310 fs::write(&self.optimization_campaigns_path, payload).await?;
1311 Ok(())
1312 }
1313
1314 pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1315 if !self.optimization_experiments_path.exists() {
1316 return Ok(());
1317 }
1318 let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1319 let parsed = parse_optimization_experiments_file(&raw);
1320 *self.optimization_experiments.write().await = parsed;
1321 Ok(())
1322 }
1323
1324 pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1325 let payload = {
1326 let guard = self.optimization_experiments.read().await;
1327 serde_json::to_string_pretty(&*guard)?
1328 };
1329 if let Some(parent) = self.optimization_experiments_path.parent() {
1330 fs::create_dir_all(parent).await?;
1331 }
1332 fs::write(&self.optimization_experiments_path, payload).await?;
1333 Ok(())
1334 }
1335
    /// Re-reads the persisted automation files and checks that
    /// `automation_id` is present (or absent, per `expected_present`).
    ///
    /// A mismatch on the canonical path is a hard failure; mismatches on
    /// alternate candidate paths are only logged as stale.
    ///
    /// # Errors
    /// Fails on I/O errors or when the canonical file contradicts
    /// `expected_present`.
    async fn verify_automation_v2_persisted(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        // A missing canonical file parses the same as an empty one.
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        let active_parsed = parse_automation_v2_file(&active_raw);
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Alternate paths are advisory: collect mismatch descriptions and
        // warn once at the end instead of failing.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = parse_automation_v2_file(&raw);
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
1397
    /// Restores automation definitions from the snapshots embedded in run
    /// records: a snapshot replaces the stored definition when the definition
    /// is missing or strictly older. Returns the number of definitions that
    /// were missing entirely and got recovered (fresher replacements of
    /// existing entries are applied but not counted), persisting when > 0.
    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
        // Clone the runs out first so the runs lock is not held while the
        // definitions lock is taken below.
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut guard = self.automations_v2.write().await;
        let mut recovered = 0usize;
        for run in runs {
            // Runs without a snapshot cannot contribute a definition.
            let Some(snapshot) = run.automation_snapshot.clone() else {
                continue;
            };
            let should_replace = match guard.get(&run.automation_id) {
                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
                None => true,
            };
            if should_replace {
                // Only count definitions that did not exist at all.
                if !guard.contains_key(&run.automation_id) {
                    recovered += 1;
                }
                guard.insert(run.automation_id.clone(), snapshot);
            }
        }
        // Release the definitions lock before persisting (persist re-locks it).
        drop(guard);
        if recovered > 0 {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                recovered,
                active_path,
                "recovered automation v2 definitions from run snapshots"
            );
            self.persist_automations_v2().await?;
        }
        Ok(recovered)
    }
1435
1436 pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1437 let path = if self.bug_monitor_config_path.exists() {
1438 self.bug_monitor_config_path.clone()
1439 } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1440 .exists()
1441 {
1442 config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1443 } else {
1444 return Ok(());
1445 };
1446 let raw = fs::read_to_string(path).await?;
1447 let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1448 .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1449 *self.bug_monitor_config.write().await = parsed;
1450 Ok(())
1451 }
1452
1453 pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1454 if let Some(parent) = self.bug_monitor_config_path.parent() {
1455 fs::create_dir_all(parent).await?;
1456 }
1457 let payload = {
1458 let guard = self.bug_monitor_config.read().await;
1459 serde_json::to_string_pretty(&*guard)?
1460 };
1461 fs::write(&self.bug_monitor_config_path, payload).await?;
1462 Ok(())
1463 }
1464
1465 pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1466 self.bug_monitor_config.read().await.clone()
1467 }
1468
    /// Validates, normalizes, timestamps, stores, and persists a new bug
    /// monitor configuration, returning the stored copy.
    ///
    /// # Errors
    /// Fails when `repo` is not an `owner/repo` slug, when `mcp_server`
    /// names an unregistered MCP server, when the model policy is rejected
    /// by the shared validator, or when persistence fails.
    pub async fn put_bug_monitor_config(
        &self,
        mut config: BugMonitorConfig,
    ) -> anyhow::Result<BugMonitorConfig> {
        // Trim the workspace root; an empty string collapses to None.
        config.workspace_root = config
            .workspace_root
            .as_ref()
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty());
        if let Some(repo) = config.repo.as_ref() {
            // An empty repo string is tolerated; anything else must be a slug.
            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
                anyhow::bail!("repo must be in owner/repo format");
            }
        }
        if let Some(server) = config.mcp_server.as_ref() {
            // The referenced MCP server must already be registered.
            let servers = self.mcp.list().await;
            if !servers.contains_key(server) {
                anyhow::bail!("unknown mcp server `{server}`");
            }
        }
        if let Some(model_policy) = config.model_policy.as_ref() {
            crate::http::routines_automations::validate_model_policy(model_policy)
                .map_err(anyhow::Error::msg)?;
        }
        config.updated_at_ms = now_ms();
        *self.bug_monitor_config.write().await = config.clone();
        self.persist_bug_monitor_config().await?;
        Ok(config)
    }
1498
1499 pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1500 let path = if self.bug_monitor_drafts_path.exists() {
1501 self.bug_monitor_drafts_path.clone()
1502 } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1503 .exists()
1504 {
1505 config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1506 } else {
1507 return Ok(());
1508 };
1509 let raw = fs::read_to_string(path).await?;
1510 let parsed =
1511 serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1512 .unwrap_or_default();
1513 *self.bug_monitor_drafts.write().await = parsed;
1514 Ok(())
1515 }
1516
1517 pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1518 if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1519 fs::create_dir_all(parent).await?;
1520 }
1521 let payload = {
1522 let guard = self.bug_monitor_drafts.read().await;
1523 serde_json::to_string_pretty(&*guard)?
1524 };
1525 fs::write(&self.bug_monitor_drafts_path, payload).await?;
1526 Ok(())
1527 }
1528
1529 pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1530 let path = if self.bug_monitor_incidents_path.exists() {
1531 self.bug_monitor_incidents_path.clone()
1532 } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1533 .exists()
1534 {
1535 config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1536 } else {
1537 return Ok(());
1538 };
1539 let raw = fs::read_to_string(path).await?;
1540 let parsed = serde_json::from_str::<
1541 std::collections::HashMap<String, BugMonitorIncidentRecord>,
1542 >(&raw)
1543 .unwrap_or_default();
1544 *self.bug_monitor_incidents.write().await = parsed;
1545 Ok(())
1546 }
1547
1548 pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1549 if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1550 fs::create_dir_all(parent).await?;
1551 }
1552 let payload = {
1553 let guard = self.bug_monitor_incidents.read().await;
1554 serde_json::to_string_pretty(&*guard)?
1555 };
1556 fs::write(&self.bug_monitor_incidents_path, payload).await?;
1557 Ok(())
1558 }
1559
1560 pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1561 let path = if self.bug_monitor_posts_path.exists() {
1562 self.bug_monitor_posts_path.clone()
1563 } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1564 .exists()
1565 {
1566 config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1567 } else {
1568 return Ok(());
1569 };
1570 let raw = fs::read_to_string(path).await?;
1571 let parsed =
1572 serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1573 .unwrap_or_default();
1574 *self.bug_monitor_posts.write().await = parsed;
1575 Ok(())
1576 }
1577
1578 pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1579 if let Some(parent) = self.bug_monitor_posts_path.parent() {
1580 fs::create_dir_all(parent).await?;
1581 }
1582 let payload = {
1583 let guard = self.bug_monitor_posts.read().await;
1584 serde_json::to_string_pretty(&*guard)?
1585 };
1586 fs::write(&self.bug_monitor_posts_path, payload).await?;
1587 Ok(())
1588 }
1589
1590 pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1591 if !self.external_actions_path.exists() {
1592 return Ok(());
1593 }
1594 let raw = fs::read_to_string(&self.external_actions_path).await?;
1595 let parsed =
1596 serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1597 .unwrap_or_default();
1598 *self.external_actions.write().await = parsed;
1599 Ok(())
1600 }
1601
1602 pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1603 if let Some(parent) = self.external_actions_path.parent() {
1604 fs::create_dir_all(parent).await?;
1605 }
1606 let payload = {
1607 let guard = self.external_actions.read().await;
1608 serde_json::to_string_pretty(&*guard)?
1609 };
1610 fs::write(&self.external_actions_path, payload).await?;
1611 Ok(())
1612 }
1613
1614 pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1615 let mut rows = self
1616 .bug_monitor_incidents
1617 .read()
1618 .await
1619 .values()
1620 .cloned()
1621 .collect::<Vec<_>>();
1622 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1623 rows.truncate(limit.clamp(1, 200));
1624 rows
1625 }
1626
1627 pub async fn get_bug_monitor_incident(
1628 &self,
1629 incident_id: &str,
1630 ) -> Option<BugMonitorIncidentRecord> {
1631 self.bug_monitor_incidents
1632 .read()
1633 .await
1634 .get(incident_id)
1635 .cloned()
1636 }
1637
1638 pub async fn put_bug_monitor_incident(
1639 &self,
1640 incident: BugMonitorIncidentRecord,
1641 ) -> anyhow::Result<BugMonitorIncidentRecord> {
1642 self.bug_monitor_incidents
1643 .write()
1644 .await
1645 .insert(incident.incident_id.clone(), incident.clone());
1646 self.persist_bug_monitor_incidents().await?;
1647 Ok(incident)
1648 }
1649
1650 pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1651 let mut rows = self
1652 .bug_monitor_posts
1653 .read()
1654 .await
1655 .values()
1656 .cloned()
1657 .collect::<Vec<_>>();
1658 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1659 rows.truncate(limit.clamp(1, 200));
1660 rows
1661 }
1662
1663 pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1664 self.bug_monitor_posts.read().await.get(post_id).cloned()
1665 }
1666
1667 pub async fn put_bug_monitor_post(
1668 &self,
1669 post: BugMonitorPostRecord,
1670 ) -> anyhow::Result<BugMonitorPostRecord> {
1671 self.bug_monitor_posts
1672 .write()
1673 .await
1674 .insert(post.post_id.clone(), post.clone());
1675 self.persist_bug_monitor_posts().await?;
1676 Ok(post)
1677 }
1678
1679 pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1680 let mut rows = self
1681 .external_actions
1682 .read()
1683 .await
1684 .values()
1685 .cloned()
1686 .collect::<Vec<_>>();
1687 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1688 rows.truncate(limit.clamp(1, 200));
1689 rows
1690 }
1691
1692 pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1693 self.external_actions.read().await.get(action_id).cloned()
1694 }
1695
1696 pub async fn put_external_action(
1697 &self,
1698 action: ExternalActionRecord,
1699 ) -> anyhow::Result<ExternalActionRecord> {
1700 self.external_actions
1701 .write()
1702 .await
1703 .insert(action.action_id.clone(), action.clone());
1704 self.persist_external_actions().await?;
1705 Ok(action)
1706 }
1707
    /// Persists an external action record and fans out receipt artifacts to
    /// its owners: when the action belongs to a routine run, attaches a
    /// receipt artifact and publishes a `routine.run.artifact_added` event;
    /// when it belongs to a context run, appends a JSON artifact there.
    /// Artifact fan-out is best-effort — failures are logged, not returned.
    pub async fn record_external_action(
        &self,
        action: ExternalActionRecord,
    ) -> anyhow::Result<ExternalActionRecord> {
        let action = self.put_external_action(action).await?;
        if let Some(run_id) = action.routine_run_id.as_deref() {
            let artifact = RoutineRunArtifact {
                artifact_id: format!("external-action-{}", action.action_id),
                uri: format!("external-action://{}", action.action_id),
                kind: "external_action_receipt".to_string(),
                label: Some(format!("external action receipt: {}", action.operation)),
                created_at_ms: action.updated_at_ms,
                metadata: Some(json!({
                    "actionID": action.action_id,
                    "operation": action.operation,
                    "status": action.status,
                    "sourceKind": action.source_kind,
                    "sourceID": action.source_id,
                    "capabilityID": action.capability_id,
                    "target": action.target,
                })),
            };
            // Best-effort: an unknown run id just yields None here.
            let _ = self
                .append_routine_run_artifact(run_id, artifact.clone())
                .await;
            // Only publish when the runtime has been initialized.
            if let Some(runtime) = self.runtime.get() {
                runtime.event_bus.publish(EngineEvent::new(
                    "routine.run.artifact_added",
                    json!({
                        "runID": run_id,
                        "artifact": artifact,
                    }),
                ));
            }
        }
        if let Some(context_run_id) = action.context_run_id.as_deref() {
            let payload = serde_json::to_value(&action)?;
            // Context-run attachment failures are logged but do not fail the
            // overall recording.
            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
                self,
                context_run_id,
                &format!("external-action-{}", action.action_id),
                "external_action_receipt",
                &format!("external-actions/{}.json", action.action_id),
                &payload,
            )
            .await
            {
                tracing::warn!(
                    "failed to append external action artifact {} to context run {}: {}",
                    action.action_id,
                    context_run_id,
                    error
                );
            }
        }
        Ok(action)
    }
1765
1766 pub async fn update_bug_monitor_runtime_status(
1767 &self,
1768 update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1769 ) -> BugMonitorRuntimeStatus {
1770 let mut guard = self.bug_monitor_runtime_status.write().await;
1771 update(&mut guard);
1772 guard.clone()
1773 }
1774
1775 pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1776 let mut rows = self
1777 .bug_monitor_drafts
1778 .read()
1779 .await
1780 .values()
1781 .cloned()
1782 .collect::<Vec<_>>();
1783 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1784 rows.truncate(limit.clamp(1, 200));
1785 rows
1786 }
1787
1788 pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1789 self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1790 }
1791
1792 pub async fn put_bug_monitor_draft(
1793 &self,
1794 draft: BugMonitorDraftRecord,
1795 ) -> anyhow::Result<BugMonitorDraftRecord> {
1796 self.bug_monitor_drafts
1797 .write()
1798 .await
1799 .insert(draft.draft_id.clone(), draft.clone());
1800 self.persist_bug_monitor_drafts().await?;
1801 Ok(draft)
1802 }
1803
    /// Normalizes a raw failure submission into a bug monitor draft.
    ///
    /// All free-form fields are trimmed (empties collapse to `None`), a
    /// title/detail body is synthesized from whatever metadata is present,
    /// and a fingerprint dedups repeat submissions: an existing draft with
    /// the same repo + fingerprint is returned as-is instead of creating a
    /// duplicate. New drafts start in `approval_required` or `draft_ready`
    /// depending on configuration, and are persisted before returning.
    ///
    /// # Errors
    /// Fails when no repo is available (neither submission nor config), when
    /// the repo is not an `owner/repo` slug, or when persistence fails.
    pub async fn submit_bug_monitor_draft(
        &self,
        mut submission: BugMonitorSubmission,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        // Trim a field; an empty result collapses to None.
        fn normalize_optional(value: Option<String>) -> Option<String> {
            value
                .map(|v| v.trim().to_string())
                .filter(|v| !v.is_empty())
        }

        // Hash the identifying parts into a 16-hex-digit fingerprint.
        // NOTE(review): DefaultHasher's output is not guaranteed stable
        // across Rust versions, yet fingerprints are persisted and used for
        // dedup across restarts — confirm whether a stable hash (e.g. the
        // Sha256 already imported by this file) should be used instead.
        fn compute_fingerprint(parts: &[&str]) -> String {
            use std::hash::{Hash, Hasher};

            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            for part in parts {
                part.hash(&mut hasher);
            }
            format!("{:016x}", hasher.finish())
        }

        submission.repo = normalize_optional(submission.repo);
        submission.title = normalize_optional(submission.title);
        submission.detail = normalize_optional(submission.detail);
        submission.source = normalize_optional(submission.source);
        submission.run_id = normalize_optional(submission.run_id);
        submission.session_id = normalize_optional(submission.session_id);
        submission.correlation_id = normalize_optional(submission.correlation_id);
        submission.file_name = normalize_optional(submission.file_name);
        submission.process = normalize_optional(submission.process);
        submission.component = normalize_optional(submission.component);
        submission.event = normalize_optional(submission.event);
        submission.level = normalize_optional(submission.level);
        submission.fingerprint = normalize_optional(submission.fingerprint);
        // Keep at most the first 50 non-empty, right-trimmed excerpt lines.
        submission.excerpt = submission
            .excerpt
            .into_iter()
            .map(|line| line.trim_end().to_string())
            .filter(|line| !line.is_empty())
            .take(50)
            .collect();

        let config = self.bug_monitor_config().await;
        // The submission's repo wins; otherwise fall back to the configured one.
        let repo = submission
            .repo
            .clone()
            .or(config.repo.clone())
            .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
        if !is_valid_owner_repo_slug(&repo) {
            anyhow::bail!("Bug Monitor repo must be in owner/repo format");
        }

        // Synthesize a title from the most specific available field.
        let title = submission.title.clone().unwrap_or_else(|| {
            if let Some(event) = submission.event.as_ref() {
                format!("Failure detected in {event}")
            } else if let Some(component) = submission.component.as_ref() {
                format!("Failure detected in {component}")
            } else if let Some(process) = submission.process.as_ref() {
                format!("Failure detected in {process}")
            } else if let Some(source) = submission.source.as_ref() {
                format!("Failure report from {source}")
            } else {
                "Failure report".to_string()
            }
        });

        // Assemble the detail body: key/value metadata, then the free-form
        // detail, then the indented excerpt — blank lines between sections.
        let mut detail_lines = Vec::new();
        if let Some(source) = submission.source.as_ref() {
            detail_lines.push(format!("source: {source}"));
        }
        if let Some(file_name) = submission.file_name.as_ref() {
            detail_lines.push(format!("file: {file_name}"));
        }
        if let Some(level) = submission.level.as_ref() {
            detail_lines.push(format!("level: {level}"));
        }
        if let Some(process) = submission.process.as_ref() {
            detail_lines.push(format!("process: {process}"));
        }
        if let Some(component) = submission.component.as_ref() {
            detail_lines.push(format!("component: {component}"));
        }
        if let Some(event) = submission.event.as_ref() {
            detail_lines.push(format!("event: {event}"));
        }
        if let Some(run_id) = submission.run_id.as_ref() {
            detail_lines.push(format!("run_id: {run_id}"));
        }
        if let Some(session_id) = submission.session_id.as_ref() {
            detail_lines.push(format!("session_id: {session_id}"));
        }
        if let Some(correlation_id) = submission.correlation_id.as_ref() {
            detail_lines.push(format!("correlation_id: {correlation_id}"));
        }
        if let Some(detail) = submission.detail.as_ref() {
            detail_lines.push(String::new());
            detail_lines.push(detail.clone());
        }
        if !submission.excerpt.is_empty() {
            if !detail_lines.is_empty() {
                detail_lines.push(String::new());
            }
            detail_lines.push("excerpt:".to_string());
            detail_lines.extend(submission.excerpt.iter().map(|line| format!("  {line}")));
        }
        let detail = if detail_lines.is_empty() {
            None
        } else {
            Some(detail_lines.join("\n"))
        };

        // Caller-supplied fingerprint wins; otherwise derive one from the
        // identifying fields (missing fields hash as empty strings).
        let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
            compute_fingerprint(&[
                repo.as_str(),
                title.as_str(),
                detail.as_deref().unwrap_or(""),
                submission.source.as_deref().unwrap_or(""),
                submission.run_id.as_deref().unwrap_or(""),
                submission.session_id.as_deref().unwrap_or(""),
                submission.correlation_id.as_deref().unwrap_or(""),
            ])
        });

        // Dedup: return the existing draft for this repo + fingerprint.
        let mut drafts = self.bug_monitor_drafts.write().await;
        if let Some(existing) = drafts
            .values()
            .find(|row| row.repo == repo && row.fingerprint == fingerprint)
            .cloned()
        {
            return Ok(existing);
        }

        let draft = BugMonitorDraftRecord {
            draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
            fingerprint,
            repo,
            // Gate new-issue creation behind approval when configured.
            status: if config.require_approval_for_new_issues {
                "approval_required".to_string()
            } else {
                "draft_ready".to_string()
            },
            created_at_ms: now_ms(),
            triage_run_id: None,
            issue_number: None,
            title: Some(title),
            detail,
            github_status: None,
            github_issue_url: None,
            github_comment_url: None,
            github_posted_at_ms: None,
            matched_issue_number: None,
            matched_issue_state: None,
            evidence_digest: None,
            last_post_error: None,
        };
        drafts.insert(draft.draft_id.clone(), draft.clone());
        // Release the write lock before persisting (persist takes a read lock
        // on the same map).
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;
        Ok(draft)
    }
1963
    /// Transitions a Bug Monitor draft out of the `approval_required` state.
    ///
    /// `next_status` is normalized (trimmed, ASCII-lowercased) and must be
    /// either `"draft_ready"` (operator approved) or `"denied"`. A non-empty
    /// `reason`, when supplied, is appended to the draft detail as an
    /// `operator_note` line. On success the draft set is persisted and a
    /// `bug_monitor.draft.approved` / `bug_monitor.draft.denied` event is
    /// published carrying the draft id, repo, new status, and raw reason.
    ///
    /// # Errors
    /// Fails when the status is unsupported, the draft is unknown, the draft
    /// is not currently awaiting approval, or persistence fails.
    pub async fn update_bug_monitor_draft_status(
        &self,
        draft_id: &str,
        next_status: &str,
        reason: Option<&str>,
    ) -> anyhow::Result<BugMonitorDraftRecord> {
        let normalized_status = next_status.trim().to_ascii_lowercase();
        if normalized_status != "draft_ready" && normalized_status != "denied" {
            anyhow::bail!("unsupported Bug Monitor draft status");
        }

        let mut drafts = self.bug_monitor_drafts.write().await;
        let Some(draft) = drafts.get_mut(draft_id) else {
            anyhow::bail!("Bug Monitor draft not found");
        };
        // Only drafts parked in `approval_required` may be approved/denied.
        if !draft.status.eq_ignore_ascii_case("approval_required") {
            anyhow::bail!("Bug Monitor draft is not waiting for approval");
        }
        draft.status = normalized_status.clone();
        // Fold the operator's note (if any) into the draft detail so it
        // survives persistence alongside the original failure report.
        if let Some(reason) = reason
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
        {
            let next_detail = if let Some(detail) = draft.detail.as_ref() {
                format!("{detail}\n\noperator_note: {reason}")
            } else {
                format!("operator_note: {reason}")
            };
            draft.detail = Some(next_detail);
        }
        let updated = draft.clone();
        // Release the write lock before persisting; persistence does file I/O.
        drop(drafts);
        self.persist_bug_monitor_drafts().await?;

        let event_name = if normalized_status == "draft_ready" {
            "bug_monitor.draft.approved"
        } else {
            "bug_monitor.draft.denied"
        };
        self.event_bus.publish(EngineEvent::new(
            event_name,
            serde_json::json!({
                "draft_id": updated.draft_id,
                "repo": updated.repo,
                "status": updated.status,
                // NOTE: the raw (untrimmed) reason is published, while the
                // trimmed form was written into the draft detail above.
                "reason": reason,
            }),
        ));
        Ok(updated)
    }
2014
    /// Builds the full Bug Monitor status report: queue metrics, capability
    /// resolution against the selected MCP server, model availability, and a
    /// derived readiness/error summary.
    ///
    /// The readiness model is fail-closed: ingest/publish/runtime readiness
    /// all require the feature to be enabled, unpaused, a valid `owner/repo`
    /// target, and (for publish/runtime) a connected MCP server exposing all
    /// four required GitHub capabilities plus an available model.
    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
        // The four GitHub capabilities Bug Monitor cannot operate without.
        let required_capabilities = vec![
            "github.list_issues".to_string(),
            "github.get_issue".to_string(),
            "github.create_issue".to_string(),
            "github.comment_on_issue".to_string(),
        ];
        let config = self.bug_monitor_config().await;
        // Hold all three read locks only long enough to compute counts and
        // the latest activity timestamp, then drop them explicitly below.
        let drafts = self.bug_monitor_drafts.read().await;
        let incidents = self.bug_monitor_incidents.read().await;
        let posts = self.bug_monitor_posts.read().await;
        let total_incidents = incidents.len();
        let pending_incidents = incidents
            .values()
            .filter(|row| {
                matches!(
                    row.status.as_str(),
                    "queued"
                        | "draft_created"
                        | "triage_queued"
                        | "analysis_queued"
                        | "triage_pending"
                        | "issue_draft_pending"
                )
            })
            .count();
        let pending_drafts = drafts
            .values()
            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
            .count();
        let pending_posts = posts
            .values()
            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
            .count();
        // Most recent of: any draft creation or any post update.
        let last_activity_at_ms = drafts
            .values()
            .map(|row| row.created_at_ms)
            .chain(posts.values().map(|row| row.updated_at_ms))
            .max();
        drop(drafts);
        drop(incidents);
        drop(posts);
        // Overlay live config/counts onto the persisted runtime snapshot.
        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
        runtime.paused = config.paused;
        runtime.total_incidents = total_incidents;
        runtime.pending_incidents = pending_incidents;
        runtime.pending_posts = pending_posts;

        let mut status = BugMonitorStatus {
            config: config.clone(),
            runtime,
            pending_drafts,
            pending_posts,
            last_activity_at_ms,
            ..BugMonitorStatus::default()
        };
        let repo_valid = config
            .repo
            .as_ref()
            .map(|repo| is_valid_owner_repo_slug(repo))
            .unwrap_or(false);
        // Resolve the configured MCP server and default model, if any.
        let servers = self.mcp.list().await;
        let selected_server = config
            .mcp_server
            .as_ref()
            .and_then(|name| servers.get(name))
            .cloned();
        let provider_catalog = self.providers.list().await;
        let selected_model = config
            .model_policy
            .as_ref()
            .and_then(|policy| policy.get("default_model"))
            .and_then(crate::app::routines::parse_model_spec);
        let selected_model_ready = selected_model
            .as_ref()
            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
            .unwrap_or(false);
        // Discover what tools the selected server actually exposes right now.
        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
            self.mcp.server_tools(server_name).await
        } else {
            Vec::new()
        };
        let discovered_tools = self
            .capability_resolver
            .discover_from_runtime(selected_server_tools, Vec::new())
            .await;
        status.discovered_mcp_tools = discovered_tools
            .iter()
            .map(|row| row.tool_name.clone())
            .collect();
        let discovered_providers = discovered_tools
            .iter()
            .map(|row| row.provider.to_ascii_lowercase())
            .collect::<std::collections::HashSet<_>>();
        // Provider ordering for capability resolution; `Auto` matches the
        // official-GitHub ordering (mcp first).
        let provider_preference = match config.provider_preference {
            BugMonitorProviderPreference::OfficialGithub => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Composio => {
                vec![
                    "composio".to_string(),
                    "mcp".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Arcade => {
                vec![
                    "arcade".to_string(),
                    "mcp".to_string(),
                    "composio".to_string(),
                ]
            }
            BugMonitorProviderPreference::Auto => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
        };
        // Resolution failure is tolerated (`.ok()`): the status report then
        // treats every required capability as missing (fail-closed).
        let capability_resolution = self
            .capability_resolver
            .resolve(
                crate::capability_resolver::CapabilityResolveInput {
                    workflow_id: Some("bug_monitor".to_string()),
                    required_capabilities: required_capabilities.clone(),
                    optional_capabilities: Vec::new(),
                    provider_preference,
                    available_tools: discovered_tools,
                },
                Vec::new(),
            )
            .await
            .ok();
        // Report which static bindings could serve the required capabilities,
        // and whether each one actually matched during resolution.
        let bindings_file = self.capability_resolver.list_bindings().await.ok();
        if let Some(bindings) = bindings_file.as_ref() {
            status.binding_source_version = bindings.builtin_version.clone();
            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
            status.selected_server_binding_candidates = bindings
                .bindings
                .iter()
                .filter(|binding| required_capabilities.contains(&binding.capability_id))
                .filter(|binding| {
                    discovered_providers.is_empty()
                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
                })
                .map(|binding| {
                    // Key bindings by capability + lowercased tool name so the
                    // match check is case-insensitive on the tool side.
                    let binding_key = format!(
                        "{}::{}",
                        binding.capability_id,
                        binding.tool_name.to_ascii_lowercase()
                    );
                    let matched = capability_resolution
                        .as_ref()
                        .map(|resolution| {
                            resolution.resolved.iter().any(|row| {
                                row.capability_id == binding.capability_id
                                    && format!(
                                        "{}::{}",
                                        row.capability_id,
                                        row.tool_name.to_ascii_lowercase()
                                    ) == binding_key
                            })
                        })
                        .unwrap_or(false);
                    BugMonitorBindingCandidate {
                        capability_id: binding.capability_id.clone(),
                        binding_tool_name: binding.tool_name.clone(),
                        aliases: binding.tool_name_aliases.clone(),
                        matched,
                    }
                })
                .collect();
            status.selected_server_binding_candidates.sort_by(|a, b| {
                a.capability_id
                    .cmp(&b.capability_id)
                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
            });
        }
        // True when the given capability resolved to a concrete tool.
        let capability_ready = |capability_id: &str| -> bool {
            capability_resolution
                .as_ref()
                .map(|resolved| {
                    resolved
                        .resolved
                        .iter()
                        .any(|row| row.capability_id == capability_id)
                })
                .unwrap_or(false)
        };
        if let Some(resolution) = capability_resolution.as_ref() {
            status.missing_required_capabilities = resolution.missing_required.clone();
            status.resolved_capabilities = resolution
                .resolved
                .iter()
                .map(|row| BugMonitorCapabilityMatch {
                    capability_id: row.capability_id.clone(),
                    provider: row.provider.clone(),
                    tool_name: row.tool_name.clone(),
                    binding_index: row.binding_index,
                })
                .collect();
        } else {
            // Resolution itself failed: treat everything as missing.
            status.missing_required_capabilities = required_capabilities.clone();
        }
        status.required_capabilities = BugMonitorCapabilityReadiness {
            github_list_issues: capability_ready("github.list_issues"),
            github_get_issue: capability_ready("github.get_issue"),
            github_create_issue: capability_ready("github.create_issue"),
            github_comment_on_issue: capability_ready("github.comment_on_issue"),
        };
        status.selected_model = selected_model;
        // Derive the readiness flags; publish/runtime currently share the
        // same (strictest) condition set.
        status.readiness = BugMonitorReadiness {
            config_valid: repo_valid
                && selected_server.is_some()
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            repo_valid,
            mcp_server_present: selected_server.is_some(),
            mcp_connected: selected_server
                .as_ref()
                .map(|row| row.connected)
                .unwrap_or(false),
            github_read_ready: status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue,
            github_write_ready: status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue,
            selected_model_ready,
            ingest_ready: config.enabled && !config.paused && repo_valid,
            publish_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            runtime_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
        };
        // Surface the first (highest-priority) blocking condition as
        // `last_error`; only reported while the feature is enabled.
        if config.enabled {
            if config.paused {
                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
            } else if !repo_valid {
                status.last_error = Some("Target repo is missing or invalid.".to_string());
            } else if selected_server.is_none() {
                status.last_error = Some("Selected MCP server is missing.".to_string());
            } else if !status.readiness.mcp_connected {
                status.last_error = Some("Selected MCP server is disconnected.".to_string());
            } else if !selected_model_ready {
                status.last_error = Some(
                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
                        .to_string(),
                );
            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
                let missing = if status.missing_required_capabilities.is_empty() {
                    "unknown".to_string()
                } else {
                    status.missing_required_capabilities.join(", ")
                };
                status.last_error = Some(format!(
                    "Selected MCP server is missing required GitHub capabilities: {missing}"
                ));
            }
        }
        status.runtime.monitoring_active = status.readiness.ingest_ready;
        status
    }
2304
2305 pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2306 if !self.workflow_runs_path.exists() {
2307 return Ok(());
2308 }
2309 let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2310 let parsed =
2311 serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2312 .unwrap_or_default();
2313 *self.workflow_runs.write().await = parsed;
2314 Ok(())
2315 }
2316
2317 pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2318 if let Some(parent) = self.workflow_runs_path.parent() {
2319 fs::create_dir_all(parent).await?;
2320 }
2321 let payload = {
2322 let guard = self.workflow_runs.read().await;
2323 serde_json::to_string_pretty(&*guard)?
2324 };
2325 fs::write(&self.workflow_runs_path, payload).await?;
2326 Ok(())
2327 }
2328
2329 pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2330 if !self.workflow_hook_overrides_path.exists() {
2331 return Ok(());
2332 }
2333 let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2334 let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2335 .unwrap_or_default();
2336 *self.workflow_hook_overrides.write().await = parsed;
2337 Ok(())
2338 }
2339
2340 pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2341 if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2342 fs::create_dir_all(parent).await?;
2343 }
2344 let payload = {
2345 let guard = self.workflow_hook_overrides.read().await;
2346 serde_json::to_string_pretty(&*guard)?
2347 };
2348 fs::write(&self.workflow_hook_overrides_path, payload).await?;
2349 Ok(())
2350 }
2351
2352 pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
2353 let mut sources = Vec::new();
2354 sources.push(WorkflowLoadSource {
2355 root: config::paths::resolve_builtin_workflows_dir(),
2356 kind: WorkflowSourceKind::BuiltIn,
2357 pack_id: None,
2358 });
2359
2360 let workspace_root = self.workspace_index.snapshot().await.root;
2361 sources.push(WorkflowLoadSource {
2362 root: PathBuf::from(workspace_root).join(".tandem"),
2363 kind: WorkflowSourceKind::Workspace,
2364 pack_id: None,
2365 });
2366
2367 if let Ok(packs) = self.pack_manager.list().await {
2368 for pack in packs {
2369 sources.push(WorkflowLoadSource {
2370 root: PathBuf::from(pack.install_path),
2371 kind: WorkflowSourceKind::Pack,
2372 pack_id: Some(pack.pack_id),
2373 });
2374 }
2375 }
2376
2377 let mut registry = load_workflow_registry(&sources)?;
2378 let overrides = self.workflow_hook_overrides.read().await.clone();
2379 for hook in &mut registry.hooks {
2380 if let Some(enabled) = overrides.get(&hook.binding_id) {
2381 hook.enabled = *enabled;
2382 }
2383 }
2384 for workflow in registry.workflows.values_mut() {
2385 workflow.hooks = registry
2386 .hooks
2387 .iter()
2388 .filter(|hook| hook.workflow_id == workflow.workflow_id)
2389 .cloned()
2390 .collect();
2391 }
2392 let messages = validate_workflow_registry(®istry);
2393 *self.workflows.write().await = registry;
2394 Ok(messages)
2395 }
2396
2397 pub async fn workflow_registry(&self) -> WorkflowRegistry {
2398 self.workflows.read().await.clone()
2399 }
2400
2401 pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2402 let mut rows = self
2403 .workflows
2404 .read()
2405 .await
2406 .workflows
2407 .values()
2408 .cloned()
2409 .collect::<Vec<_>>();
2410 rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2411 rows
2412 }
2413
2414 pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2415 self.workflows
2416 .read()
2417 .await
2418 .workflows
2419 .get(workflow_id)
2420 .cloned()
2421 }
2422
2423 pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2424 let mut rows = self
2425 .workflows
2426 .read()
2427 .await
2428 .hooks
2429 .iter()
2430 .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2431 .cloned()
2432 .collect::<Vec<_>>();
2433 rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2434 rows
2435 }
2436
2437 pub async fn set_workflow_hook_enabled(
2438 &self,
2439 binding_id: &str,
2440 enabled: bool,
2441 ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2442 self.workflow_hook_overrides
2443 .write()
2444 .await
2445 .insert(binding_id.to_string(), enabled);
2446 self.persist_workflow_hook_overrides().await?;
2447 let _ = self.reload_workflows().await?;
2448 Ok(self
2449 .workflows
2450 .read()
2451 .await
2452 .hooks
2453 .iter()
2454 .find(|hook| hook.binding_id == binding_id)
2455 .cloned())
2456 }
2457
2458 pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2459 self.workflow_runs
2460 .write()
2461 .await
2462 .insert(run.run_id.clone(), run);
2463 self.persist_workflow_runs().await
2464 }
2465
2466 pub async fn update_workflow_run(
2467 &self,
2468 run_id: &str,
2469 update: impl FnOnce(&mut WorkflowRunRecord),
2470 ) -> Option<WorkflowRunRecord> {
2471 let mut guard = self.workflow_runs.write().await;
2472 let row = guard.get_mut(run_id)?;
2473 update(row);
2474 row.updated_at_ms = now_ms();
2475 if matches!(
2476 row.status,
2477 WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
2478 ) {
2479 row.finished_at_ms.get_or_insert_with(now_ms);
2480 }
2481 let out = row.clone();
2482 drop(guard);
2483 let _ = self.persist_workflow_runs().await;
2484 Some(out)
2485 }
2486
2487 pub async fn list_workflow_runs(
2488 &self,
2489 workflow_id: Option<&str>,
2490 limit: usize,
2491 ) -> Vec<WorkflowRunRecord> {
2492 let mut rows = self
2493 .workflow_runs
2494 .read()
2495 .await
2496 .values()
2497 .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2498 .cloned()
2499 .collect::<Vec<_>>();
2500 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2501 rows.truncate(limit.clamp(1, 500));
2502 rows
2503 }
2504
2505 pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2506 self.workflow_runs.read().await.get(run_id).cloned()
2507 }
2508
2509 pub async fn put_automation_v2(
2510 &self,
2511 mut automation: AutomationV2Spec,
2512 ) -> anyhow::Result<AutomationV2Spec> {
2513 if automation.automation_id.trim().is_empty() {
2514 anyhow::bail!("automation_id is required");
2515 }
2516 for agent in &mut automation.agents {
2517 if agent.display_name.trim().is_empty() {
2518 agent.display_name = auto_generated_agent_name(&agent.agent_id);
2519 }
2520 agent.tool_policy.allowlist =
2521 config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
2522 agent.tool_policy.denylist =
2523 config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
2524 agent.mcp_policy.allowed_servers =
2525 normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
2526 agent.mcp_policy.allowed_tools = agent
2527 .mcp_policy
2528 .allowed_tools
2529 .take()
2530 .map(normalize_allowed_tools);
2531 }
2532 let now = now_ms();
2533 if automation.created_at_ms == 0 {
2534 automation.created_at_ms = now;
2535 }
2536 automation.updated_at_ms = now;
2537 if automation.next_fire_at_ms.is_none() {
2538 automation.next_fire_at_ms =
2539 automation_schedule_next_fire_at_ms(&automation.schedule, now);
2540 }
2541 migrate_bundled_studio_research_split_automation(&mut automation);
2542 self.automations_v2
2543 .write()
2544 .await
2545 .insert(automation.automation_id.clone(), automation.clone());
2546 self.persist_automations_v2().await?;
2547 self.verify_automation_v2_persisted(&automation.automation_id, true)
2548 .await?;
2549 Ok(automation)
2550 }
2551
2552 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2553 self.automations_v2.read().await.get(automation_id).cloned()
2554 }
2555
2556 pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2557 self.workflow_plans
2558 .write()
2559 .await
2560 .insert(plan.plan_id.clone(), plan);
2561 }
2562
2563 pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2564 self.workflow_plans.read().await.get(plan_id).cloned()
2565 }
2566
2567 pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2568 self.workflow_plan_drafts
2569 .write()
2570 .await
2571 .insert(draft.current_plan.plan_id.clone(), draft.clone());
2572 self.put_workflow_plan(draft.current_plan).await;
2573 }
2574
2575 pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2576 self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2577 }
2578
2579 pub async fn put_optimization_campaign(
2580 &self,
2581 mut campaign: OptimizationCampaignRecord,
2582 ) -> anyhow::Result<OptimizationCampaignRecord> {
2583 if campaign.optimization_id.trim().is_empty() {
2584 anyhow::bail!("optimization_id is required");
2585 }
2586 if campaign.source_workflow_id.trim().is_empty() {
2587 anyhow::bail!("source_workflow_id is required");
2588 }
2589 if campaign.name.trim().is_empty() {
2590 anyhow::bail!("name is required");
2591 }
2592 let now = now_ms();
2593 if campaign.created_at_ms == 0 {
2594 campaign.created_at_ms = now;
2595 }
2596 campaign.updated_at_ms = now;
2597 campaign.source_workflow_snapshot_hash =
2598 optimization_snapshot_hash(&campaign.source_workflow_snapshot);
2599 campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
2600 self.optimization_campaigns
2601 .write()
2602 .await
2603 .insert(campaign.optimization_id.clone(), campaign.clone());
2604 self.persist_optimization_campaigns().await?;
2605 Ok(campaign)
2606 }
2607
2608 pub async fn get_optimization_campaign(
2609 &self,
2610 optimization_id: &str,
2611 ) -> Option<OptimizationCampaignRecord> {
2612 self.optimization_campaigns
2613 .read()
2614 .await
2615 .get(optimization_id)
2616 .cloned()
2617 }
2618
2619 pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
2620 let mut rows = self
2621 .optimization_campaigns
2622 .read()
2623 .await
2624 .values()
2625 .cloned()
2626 .collect::<Vec<_>>();
2627 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2628 rows
2629 }
2630
2631 pub async fn put_optimization_experiment(
2632 &self,
2633 mut experiment: OptimizationExperimentRecord,
2634 ) -> anyhow::Result<OptimizationExperimentRecord> {
2635 if experiment.experiment_id.trim().is_empty() {
2636 anyhow::bail!("experiment_id is required");
2637 }
2638 if experiment.optimization_id.trim().is_empty() {
2639 anyhow::bail!("optimization_id is required");
2640 }
2641 let now = now_ms();
2642 if experiment.created_at_ms == 0 {
2643 experiment.created_at_ms = now;
2644 }
2645 experiment.updated_at_ms = now;
2646 experiment.candidate_snapshot_hash =
2647 optimization_snapshot_hash(&experiment.candidate_snapshot);
2648 self.optimization_experiments
2649 .write()
2650 .await
2651 .insert(experiment.experiment_id.clone(), experiment.clone());
2652 self.persist_optimization_experiments().await?;
2653 Ok(experiment)
2654 }
2655
2656 pub async fn get_optimization_experiment(
2657 &self,
2658 optimization_id: &str,
2659 experiment_id: &str,
2660 ) -> Option<OptimizationExperimentRecord> {
2661 self.optimization_experiments
2662 .read()
2663 .await
2664 .get(experiment_id)
2665 .filter(|row| row.optimization_id == optimization_id)
2666 .cloned()
2667 }
2668
2669 pub async fn list_optimization_experiments(
2670 &self,
2671 optimization_id: &str,
2672 ) -> Vec<OptimizationExperimentRecord> {
2673 let mut rows = self
2674 .optimization_experiments
2675 .read()
2676 .await
2677 .values()
2678 .filter(|row| row.optimization_id == optimization_id)
2679 .cloned()
2680 .collect::<Vec<_>>();
2681 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2682 rows
2683 }
2684
2685 pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
2686 self.optimization_experiments
2687 .read()
2688 .await
2689 .values()
2690 .filter(|row| row.optimization_id == optimization_id)
2691 .count()
2692 }
2693
2694 fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
2695 matches!(
2696 status,
2697 crate::AutomationRunStatus::Completed
2698 | crate::AutomationRunStatus::Blocked
2699 | crate::AutomationRunStatus::Failed
2700 | crate::AutomationRunStatus::Cancelled
2701 )
2702 }
2703
2704 fn optimization_consecutive_failure_count(
2705 experiments: &[OptimizationExperimentRecord],
2706 ) -> usize {
2707 let mut ordered = experiments.to_vec();
2708 ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
2709 ordered
2710 .iter()
2711 .rev()
2712 .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
2713 .count()
2714 }
2715
2716 fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
2717 match field {
2718 OptimizationMutableField::Objective => "objective",
2719 OptimizationMutableField::OutputContractSummaryGuidance => {
2720 "output_contract.summary_guidance"
2721 }
2722 OptimizationMutableField::TimeoutMs => "timeout_ms",
2723 OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
2724 OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
2725 }
2726 }
2727
    /// Reads the current value of one mutable field from an automation flow
    /// node, as a JSON value suitable for `apply_patch` bookkeeping.
    ///
    /// Returns an error string naming the node when the requested field is
    /// absent (no `output_contract.summary_guidance`, no `timeout_ms`, or a
    /// retry policy missing the expected key).
    fn optimization_node_field_value(
        node: &crate::AutomationFlowNode,
        field: OptimizationMutableField,
    ) -> Result<Value, String> {
        match field {
            // The objective is a required node field, so this cannot fail.
            OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
            OptimizationMutableField::OutputContractSummaryGuidance => node
                .output_contract
                .as_ref()
                .and_then(|contract| contract.summary_guidance.clone())
                .map(Value::String)
                .ok_or_else(|| {
                    format!(
                        "node `{}` is missing output_contract.summary_guidance",
                        node.node_id
                    )
                }),
            OptimizationMutableField::TimeoutMs => node
                .timeout_ms
                .map(|value| json!(value))
                .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
            // retry_policy is free-form JSON; both retry fields are read out
            // of its object form by key and returned verbatim.
            OptimizationMutableField::RetryPolicyMaxAttempts => node
                .retry_policy
                .as_ref()
                .and_then(Value::as_object)
                .and_then(|policy| policy.get("max_attempts"))
                .cloned()
                .ok_or_else(|| {
                    format!(
                        "node `{}` is missing retry_policy.max_attempts",
                        node.node_id
                    )
                }),
            OptimizationMutableField::RetryPolicyRetries => node
                .retry_policy
                .as_ref()
                .and_then(Value::as_object)
                .and_then(|policy| policy.get("retries"))
                .cloned()
                .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
        }
    }
2770
    /// Writes `value` into one mutable field of an automation flow node.
    ///
    /// Mirrors `optimization_node_field_value`: each field enforces its JSON
    /// type (string for objective/guidance, u64 for timeout, i64 for retry
    /// counts). `output_contract` must already exist to receive guidance,
    /// while `retry_policy` is created as an empty object on demand.
    ///
    /// Returns a descriptive error string when the value has the wrong JSON
    /// type or the node's existing structure cannot accept the write.
    fn set_optimization_node_field_value(
        node: &mut crate::AutomationFlowNode,
        field: OptimizationMutableField,
        value: &Value,
    ) -> Result<(), String> {
        match field {
            OptimizationMutableField::Objective => {
                node.objective = value
                    .as_str()
                    .ok_or_else(|| "objective apply value must be a string".to_string())?
                    .to_string();
            }
            OptimizationMutableField::OutputContractSummaryGuidance => {
                let guidance = value
                    .as_str()
                    .ok_or_else(|| {
                        "output_contract.summary_guidance apply value must be a string".to_string()
                    })?
                    .to_string();
                // Unlike retry_policy below, a missing output_contract is an
                // error rather than being created on the fly.
                let contract = node.output_contract.as_mut().ok_or_else(|| {
                    format!(
                        "node `{}` is missing output_contract for apply",
                        node.node_id
                    )
                })?;
                contract.summary_guidance = Some(guidance);
            }
            OptimizationMutableField::TimeoutMs => {
                node.timeout_ms = Some(
                    value
                        .as_u64()
                        .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
                );
            }
            OptimizationMutableField::RetryPolicyMaxAttempts => {
                let next = value.as_i64().ok_or_else(|| {
                    "retry_policy.max_attempts apply value must be an integer".to_string()
                })?;
                // Create an empty retry_policy object when absent; reject a
                // pre-existing non-object value.
                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
                let object = policy.as_object_mut().ok_or_else(|| {
                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
                })?;
                object.insert("max_attempts".to_string(), json!(next));
            }
            OptimizationMutableField::RetryPolicyRetries => {
                let next = value.as_i64().ok_or_else(|| {
                    "retry_policy.retries apply value must be an integer".to_string()
                })?;
                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
                let object = policy.as_object_mut().ok_or_else(|| {
                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
                })?;
                object.insert("retries".to_string(), json!(next));
            }
        }
        Ok(())
    }
2828
2829 fn append_optimization_apply_metadata(
2830 metadata: Option<Value>,
2831 record: Value,
2832 ) -> Result<Option<Value>, String> {
2833 let mut root = match metadata {
2834 Some(Value::Object(map)) => map,
2835 Some(_) => return Err("automation metadata must be a JSON object".to_string()),
2836 None => serde_json::Map::new(),
2837 };
2838 let history = root
2839 .entry("optimization_apply_history".to_string())
2840 .or_insert_with(|| Value::Array(Vec::new()));
2841 let Some(entries) = history.as_array_mut() else {
2842 return Err("optimization_apply_history metadata must be an array".to_string());
2843 };
2844 entries.push(record.clone());
2845 root.insert("last_optimization_apply".to_string(), record);
2846 Ok(Some(Value::Object(root)))
2847 }
2848
2849 fn build_optimization_apply_patch(
2850 baseline: &crate::AutomationV2Spec,
2851 candidate: &crate::AutomationV2Spec,
2852 mutation: &crate::OptimizationValidatedMutation,
2853 approved_at_ms: u64,
2854 ) -> Result<Value, String> {
2855 let baseline_node = baseline
2856 .flow
2857 .nodes
2858 .iter()
2859 .find(|node| node.node_id == mutation.node_id)
2860 .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
2861 let candidate_node = candidate
2862 .flow
2863 .nodes
2864 .iter()
2865 .find(|node| node.node_id == mutation.node_id)
2866 .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
2867 let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
2868 let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
2869 Ok(json!({
2870 "node_id": mutation.node_id,
2871 "field": mutation.field,
2872 "field_path": Self::optimization_mutation_field_path(mutation.field),
2873 "expected_before": before,
2874 "apply_value": after,
2875 "approved_at_ms": approved_at_ms,
2876 }))
2877 }
2878
    /// Apply an approved winner experiment's mutation to the live workflow.
    ///
    /// Guards enforced, in order:
    /// - campaign and experiment must exist;
    /// - experiment status must be `PromotionApproved`;
    /// - the experiment's candidate hash must equal the campaign's current
    ///   baseline hash (the baseline hash is advanced on approval, so a
    ///   mismatch means a newer winner superseded this one);
    /// - the experiment metadata must carry a complete `apply_patch` object
    ///   (`node_id`, `field`, `expected_before`, `apply_value`);
    /// - the live workflow's current value for the patched field must still
    ///   equal `expected_before` (drift check).
    ///
    /// On success the live workflow is mutated and persisted with an apply
    /// record appended to its metadata, the experiment gains an
    /// `applied_to_live` metadata entry and is persisted, and
    /// `(campaign, stored_experiment, stored_live)` is returned.
    pub async fn apply_optimization_winner(
        &self,
        optimization_id: &str,
        experiment_id: &str,
    ) -> Result<
        (
            OptimizationCampaignRecord,
            OptimizationExperimentRecord,
            crate::AutomationV2Spec,
        ),
        String,
    > {
        let campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        let mut experiment = self
            .get_optimization_experiment(optimization_id, experiment_id)
            .await
            .ok_or_else(|| "experiment not found".to_string())?;
        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
            return Err("only approved winner experiments may be applied".to_string());
        }
        // Approval rewrites the campaign baseline to the winner's candidate
        // snapshot, so any hash mismatch means this is a stale winner.
        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
            return Err(
                "only the latest approved winner may be applied to the live workflow".to_string(),
            );
        }
        // Decode the apply_patch written at approval time; each field is
        // validated individually with a precise error message.
        let patch = experiment
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("apply_patch"))
            .cloned()
            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
        let node_id = patch
            .get("node_id")
            .and_then(Value::as_str)
            .map(str::to_string)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
        let field: OptimizationMutableField = serde_json::from_value(
            patch
                .get("field")
                .cloned()
                .ok_or_else(|| "apply_patch.field is required".to_string())?,
        )
        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
        let expected_before = patch
            .get("expected_before")
            .cloned()
            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
        let apply_value = patch
            .get("apply_value")
            .cloned()
            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
        let mut live = self
            .get_automation_v2(&campaign.source_workflow_id)
            .await
            .ok_or_else(|| "source workflow not found".to_string())?;
        // Drift check: refuse to apply when the live workflow no longer holds
        // the value the patch was computed against.
        let current_value = {
            let live_node = live
                .flow
                .nodes
                .iter()
                .find(|node| node.node_id == node_id)
                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
            Self::optimization_node_field_value(live_node, field)?
        };
        if current_value != expected_before {
            return Err(format!(
                "live workflow drift detected for node `{node_id}` {}",
                Self::optimization_mutation_field_path(field)
            ));
        }
        // Second (mutable) lookup to actually write the new value.
        let live_node = live
            .flow
            .nodes
            .iter_mut()
            .find(|node| node.node_id == node_id)
            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
        let applied_at_ms = now_ms();
        // Audit record appended to the live workflow's metadata history.
        let apply_record = json!({
            "optimization_id": campaign.optimization_id,
            "experiment_id": experiment.experiment_id,
            "node_id": node_id,
            "field": field,
            "field_path": Self::optimization_mutation_field_path(field),
            "previous_value": expected_before,
            "new_value": apply_value,
            "applied_at_ms": applied_at_ms,
        });
        live.metadata =
            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
        let stored_live = self
            .put_automation_v2(live)
            .await
            .map_err(|error| error.to_string())?;
        // Mirror the application on the experiment record itself.
        let mut metadata = match experiment.metadata.take() {
            Some(Value::Object(map)) => map,
            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
            None => serde_json::Map::new(),
        };
        metadata.insert(
            "applied_to_live".to_string(),
            json!({
                "automation_id": stored_live.automation_id,
                "applied_at_ms": applied_at_ms,
                "field": field,
                "node_id": node_id,
            }),
        );
        experiment.metadata = Some(Value::Object(metadata));
        let stored_experiment = self
            .put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        Ok((campaign, stored_experiment, stored_live))
    }
2998
2999 fn optimization_objective_hint(text: &str) -> String {
3000 let cleaned = text
3001 .lines()
3002 .map(str::trim)
3003 .filter(|line| !line.is_empty() && !line.starts_with('#'))
3004 .collect::<Vec<_>>()
3005 .join(" ");
3006 let hint = if cleaned.is_empty() {
3007 "Prioritize validator-complete output with explicit evidence."
3008 } else {
3009 cleaned.as_str()
3010 };
3011 let trimmed = hint.trim();
3012 let clipped = if trimmed.len() > 140 {
3013 trimmed[..140].trim_end()
3014 } else {
3015 trimmed
3016 };
3017 let mut sentence = clipped.trim_end_matches('.').to_string();
3018 if sentence.is_empty() {
3019 sentence = "Prioritize validator-complete output with explicit evidence".to_string();
3020 }
3021 sentence.push('.');
3022 sentence
3023 }
3024
    /// Deterministically enumerate phase-1 candidate workflow specs.
    ///
    /// For every node in the baseline flow, and for every mutation kind the
    /// phase-1 `mutation_policy` allows, one candidate spec is produced
    /// together with the validated mutation describing the change. Candidates
    /// that fail `validate_phase1_candidate_mutation` are silently skipped.
    /// The output order is fixed (node order, then mutation kind order), which
    /// makes the downstream hash-dedup selection deterministic.
    fn build_phase1_candidate_options(
        baseline: &crate::AutomationV2Spec,
        phase1: &crate::OptimizationPhase1Config,
    ) -> Vec<(
        crate::AutomationV2Spec,
        crate::OptimizationValidatedMutation,
    )> {
        let mut options = Vec::new();
        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
        for (index, node) in baseline.flow.nodes.iter().enumerate() {
            // Text mutation: append the objective hint (or a fallback phrase
            // when the hint is already present) to the node objective.
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::Objective)
            {
                let addition = if node.objective.contains(&hint) {
                    "Prioritize validator-complete output with concrete evidence."
                } else {
                    &hint
                };
                let mut candidate = baseline.clone();
                candidate.flow.nodes[index].objective =
                    format!("{} {}", node.objective.trim(), addition.trim())
                        .trim()
                        .to_string();
                if let Ok(validated) =
                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                {
                    options.push((candidate, validated));
                }
            }
            // Text mutation: strengthen the output-contract summary guidance
            // (only for nodes that already have guidance set).
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
            {
                if let Some(summary_guidance) = node
                    .output_contract
                    .as_ref()
                    .and_then(|contract| contract.summary_guidance.as_ref())
                {
                    let addition = if summary_guidance.contains("Cite concrete evidence") {
                        "Keep evidence explicit."
                    } else {
                        "Cite concrete evidence in the summary."
                    };
                    let mut candidate = baseline.clone();
                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
                        contract.summary_guidance = Some(
                            format!("{} {}", summary_guidance.trim(), addition)
                                .trim()
                                .to_string(),
                        );
                    }
                    if let Ok(validated) =
                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                    {
                        options.push((candidate, validated));
                    }
                }
            }
            // Knob mutation: bump the node timeout by the smaller of the
            // percentage-based and absolute deltas (at least 1 ms), capped at
            // the policy maximum; skipped when the cap leaves it unchanged.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::TimeoutMs)
            {
                if let Some(timeout_ms) = node.timeout_ms {
                    let delta_by_percent = ((timeout_ms as f64)
                        * phase1.mutation_policy.timeout_delta_percent)
                        .round() as u64;
                    let delta = delta_by_percent
                        .min(phase1.mutation_policy.timeout_delta_ms)
                        .max(1);
                    let next = timeout_ms
                        .saturating_add(delta)
                        .min(phase1.mutation_policy.timeout_max_ms);
                    if next != timeout_ms {
                        let mut candidate = baseline.clone();
                        candidate.flow.nodes[index].timeout_ms = Some(next);
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Knob mutation: increment retry_policy.max_attempts by one, up to
            // the policy's retry_max; only when the key is already present.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("max_attempts"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("max_attempts".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Knob mutation: increment retry_policy.retries by one, up to the
            // policy's retry_max; same shape as the max_attempts mutation.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyRetries)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("retries"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("retries".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
        }
        options
    }
3173
    /// Queue at most one new phase-1 candidate experiment for the campaign.
    ///
    /// Returns `Ok(true)` when the campaign record was mutated: either a new
    /// Draft experiment with an evaluation run was queued, or the campaign
    /// was marked `Completed` because the experiment budget or the
    /// deterministic mutator is exhausted. Returns `Ok(false)` when nothing
    /// should change yet (no phase-1 config, baseline metrics not yet
    /// established, a promotion decision pending, or a candidate evaluation
    /// already in flight).
    async fn maybe_queue_phase1_candidate_experiment(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let experiment_count = self
            .count_optimization_experiments(&campaign.optimization_id)
            .await;
        // Budget guard: complete the campaign once the experiment cap is hit.
        if experiment_count >= phase1.budget.max_experiments as usize {
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        }
        // Require an established baseline and no pending promotion decision.
        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
        {
            return Ok(false);
        }
        let existing = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        // Only one candidate evaluation runs at a time: a Draft experiment
        // carrying an eval_run_id is still being evaluated.
        let active_eval_exists = existing.iter().any(|experiment| {
            matches!(experiment.status, OptimizationExperimentStatus::Draft)
                && experiment
                    .metadata
                    .as_ref()
                    .and_then(|metadata| metadata.get("eval_run_id"))
                    .and_then(Value::as_str)
                    .is_some()
        });
        if active_eval_exists {
            return Ok(false);
        }
        // Dedup by candidate snapshot hash so each mutation is tried once.
        let existing_hashes = existing
            .iter()
            .map(|experiment| experiment.candidate_snapshot_hash.clone())
            .collect::<std::collections::HashSet<_>>();
        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
        }) else {
            // Every deterministic mutation has been tried: campaign is done.
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some(
                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
            );
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        };
        // Kick off the evaluation run for the chosen candidate, then record
        // the Draft experiment referencing it via eval_run_id metadata.
        let eval_run = self
            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
            .await
            .map_err(|error| error.to_string())?;
        let now = now_ms();
        let experiment = OptimizationExperimentRecord {
            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
            optimization_id: campaign.optimization_id.clone(),
            status: OptimizationExperimentStatus::Draft,
            candidate_snapshot: candidate_snapshot.clone(),
            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
            mutation_summary: Some(mutation.summary.clone()),
            metrics: None,
            phase1_metrics: None,
            promotion_recommendation: None,
            promotion_decision: None,
            created_at_ms: now,
            updated_at_ms: now,
            metadata: Some(json!({
                "generator": "phase1_deterministic_v1",
                "eval_run_id": eval_run.run_id,
                "mutation": mutation,
            })),
        };
        self.put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
        campaign.updated_at_ms = now_ms();
        Ok(true)
    }
3256
    /// Reconcile Draft candidate experiments against their evaluation runs.
    ///
    /// For each Draft experiment whose `eval_run_id` run has reached a
    /// terminal state: mark it Failed (with `eval_failure` metadata) when the
    /// run did not complete, the experiment's baseline hash is stale, or
    /// metrics cannot be derived; otherwise record phase-1 metrics and a
    /// promotion decision, moving the experiment to `PromotionRecommended`
    /// (and the campaign to `AwaitingPromotionApproval`) or `Discarded`.
    /// Finally, fail the whole campaign if consecutive evaluation failures
    /// exceed the phase-1 budget. Returns whether anything was persisted or
    /// the campaign record was mutated.
    async fn reconcile_phase1_candidate_experiments(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        // No promotion decisions can be made without an established baseline.
        let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
            return Ok(false);
        };
        let experiments = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let mut changed = false;
        for mut experiment in experiments {
            if experiment.status != OptimizationExperimentStatus::Draft {
                continue;
            }
            let Some(eval_run_id) = experiment
                .metadata
                .as_ref()
                .and_then(|metadata| metadata.get("eval_run_id"))
                .and_then(Value::as_str)
                .map(str::to_string)
            else {
                continue;
            };
            let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
                continue;
            };
            // Leave still-running evaluations alone.
            if !Self::automation_run_is_terminal(&run.status) {
                continue;
            }
            // Terminal but not Completed: record the failure and move on.
            if run.status != crate::AutomationRunStatus::Completed {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // The baseline may have moved (e.g. a winner was approved) while
            // this evaluation ran; such experiments are no longer comparable.
            if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                        "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // Derive metrics from the completed run; a derivation error also
            // fails the experiment with the reason recorded.
            let metrics =
                match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
                    Ok(metrics) => metrics,
                    Err(error) => {
                        experiment.status = OptimizationExperimentStatus::Failed;
                        let mut metadata = match experiment.metadata.take() {
                            Some(Value::Object(map)) => map,
                            Some(_) => serde_json::Map::new(),
                            None => serde_json::Map::new(),
                        };
                        metadata.insert(
                            "eval_failure".to_string(),
                            json!({
                                "run_id": run.run_id,
                                "status": run.status,
                                "reason": error,
                            }),
                        );
                        experiment.metadata = Some(Value::Object(metadata));
                        self.put_optimization_experiment(experiment)
                            .await
                            .map_err(|error| error.to_string())?;
                        changed = true;
                        continue;
                    }
                };
            // Compare against the baseline and record the promotion verdict.
            let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
            experiment.phase1_metrics = Some(metrics.clone());
            experiment.metrics = Some(json!({
                "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
                "unmet_requirement_count": metrics.unmet_requirement_count,
                "blocked_node_rate": metrics.blocked_node_rate,
                "budget_within_limits": metrics.budget_within_limits,
            }));
            experiment.promotion_recommendation = Some(
                match decision.decision {
                    OptimizationPromotionDecisionKind::Promote => "promote",
                    OptimizationPromotionDecisionKind::Discard => "discard",
                    OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                        "needs_operator_review"
                    }
                }
                .to_string(),
            );
            experiment.promotion_decision = Some(decision.clone());
            match decision.decision {
                // Promote and NeedsOperatorReview both park the campaign
                // awaiting an explicit operator approval.
                OptimizationPromotionDecisionKind::Promote
                | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                    experiment.status = OptimizationExperimentStatus::PromotionRecommended;
                    campaign.pending_promotion_experiment_id =
                        Some(experiment.experiment_id.clone());
                    campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
                    campaign.last_pause_reason = Some(decision.reason.clone());
                }
                OptimizationPromotionDecisionKind::Discard => {
                    experiment.status = OptimizationExperimentStatus::Discarded;
                    if campaign.status == OptimizationCampaignStatus::Running {
                        campaign.last_pause_reason = Some(decision.reason.clone());
                    }
                }
            }
            self.put_optimization_experiment(experiment)
                .await
                .map_err(|error| error.to_string())?;
            changed = true;
        }
        // Re-read to count failures including the ones persisted above.
        let refreshed = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
        if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
            && phase1.budget.max_consecutive_failures > 0
        {
            campaign.status = OptimizationCampaignStatus::Failed;
            campaign.last_pause_reason = Some(format!(
                "phase 1 candidate evaluations reached {} consecutive failures",
                consecutive_failures
            ));
            changed = true;
        }
        Ok(changed)
    }
3415
    /// Reconcile the campaign's pending baseline replay runs.
    ///
    /// For each run id in `pending_baseline_run_ids`: runs that are still
    /// executing stay pending; a missing run, a non-Completed terminal
    /// status, the wrong source workflow, or a snapshot-hash mismatch pauses
    /// the campaign as `PausedEvaluatorUnstable`; a valid completed run is
    /// converted into an `OptimizationBaselineReplayRecord` and dropped from
    /// the pending list. Returns whether the campaign record changed.
    async fn reconcile_pending_baseline_replays(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let mut changed = false;
        // Runs that are still executing are collected here and written back
        // as the new pending list at the end.
        let mut remaining = Vec::new();
        for run_id in campaign.pending_baseline_run_ids.clone() {
            let Some(run) = self.get_automation_v2_run(&run_id).await else {
                // A tracked run that disappeared means the evaluator cannot
                // be trusted; pause rather than silently dropping it.
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{run_id}` was not found during optimization reconciliation"
                ));
                changed = true;
                continue;
            };
            if !Self::automation_run_is_terminal(&run.status) {
                remaining.push(run_id);
                continue;
            }
            if run.status != crate::AutomationRunStatus::Completed {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{}` finished with status `{:?}`",
                    run.run_id, run.status
                ));
                changed = true;
                continue;
            }
            if run.automation_id != campaign.source_workflow_id {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run must belong to the optimization source workflow"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                "baseline replay run must include an automation snapshot".to_string()
            })?;
            // The replay is only valid against the campaign's current
            // baseline snapshot.
            if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run does not match the current campaign baseline snapshot"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            let metrics =
                derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
            let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
            campaign
                .baseline_replays
                .push(OptimizationBaselineReplayRecord {
                    replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                    automation_run_id: Some(run.run_id.clone()),
                    phase1_metrics: metrics,
                    validator_case_outcomes,
                    experiment_count_at_recording: self
                        .count_optimization_experiments(&campaign.optimization_id)
                        .await as u64,
                    recorded_at_ms: now_ms(),
                });
            changed = true;
        }
        if remaining != campaign.pending_baseline_run_ids {
            campaign.pending_baseline_run_ids = remaining;
            changed = true;
        }
        Ok(changed)
    }
3491
    /// Drive every phase-1 optimization campaign one reconciliation step.
    ///
    /// Per campaign: reconcile pending baseline replays and candidate
    /// experiments; queue a baseline replay when one is due, or establish
    /// baseline metrics once enough replays are recorded (moving the campaign
    /// to `Running`, or `PausedEvaluatorUnstable` on failure); then, if
    /// Running with nothing pending, queue the next candidate experiment.
    /// Mutated campaigns are persisted; returns how many were updated.
    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
        let campaigns = self.list_optimization_campaigns().await;
        let mut updated = 0usize;
        for campaign in campaigns {
            // Re-fetch each record so we operate on the latest state, not the
            // listing snapshot.
            let Some(mut latest) = self
                .get_optimization_campaign(&campaign.optimization_id)
                .await
            else {
                continue;
            };
            // Only phase-1 campaigns are reconciled here.
            let Some(phase1) = latest.phase1.clone() else {
                continue;
            };
            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
            changed |= self
                .reconcile_phase1_candidate_experiments(&mut latest)
                .await?;
            let experiment_count = self
                .count_optimization_experiments(&latest.optimization_id)
                .await;
            if latest.pending_baseline_run_ids.is_empty() {
                if phase1_baseline_replay_due(
                    &latest.baseline_replays,
                    latest.pending_baseline_run_ids.len(),
                    &phase1,
                    experiment_count,
                    now_ms(),
                ) {
                    // A replay is due: queue it and park the campaign in
                    // Draft until it completes.
                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
                        latest.status = OptimizationCampaignStatus::Draft;
                        changed = true;
                    }
                } else if latest.baseline_replays.len()
                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
                {
                    // Enough replays recorded: try to (re)establish baseline
                    // metrics and resume the campaign.
                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
                        Ok(metrics) => {
                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
                                latest.baseline_metrics = Some(metrics);
                                changed = true;
                            }
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) || (latest.status == OptimizationCampaignStatus::Running
                                && latest.last_pause_reason.is_some())
                            {
                                latest.status = OptimizationCampaignStatus::Running;
                                latest.last_pause_reason = None;
                                changed = true;
                            }
                        }
                        Err(error) => {
                            // Pause as evaluator-unstable, but avoid a no-op
                            // rewrite when already paused for this reason.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::Running
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) && (latest.status
                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
                            {
                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                latest.last_pause_reason = Some(error);
                                changed = true;
                            }
                        }
                    }
                }
            } else if latest.last_pause_reason.as_deref()
                != Some("waiting for phase 1 baseline replay completion")
            {
                latest.last_pause_reason =
                    Some("waiting for phase 1 baseline replay completion".to_string());
                changed = true;
            }
            // Only a Running campaign with no in-flight replays may take on a
            // new candidate experiment.
            if latest.status == OptimizationCampaignStatus::Running
                && latest.pending_baseline_run_ids.is_empty()
            {
                changed |= self
                    .maybe_queue_phase1_candidate_experiment(&mut latest)
                    .await?;
            }
            if changed {
                self.put_optimization_campaign(latest)
                    .await
                    .map_err(|error| error.to_string())?;
                updated = updated.saturating_add(1);
            }
        }
        Ok(updated)
    }
3585
3586 async fn maybe_queue_phase1_baseline_replay(
3587 &self,
3588 campaign: &mut OptimizationCampaignRecord,
3589 ) -> Result<bool, String> {
3590 let Some(phase1) = campaign.phase1.as_ref() else {
3591 return Ok(false);
3592 };
3593 if !campaign.pending_baseline_run_ids.is_empty() {
3594 campaign.last_pause_reason =
3595 Some("waiting for phase 1 baseline replay completion".into());
3596 campaign.updated_at_ms = now_ms();
3597 return Ok(true);
3598 }
3599 let experiment_count = self
3600 .count_optimization_experiments(&campaign.optimization_id)
3601 .await;
3602 if !phase1_baseline_replay_due(
3603 &campaign.baseline_replays,
3604 campaign.pending_baseline_run_ids.len(),
3605 phase1,
3606 experiment_count,
3607 now_ms(),
3608 ) {
3609 return Ok(false);
3610 }
3611 let replay_run = self
3612 .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
3613 .await
3614 .map_err(|error| error.to_string())?;
3615 if !campaign
3616 .pending_baseline_run_ids
3617 .iter()
3618 .any(|value| value == &replay_run.run_id)
3619 {
3620 campaign
3621 .pending_baseline_run_ids
3622 .push(replay_run.run_id.clone());
3623 }
3624 campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
3625 campaign.updated_at_ms = now_ms();
3626 Ok(true)
3627 }
3628
3629 async fn maybe_queue_initial_phase1_baseline_replay(
3630 &self,
3631 campaign: &mut OptimizationCampaignRecord,
3632 ) -> Result<bool, String> {
3633 let Some(phase1) = campaign.phase1.as_ref() else {
3634 return Ok(false);
3635 };
3636 let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
3637 if campaign.baseline_replays.len() >= required_runs {
3638 return Ok(false);
3639 }
3640 self.maybe_queue_phase1_baseline_replay(campaign).await
3641 }
3642
    /// Dispatch a lifecycle action on an optimization campaign.
    ///
    /// `action` (trimmed, case-insensitive) is one of:
    /// - `start` / `resume`: queue initial baseline replays or (re)establish
    ///   the baseline and move the campaign to `Running`;
    /// - `pause`: manual pause with an optional operator `reason`;
    /// - `queue_baseline_replay`: create a replay run and track its id;
    /// - `record_baseline_replay`: ingest a finished replay run (`run_id`
    ///   required) after ownership and snapshot-hash checks;
    /// - `approve_winner` / `reject_winner`: resolve a pending promotion for
    ///   `experiment_id`.
    ///
    /// The mutated campaign is persisted and returned; unknown actions and
    /// any validation failure return an error string.
    pub async fn apply_optimization_action(
        &self,
        optimization_id: &str,
        action: &str,
        experiment_id: Option<&str>,
        run_id: Option<&str>,
        reason: Option<&str>,
    ) -> Result<OptimizationCampaignRecord, String> {
        let normalized = action.trim().to_ascii_lowercase();
        let mut campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        match normalized.as_str() {
            "start" => {
                if campaign.phase1.is_some() {
                    // Phase-1 campaigns must first record baseline replays;
                    // only when none need queueing can the baseline be
                    // established and the campaign actually run.
                    if self
                        .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                        .await?
                    {
                        campaign.status = OptimizationCampaignStatus::Draft;
                    } else {
                        let phase1 = campaign
                            .phase1
                            .as_ref()
                            .ok_or_else(|| "phase 1 config is required".to_string())?;
                        match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
                            Ok(metrics) => {
                                campaign.baseline_metrics = Some(metrics);
                                campaign.status = OptimizationCampaignStatus::Running;
                                campaign.last_pause_reason = None;
                            }
                            Err(error) => {
                                campaign.status =
                                    OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                campaign.last_pause_reason = Some(error);
                            }
                        }
                    }
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "pause" => {
                campaign.status = OptimizationCampaignStatus::PausedManual;
                // Blank reasons are treated as absent.
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
            }
            "resume" => {
                if self
                    .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
                    .await?
                {
                    campaign.status = OptimizationCampaignStatus::Draft;
                } else {
                    campaign.status = OptimizationCampaignStatus::Running;
                    campaign.last_pause_reason = None;
                }
            }
            "queue_baseline_replay" => {
                let replay_run = self
                    .create_automation_v2_run(
                        &campaign.baseline_snapshot,
                        "optimization_baseline_replay",
                    )
                    .await
                    .map_err(|error| error.to_string())?;
                // Track the run id once, even if it is somehow already listed.
                if !campaign
                    .pending_baseline_run_ids
                    .iter()
                    .any(|value| value == &replay_run.run_id)
                {
                    campaign
                        .pending_baseline_run_ids
                        .push(replay_run.run_id.clone());
                }
                campaign.updated_at_ms = now_ms();
            }
            "record_baseline_replay" => {
                let run_id = run_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
                let phase1 = campaign
                    .phase1
                    .as_ref()
                    .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
                let run = self
                    .get_automation_v2_run(run_id)
                    .await
                    .ok_or_else(|| "automation run not found".to_string())?;
                // The replay must belong to this campaign's workflow and be
                // a run of the exact current baseline snapshot.
                if run.automation_id != campaign.source_workflow_id {
                    return Err(
                        "baseline replay run must belong to the optimization source workflow"
                            .to_string(),
                    );
                }
                let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                    "baseline replay run must include an automation snapshot".to_string()
                })?;
                if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                    return Err(
                        "baseline replay run does not match the current campaign baseline snapshot"
                            .to_string(),
                    );
                }
                let metrics =
                    derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
                let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
                campaign
                    .baseline_replays
                    .push(OptimizationBaselineReplayRecord {
                        replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                        automation_run_id: Some(run.run_id.clone()),
                        phase1_metrics: metrics,
                        validator_case_outcomes,
                        experiment_count_at_recording: self
                            .count_optimization_experiments(&campaign.optimization_id)
                            .await as u64,
                        recorded_at_ms: now_ms(),
                    });
                // The run is no longer pending once recorded.
                campaign
                    .pending_baseline_run_ids
                    .retain(|value| value != run_id);
                campaign.updated_at_ms = now_ms();
            }
            "approve_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                    return Err(
                        "experiment baseline_snapshot_hash does not match current campaign baseline"
                            .to_string(),
                    );
                }
                if let Some(phase1) = campaign.phase1.as_ref() {
                    // Re-validate the mutation and precompute the apply_patch
                    // that `apply_optimization_winner` will replay later.
                    let validated = validate_phase1_candidate_mutation(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        phase1,
                    )?;
                    if experiment.mutation_summary.is_none() {
                        experiment.mutation_summary = Some(validated.summary.clone());
                    }
                    let approved_at_ms = now_ms();
                    let apply_patch = Self::build_optimization_apply_patch(
                        &campaign.baseline_snapshot,
                        &experiment.candidate_snapshot,
                        &validated,
                        approved_at_ms,
                    )?;
                    let mut metadata = match experiment.metadata.take() {
                        Some(Value::Object(map)) => map,
                        Some(_) => {
                            return Err("experiment metadata must be a JSON object".to_string());
                        }
                        None => serde_json::Map::new(),
                    };
                    metadata.insert("apply_patch".to_string(), apply_patch);
                    experiment.metadata = Some(Value::Object(metadata));
                    if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
                        // Re-evaluate promotion against the baseline; only a
                        // clean Promote verdict may be approved.
                        let candidate_metrics = experiment
                            .phase1_metrics
                            .clone()
                            .or_else(|| {
                                experiment
                                    .metrics
                                    .as_ref()
                                    .and_then(|metrics| parse_phase1_metrics(metrics).ok())
                            })
                            .ok_or_else(|| {
                                "phase 1 candidate is missing promotion metrics".to_string()
                            })?;
                        let decision =
                            evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
                        experiment.promotion_recommendation = Some(
                            match decision.decision {
                                OptimizationPromotionDecisionKind::Promote => "promote",
                                OptimizationPromotionDecisionKind::Discard => "discard",
                                OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                                    "needs_operator_review"
                                }
                            }
                            .to_string(),
                        );
                        experiment.promotion_decision = Some(decision.clone());
                        if decision.decision != OptimizationPromotionDecisionKind::Promote {
                            // Persist the refreshed verdict before rejecting
                            // the approval.
                            let _ = self
                                .put_optimization_experiment(experiment)
                                .await
                                .map_err(|e| e.to_string())?;
                            return Err(decision.reason);
                        }
                        campaign.baseline_metrics = Some(candidate_metrics);
                    }
                }
                // Promote the candidate to be the new campaign baseline and
                // reset replay/promotion bookkeeping.
                campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
                campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
                campaign.baseline_replays.clear();
                campaign.pending_baseline_run_ids.clear();
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = None;
                experiment.status = OptimizationExperimentStatus::PromotionApproved;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            "reject_winner" => {
                let experiment_id = experiment_id
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
                let mut experiment = self
                    .get_optimization_experiment(optimization_id, experiment_id)
                    .await
                    .ok_or_else(|| "experiment not found".to_string())?;
                campaign.pending_promotion_experiment_id = None;
                campaign.status = OptimizationCampaignStatus::Draft;
                campaign.last_pause_reason = reason
                    .map(str::trim)
                    .filter(|value| !value.is_empty())
                    .map(str::to_string);
                experiment.status = OptimizationExperimentStatus::PromotionRejected;
                let _ = self
                    .put_optimization_experiment(experiment)
                    .await
                    .map_err(|e| e.to_string())?;
            }
            _ => return Err("unsupported optimization action".to_string()),
        }
        self.put_optimization_campaign(campaign)
            .await
            .map_err(|e| e.to_string())
    }
3888
3889 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3890 let mut rows = self
3891 .automations_v2
3892 .read()
3893 .await
3894 .values()
3895 .cloned()
3896 .collect::<Vec<_>>();
3897 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3898 rows
3899 }
3900
3901 pub async fn delete_automation_v2(
3902 &self,
3903 automation_id: &str,
3904 ) -> anyhow::Result<Option<AutomationV2Spec>> {
3905 let removed = self.automations_v2.write().await.remove(automation_id);
3906 let removed_run_count = {
3907 let mut runs = self.automation_v2_runs.write().await;
3908 let before = runs.len();
3909 runs.retain(|_, run| run.automation_id != automation_id);
3910 before.saturating_sub(runs.len())
3911 };
3912 self.persist_automations_v2().await?;
3913 if removed_run_count > 0 {
3914 self.persist_automation_v2_runs().await?;
3915 }
3916 self.verify_automation_v2_persisted(automation_id, false)
3917 .await?;
3918 Ok(removed)
3919 }
3920
3921 pub async fn create_automation_v2_run(
3922 &self,
3923 automation: &AutomationV2Spec,
3924 trigger_type: &str,
3925 ) -> anyhow::Result<AutomationV2RunRecord> {
3926 let now = now_ms();
3927 let pending_nodes = automation
3928 .flow
3929 .nodes
3930 .iter()
3931 .map(|n| n.node_id.clone())
3932 .collect::<Vec<_>>();
3933 let run = AutomationV2RunRecord {
3934 run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
3935 automation_id: automation.automation_id.clone(),
3936 trigger_type: trigger_type.to_string(),
3937 status: AutomationRunStatus::Queued,
3938 created_at_ms: now,
3939 updated_at_ms: now,
3940 started_at_ms: None,
3941 finished_at_ms: None,
3942 active_session_ids: Vec::new(),
3943 latest_session_id: None,
3944 active_instance_ids: Vec::new(),
3945 checkpoint: AutomationRunCheckpoint {
3946 completed_nodes: Vec::new(),
3947 pending_nodes,
3948 node_outputs: std::collections::HashMap::new(),
3949 node_attempts: std::collections::HashMap::new(),
3950 blocked_nodes: Vec::new(),
3951 awaiting_gate: None,
3952 gate_history: Vec::new(),
3953 lifecycle_history: Vec::new(),
3954 last_failure: None,
3955 },
3956 automation_snapshot: Some(automation.clone()),
3957 pause_reason: None,
3958 resume_reason: None,
3959 detail: None,
3960 stop_kind: None,
3961 stop_reason: None,
3962 prompt_tokens: 0,
3963 completion_tokens: 0,
3964 total_tokens: 0,
3965 estimated_cost_usd: 0.0,
3966 };
3967 self.automation_v2_runs
3968 .write()
3969 .await
3970 .insert(run.run_id.clone(), run.clone());
3971 self.persist_automation_v2_runs().await?;
3972 crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
3973 .await
3974 .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
3975 Ok(run)
3976 }
3977
3978 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
3979 self.automation_v2_runs.read().await.get(run_id).cloned()
3980 }
3981
3982 pub async fn list_automation_v2_runs(
3983 &self,
3984 automation_id: Option<&str>,
3985 limit: usize,
3986 ) -> Vec<AutomationV2RunRecord> {
3987 let mut rows = self
3988 .automation_v2_runs
3989 .read()
3990 .await
3991 .values()
3992 .filter(|row| {
3993 if let Some(id) = automation_id {
3994 row.automation_id == id
3995 } else {
3996 true
3997 }
3998 })
3999 .cloned()
4000 .collect::<Vec<_>>();
4001 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
4002 rows.truncate(limit.clamp(1, 500));
4003 rows
4004 }
4005
    /// Claims the oldest queued run, flipping its status to `Running`.
    ///
    /// Selection and the status change happen under a single write lock so
    /// two workers cannot claim the same run. Persistence is best-effort:
    /// failures are ignored and the in-memory state stays authoritative.
    pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
        let mut guard = self.automation_v2_runs.write().await;
        // Oldest queued run wins (FIFO by creation time).
        let run_id = guard
            .values()
            .filter(|row| row.status == AutomationRunStatus::Queued)
            .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
            .map(|row| row.run_id.clone())?;
        let now = now_ms();
        let run = guard.get_mut(&run_id)?;
        run.status = AutomationRunStatus::Running;
        run.updated_at_ms = now;
        // Preserve an existing start time; only the first claim stamps it.
        run.started_at_ms.get_or_insert(now);
        let claimed = run.clone();
        // Release the lock before awaiting persistence.
        drop(guard);
        let _ = self.persist_automation_v2_runs().await;
        Some(claimed)
    }
4023
4024 pub async fn update_automation_v2_run(
4025 &self,
4026 run_id: &str,
4027 update: impl FnOnce(&mut AutomationV2RunRecord),
4028 ) -> Option<AutomationV2RunRecord> {
4029 let mut guard = self.automation_v2_runs.write().await;
4030 let run = guard.get_mut(run_id)?;
4031 update(run);
4032 run.updated_at_ms = now_ms();
4033 if matches!(
4034 run.status,
4035 AutomationRunStatus::Completed
4036 | AutomationRunStatus::Blocked
4037 | AutomationRunStatus::Failed
4038 | AutomationRunStatus::Cancelled
4039 ) {
4040 run.finished_at_ms.get_or_insert_with(now_ms);
4041 }
4042 let out = run.clone();
4043 drop(guard);
4044 let _ = self.persist_automation_v2_runs().await;
4045 Some(out)
4046 }
4047
4048 pub async fn add_automation_v2_session(
4049 &self,
4050 run_id: &str,
4051 session_id: &str,
4052 ) -> Option<AutomationV2RunRecord> {
4053 let updated = self
4054 .update_automation_v2_run(run_id, |row| {
4055 if !row.active_session_ids.iter().any(|id| id == session_id) {
4056 row.active_session_ids.push(session_id.to_string());
4057 }
4058 row.latest_session_id = Some(session_id.to_string());
4059 })
4060 .await;
4061 self.automation_v2_session_runs
4062 .write()
4063 .await
4064 .insert(session_id.to_string(), run_id.to_string());
4065 updated
4066 }
4067
4068 pub async fn clear_automation_v2_session(
4069 &self,
4070 run_id: &str,
4071 session_id: &str,
4072 ) -> Option<AutomationV2RunRecord> {
4073 self.automation_v2_session_runs
4074 .write()
4075 .await
4076 .remove(session_id);
4077 self.update_automation_v2_run(run_id, |row| {
4078 row.active_session_ids.retain(|id| id != session_id);
4079 })
4080 .await
4081 }
4082
4083 pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
4084 let mut guard = self.automation_v2_session_runs.write().await;
4085 for session_id in session_ids {
4086 guard.remove(session_id);
4087 }
4088 }
4089
4090 pub async fn add_automation_v2_instance(
4091 &self,
4092 run_id: &str,
4093 instance_id: &str,
4094 ) -> Option<AutomationV2RunRecord> {
4095 self.update_automation_v2_run(run_id, |row| {
4096 if !row.active_instance_ids.iter().any(|id| id == instance_id) {
4097 row.active_instance_ids.push(instance_id.to_string());
4098 }
4099 })
4100 .await
4101 }
4102
4103 pub async fn clear_automation_v2_instance(
4104 &self,
4105 run_id: &str,
4106 instance_id: &str,
4107 ) -> Option<AutomationV2RunRecord> {
4108 self.update_automation_v2_run(run_id, |row| {
4109 row.active_instance_ids.retain(|id| id != instance_id);
4110 })
4111 .await
4112 }
4113
    /// Attributes provider token usage (and estimated cost) from a session
    /// to the run(s) the session belongs to.
    ///
    /// A session may map to a routine run (via the routine session policy)
    /// and/or to an automation v2 run (via the session→run map); each
    /// matching run's counters are bumped and its store persisted
    /// best-effort. Cost is estimated as `total_tokens / 1000 × rate`.
    pub async fn apply_provider_usage_to_runs(
        &self,
        session_id: &str,
        prompt_tokens: u64,
        completion_tokens: u64,
        total_tokens: u64,
    ) {
        if let Some(policy) = self.routine_session_policy(session_id).await {
            // Negative configured rates are treated as zero cost.
            let rate = self.token_cost_per_1k_usd.max(0.0);
            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
            let mut guard = self.routine_runs.write().await;
            if let Some(run) = guard.get_mut(&policy.run_id) {
                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
                run.estimated_cost_usd += delta_cost;
                run.updated_at_ms = now_ms();
            }
            // Release the lock before awaiting persistence (best-effort).
            drop(guard);
            let _ = self.persist_routine_runs().await;
        }

        let maybe_v2_run_id = self
            .automation_v2_session_runs
            .read()
            .await
            .get(session_id)
            .cloned();
        if let Some(run_id) = maybe_v2_run_id {
            let rate = self.token_cost_per_1k_usd.max(0.0);
            let delta_cost = (total_tokens as f64 / 1000.0) * rate;
            let mut guard = self.automation_v2_runs.write().await;
            if let Some(run) = guard.get_mut(&run_id) {
                run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
                run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
                run.total_tokens = run.total_tokens.saturating_add(total_tokens);
                run.estimated_cost_usd += delta_cost;
                run.updated_at_ms = now_ms();
            }
            // Release the lock before awaiting persistence (best-effort).
            drop(guard);
            let _ = self.persist_automation_v2_runs().await;
        }
    }
4157
    /// Scans active automations at `now_ms` and returns the ids that should
    /// fire, repeated once per due run.
    ///
    /// Automations without a scheduled next-fire time get one seeded and are
    /// skipped this pass. Overdue automations are realigned to their next
    /// future fire time and `last_fired_at_ms` is stamped. Schedule state is
    /// persisted best-effort after the scan.
    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
        let mut fired = Vec::new();
        let mut guard = self.automations_v2.write().await;
        for automation in guard.values_mut() {
            if automation.status != AutomationV2Status::Active {
                continue;
            }
            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
                // First evaluation for this automation: seed the schedule
                // and fire no earlier than the next pass.
                automation.next_fire_at_ms =
                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
                continue;
            };
            if now_ms < next_fire_at_ms {
                // Not due yet.
                continue;
            }
            // Due (possibly multiple missed slots): the misfire policy
            // decides how many runs to enqueue.
            let run_count =
                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
            automation.next_fire_at_ms = next;
            automation.last_fired_at_ms = Some(now_ms);
            for _ in 0..run_count {
                fired.push(automation.automation_id.clone());
            }
        }
        // Release the lock before awaiting persistence (best-effort).
        drop(guard);
        let _ = self.persist_automations_v2().await;
        fired
    }
4186}
4187
4188async fn build_channels_config(
4189 state: &AppState,
4190 channels: &ChannelsConfigFile,
4191) -> Option<ChannelsConfig> {
4192 if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
4193 return None;
4194 }
4195 Some(ChannelsConfig {
4196 telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
4197 bot_token: cfg.bot_token,
4198 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4199 mention_only: cfg.mention_only,
4200 style_profile: cfg.style_profile,
4201 security_profile: cfg.security_profile,
4202 }),
4203 discord: channels.discord.clone().map(|cfg| DiscordConfig {
4204 bot_token: cfg.bot_token,
4205 guild_id: cfg.guild_id,
4206 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4207 mention_only: cfg.mention_only,
4208 security_profile: cfg.security_profile,
4209 }),
4210 slack: channels.slack.clone().map(|cfg| SlackConfig {
4211 bot_token: cfg.bot_token,
4212 channel_id: cfg.channel_id,
4213 allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
4214 mention_only: cfg.mention_only,
4215 security_profile: cfg.security_profile,
4216 }),
4217 server_base_url: state.server_base_url(),
4218 api_token: state.api_token().await.unwrap_or_default(),
4219 tool_policy: channels.tool_policy.clone(),
4220 })
4221}
4222
/// Returns `true` when `value` is a well-formed `owner/repo` slug: exactly
/// one `/` separating two non-blank segments. Whitespace around the whole
/// value is ignored.
fn is_valid_owner_repo_slug(value: &str) -> bool {
    let slug = value.trim();
    if slug.is_empty() || slug.starts_with('/') || slug.ends_with('/') {
        return false;
    }
    match slug.split_once('/') {
        // The remainder must be a single non-blank segment (no further '/').
        Some((owner, repo)) => {
            !owner.trim().is_empty() && !repo.trim().is_empty() && !repo.contains('/')
        }
        None => false,
    }
}
4239
4240fn legacy_automations_v2_path() -> Option<PathBuf> {
4241 config::paths::resolve_legacy_root_file_path("automations_v2.json")
4242 .filter(|path| path != &config::paths::resolve_automations_v2_path())
4243}
4244
4245fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4246 let mut candidates = vec![active_path.clone()];
4247 if let Some(legacy_path) = legacy_automations_v2_path() {
4248 if !candidates.contains(&legacy_path) {
4249 candidates.push(legacy_path);
4250 }
4251 }
4252 let default_path = config::paths::default_state_dir().join("automations_v2.json");
4253 if !candidates.contains(&default_path) {
4254 candidates.push(default_path);
4255 }
4256 candidates
4257}
4258
4259async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
4260 let Some(legacy_path) = legacy_automations_v2_path() else {
4261 return Ok(());
4262 };
4263 if legacy_path == *active_path || !legacy_path.exists() {
4264 return Ok(());
4265 }
4266 fs::remove_file(&legacy_path).await?;
4267 tracing::info!(
4268 active_path = active_path.display().to_string(),
4269 removed_path = legacy_path.display().to_string(),
4270 "removed stale legacy automation v2 file after canonical persistence"
4271 );
4272 Ok(())
4273}
4274
4275fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
4276 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
4277 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
4278}
4279
4280fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4281 let mut candidates = vec![active_path.clone()];
4282 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
4283 if !candidates.contains(&legacy_path) {
4284 candidates.push(legacy_path);
4285 }
4286 }
4287 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
4288 if !candidates.contains(&default_path) {
4289 candidates.push(default_path);
4290 }
4291 candidates
4292}
4293
4294fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4295 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4296 .unwrap_or_default()
4297}
4298
4299fn parse_automation_v2_runs_file(
4300 raw: &str,
4301) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4302 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4303 .unwrap_or_default()
4304}
4305
4306fn parse_optimization_campaigns_file(
4307 raw: &str,
4308) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
4309 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
4310 .unwrap_or_default()
4311}
4312
4313fn parse_optimization_experiments_file(
4314 raw: &str,
4315) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
4316 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
4317 .unwrap_or_default()
4318}
4319
4320fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4321 match schedule {
4322 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4323 RoutineSchedule::Cron { .. } => None,
4324 }
4325}
4326
4327fn parse_timezone(timezone: &str) -> Option<Tz> {
4328 timezone.trim().parse::<Tz>().ok()
4329}
4330
/// Computes the next cron fire time strictly after `from_ms`, as UTC epoch
/// milliseconds.
///
/// The cron expression is evaluated against the local clock of `timezone`
/// (an IANA name), so DST transitions follow that zone. Returns `None` when
/// the timezone or expression is invalid, `from_ms` is out of chrono's
/// representable range, or the schedule has no future occurrence.
fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
    let tz = parse_timezone(timezone)?;
    let schedule = Schedule::from_str(expression).ok()?;
    // `single()` rejects out-of-range timestamps instead of panicking.
    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
    let local_from = from_dt.with_timezone(&tz);
    let next = schedule.after(&local_from).next()?;
    // Clamp any pre-epoch (negative) result to 0 before the unsigned cast.
    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
4339
4340fn compute_next_schedule_fire_at_ms(
4341 schedule: &RoutineSchedule,
4342 timezone: &str,
4343 from_ms: u64,
4344) -> Option<u64> {
4345 let _ = parse_timezone(timezone)?;
4346 match schedule {
4347 RoutineSchedule::IntervalSeconds { seconds } => {
4348 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4349 }
4350 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4351 }
4352}
4353
/// Resolves a misfire (a schedule whose fire time is already in the past)
/// into `(runs_to_fire, realigned_next_fire_at_ms)`.
///
/// Interval schedules delegate to [`compute_misfire_plan`]. Cron schedules
/// realign to the next cron occurrence after `now_ms` (falling back to one
/// minute out when the expression/timezone cannot be evaluated) and count
/// missed occurrences according to `policy`.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                // Cannot happen for interval schedules, but fail safe by
                // firing nothing and leaving the schedule untouched.
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    // Walk cron occurrences from the missed fire time up to
                    // now, counting each as one catch-up run (capped at
                    // `max_runs`).
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        // Guard against a non-advancing schedule to avoid an
                        // infinite loop.
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
4393
4394fn compute_misfire_plan(
4395 now_ms: u64,
4396 next_fire_at_ms: u64,
4397 interval_ms: u64,
4398 policy: &RoutineMisfirePolicy,
4399) -> (u32, u64) {
4400 if now_ms < next_fire_at_ms || interval_ms == 0 {
4401 return (0, next_fire_at_ms);
4402 }
4403 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4404 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4405 match policy {
4406 RoutineMisfirePolicy::Skip => (0, aligned_next),
4407 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4408 RoutineMisfirePolicy::CatchUp { max_runs } => {
4409 let count = missed.min(u64::from(*max_runs)) as u32;
4410 (count, aligned_next)
4411 }
4412 }
4413}
4414
4415fn auto_generated_agent_name(agent_id: &str) -> String {
4416 let names = [
4417 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4418 ];
4419 let digest = Sha256::digest(agent_id.as_bytes());
4420 let idx = usize::from(digest[0]) % names.len();
4421 format!("{}-{:02x}", names[idx], digest[1])
4422}
4423
4424fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4425 match schedule.schedule_type {
4426 AutomationV2ScheduleType::Manual => None,
4427 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4428 seconds: schedule.interval_seconds.unwrap_or(60),
4429 }),
4430 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4431 expression: schedule.cron_expression.clone().unwrap_or_default(),
4432 }),
4433 }
4434}
4435
4436fn automation_schedule_next_fire_at_ms(
4437 schedule: &AutomationV2Schedule,
4438 from_ms: u64,
4439) -> Option<u64> {
4440 let routine_schedule = schedule_from_automation_v2(schedule)?;
4441 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4442}
4443
/// How many runs an overdue automation should fire right now.
///
/// Delegates to the routine misfire planner, then clamps the result up to
/// one: callers only ask once the automation is known to be due, so even a
/// plan of zero (e.g. a skip policy) still yields a single run. Manual
/// schedules fire zero.
fn automation_schedule_due_count(
    schedule: &AutomationV2Schedule,
    now_ms: u64,
    next_fire_at_ms: u64,
) -> u32 {
    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
        // Manual schedules never fire on a timer.
        return 0;
    };
    let (count, _) = compute_misfire_plan_for_schedule(
        now_ms,
        next_fire_at_ms,
        &routine_schedule,
        &schedule.timezone,
        &schedule.misfire_policy,
    );
    // Guarantee at least one run for a due automation.
    count.max(1)
}
4461
/// Outcome of the routine execution policy check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run, but only after manual approval.
    RequiresApproval { reason: String },
    /// The routine must not run.
    Blocked { reason: String },
}
4468
4469pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4470 let entrypoint = routine.entrypoint.to_ascii_lowercase();
4471 if entrypoint.starts_with("connector.")
4472 || entrypoint.starts_with("integration.")
4473 || entrypoint.contains("external")
4474 {
4475 return true;
4476 }
4477 routine
4478 .args
4479 .get("uses_external_integrations")
4480 .and_then(|v| v.as_bool())
4481 .unwrap_or(false)
4482 || routine
4483 .args
4484 .get("connector_id")
4485 .and_then(|v| v.as_str())
4486 .is_some()
4487}
4488
4489pub fn evaluate_routine_execution_policy(
4490 routine: &RoutineSpec,
4491 trigger_type: &str,
4492) -> RoutineExecutionDecision {
4493 if !routine_uses_external_integrations(routine) {
4494 return RoutineExecutionDecision::Allowed;
4495 }
4496 if !routine.external_integrations_allowed {
4497 return RoutineExecutionDecision::Blocked {
4498 reason: "external integrations are disabled by policy".to_string(),
4499 };
4500 }
4501 if routine.requires_approval {
4502 return RoutineExecutionDecision::RequiresApproval {
4503 reason: format!(
4504 "manual approval required before external side effects ({})",
4505 trigger_type
4506 ),
4507 };
4508 }
4509 RoutineExecutionDecision::Allowed
4510}
4511
/// Validates a shared-resource key.
///
/// Accepts the special `swarm.active_tasks` key, or any key beginning with
/// one of the scoped prefixes (`run/`, `mission/`, `project/`, `team/`)
/// that contains no empty path segment (`//`). Whitespace around the whole
/// key is ignored.
fn is_valid_resource_key(key: &str) -> bool {
    const SCOPED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    let key = key.trim();
    if key.is_empty() {
        return false;
    }
    if key == "swarm.active_tasks" {
        return true;
    }
    let scoped = SCOPED_PREFIXES.iter().any(|prefix| key.starts_with(prefix));
    scoped && !key.contains("//")
}
4529
4530impl Deref for AppState {
4531 type Target = RuntimeState;
4532
4533 fn deref(&self) -> &Self::Target {
4534 self.runtime
4535 .get()
4536 .expect("runtime accessed before startup completion")
4537 }
4538}
4539
/// Prompt-context hook backed by server state; used to enrich provider
/// messages with identity, memory-scope, docs, and memory-search context.
#[derive(Clone)]
struct ServerPromptContextHook {
    // Server state consulted for runs, sessions, config, and events.
    state: AppState,
}
4544
impl ServerPromptContextHook {
    /// Wraps the server state in a hook instance.
    fn new(state: AppState) -> Self {
        Self { state }
    }

    /// Opens the shared memory database; `None` when shared paths cannot be
    /// resolved or the database fails to open.
    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
        let paths = resolve_shared_paths().ok()?;
        MemoryDatabase::new(&paths.memory_db_path).await.ok()
    }

    /// Opens the shared memory manager; `None` on any failure.
    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
        let paths = resolve_shared_paths().ok()?;
        tandem_memory::MemoryManager::new(&paths.memory_db_path)
            .await
            .ok()
    }

    /// Hex-encoded SHA-256 digest of `input`.
    fn hash_query(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Renders global-memory search hits into a `<memory_context>` block.
    ///
    /// Each hit contributes one line with its score, the first 60 words of
    /// its content, and source/run identifiers; rendering stops once the
    /// accumulated line bytes pass ~2200.
    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
        let mut out = vec!["<memory_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            // Keep only the first 60 whitespace-separated words per hit.
            let text = hit
                .record
                .content
                .split_whitespace()
                .take(60)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (source={}, run={})",
                hit.score, text, hit.record.source_type, hit.record.run_id
            );
            used = used.saturating_add(line.len());
            // Soft byte budget; stop before pushing the overflowing line.
            if used > 2200 {
                break;
            }
            out.push(line);
        }
        out.push("</memory_context>".to_string());
        out.join("\n")
    }

    /// Extracts a non-empty `source_url` string from the chunk metadata.
    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
        chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("source_url"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
    }

    /// Best-effort relative path for a docs chunk: the `relative_path`
    /// metadata field when present and non-empty, otherwise the chunk
    /// source with any `guide_docs:` prefix stripped.
    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
        if let Some(path) = chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("relative_path"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
        {
            return path.to_string();
        }
        chunk
            .source
            .strip_prefix("guide_docs:")
            .unwrap_or(chunk.source.as_str())
            .to_string()
    }

    /// Renders embedded-docs search hits into a `<docs_context>` block.
    ///
    /// Each line carries similarity, the first 70 words of content, the doc
    /// path, and the source URL; rendering stops once the accumulated line
    /// bytes pass ~2800.
    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
        let mut out = vec!["<docs_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
            let path = Self::extract_docs_relative_path(&hit.chunk);
            let text = hit
                .chunk
                .content
                .split_whitespace()
                .take(70)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (doc_path={}, source_url={})",
                hit.similarity, text, path, url
            );
            used = used.saturating_add(line.len());
            // Soft byte budget; stop before pushing the overflowing line.
            if used > 2800 {
                break;
            }
            out.push(line);
        }
        out.push("</docs_context>".to_string());
        out.join("\n")
    }

    /// Searches global-tier memory for guide-doc chunks matching `query`.
    ///
    /// Over-fetches (3x `limit`, clamped into 6..=36) because the tier
    /// search is not doc-specific, then keeps only `guide_docs:` chunks,
    /// capped at `limit`.
    async fn search_embedded_docs(
        &self,
        query: &str,
        limit: usize,
    ) -> Vec<tandem_memory::types::MemorySearchResult> {
        let Some(manager) = self.open_memory_manager().await else {
            return Vec::new();
        };
        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
        manager
            .search(
                query,
                Some(MemoryTier::Global),
                None,
                None,
                Some(search_limit),
            )
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
            .take(limit)
            .collect()
    }

    /// True for queries that should not trigger memory injection: empty
    /// input, or short social pleasantries ("hi", "thanks", ...).
    fn should_skip_memory_injection(query: &str) -> bool {
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return true;
        }
        let lower = trimmed.to_ascii_lowercase();
        let social = [
            "hi",
            "hello",
            "hey",
            "thanks",
            "thank you",
            "ok",
            "okay",
            "cool",
            "nice",
            "yo",
            "good morning",
            "good afternoon",
            "good evening",
        ];
        // Length guard bounds the exact-match lookup to short messages.
        lower.len() <= 32 && social.contains(&lower.as_str())
    }

    /// Maps a personality preset name to its system-prompt style text;
    /// unknown presets fall back to the balanced default.
    fn personality_preset_text(preset: &str) -> &'static str {
        match preset {
            "concise" => {
                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
            }
            "friendly" => {
                "Default style: friendly and supportive while staying technically rigorous and concrete."
            }
            "mentor" => {
                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
            }
            "critical" => {
                "Default style: critical and risk-first. Surface failure modes and assumptions early."
            }
            _ => {
                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
            }
        }
    }

    /// Builds the identity/personality system-prompt block from effective
    /// config.
    ///
    /// Bot name resolution order: `identity.bot.canonical_name`, then the
    /// legacy top-level `bot_name`, then "Tandem". Personality: a per-agent
    /// profile (except for the internal `compaction`/`title`/`summary`
    /// agents, which never get overrides), falling back to the default
    /// profile, then the legacy `persona`, then the "balanced" preset.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        // Internal utility agents always keep the default personality.
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }

    /// Builds the `<memory_scope>` system block describing the session's
    /// memory-search scope (session id, plus project id and workspace root
    /// when present) along with usage guidance for the memory tools.
    fn build_memory_scope_block(
        session_id: &str,
        project_id: Option<&str>,
        workspace_root: Option<&str>,
    ) -> String {
        let mut lines = vec![
            "<memory_scope>".to_string(),
            format!("- current_session_id: {}", session_id),
        ];
        // Blank or whitespace-only ids are omitted entirely.
        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
            lines.push(format!("- current_project_id: {}", project_id));
        }
        if let Some(workspace_root) = workspace_root
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            lines.push(format!("- workspace_root: {}", workspace_root));
        }
        lines.push(
            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
                .to_string(),
        );
        lines.push(
            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
                .to_string(),
        );
        lines.push(
            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
                .to_string(),
        );
        lines.push("</memory_scope>".to_string());
        lines.join("\n")
    }
}
4830
impl PromptContextHook for ServerPromptContextHook {
    /// Appends server-side system context to the outgoing provider messages:
    /// an identity block, a memory-scope block, and either embedded-docs hits
    /// or governed-memory search results (never both).
    ///
    /// Any missing prerequisite (server not ready, no registered run, blank
    /// or skip-listed query, no memory DB, no hits) short-circuits and returns
    /// the messages accumulated so far.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        // Clone `self` so the returned future is 'static.
        let this = self.clone();
        Box::pin(async move {
            // Skip all augmentation until startup has completed.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            // Only augment sessions with a registered run.
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            // Identity/persona block, resolved per agent profile.
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            // Memory-scope guidance, using the session's project/workspace.
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // Use the most recent user message as the memory-search query.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Embedded guide docs take priority; when they hit, memory search
            // is skipped entirely for this turn.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        // Whitespace-token count as a rough size proxy.
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            // Governed-memory search path.
            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            // Search errors are swallowed: a failed search behaves like no hits.
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Telemetry is published even when the search returns no hits.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    // Only a hash of the query is emitted, not its content.
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
4961
4962fn extract_event_session_id(properties: &Value) -> Option<String> {
4963 properties
4964 .get("sessionID")
4965 .or_else(|| properties.get("sessionId"))
4966 .or_else(|| properties.get("id"))
4967 .or_else(|| {
4968 properties
4969 .get("part")
4970 .and_then(|part| part.get("sessionID"))
4971 })
4972 .or_else(|| {
4973 properties
4974 .get("part")
4975 .and_then(|part| part.get("sessionId"))
4976 })
4977 .and_then(|v| v.as_str())
4978 .map(|s| s.to_string())
4979}
4980
4981fn extract_event_run_id(properties: &Value) -> Option<String> {
4982 properties
4983 .get("runID")
4984 .or_else(|| properties.get("run_id"))
4985 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
4986 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
4987 .and_then(|v| v.as_str())
4988 .map(|s| s.to_string())
4989}
4990
4991pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
4992 let part = properties.get("part")?;
4993 let part_type = part
4994 .get("type")
4995 .and_then(|v| v.as_str())
4996 .unwrap_or_default()
4997 .to_ascii_lowercase();
4998 if part_type != "tool" && part_type != "tool-invocation" && part_type != "tool-result" {
4999 return None;
5000 }
5001 let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
5002 let message_id = part
5003 .get("messageID")
5004 .or_else(|| part.get("message_id"))
5005 .and_then(|v| v.as_str())?
5006 .to_string();
5007 let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
5008 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
5009 if let Some(preview) = properties
5010 .get("toolCallDelta")
5011 .and_then(|delta| delta.get("parsedArgsPreview"))
5012 .cloned()
5013 {
5014 let preview_nonempty = !preview.is_null()
5015 && !preview.as_object().is_some_and(|value| value.is_empty())
5016 && !preview
5017 .as_str()
5018 .map(|value| value.trim().is_empty())
5019 .unwrap_or(false);
5020 if preview_nonempty {
5021 args = preview;
5022 }
5023 }
5024 if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
5025 if let Some(raw_preview) = properties
5026 .get("toolCallDelta")
5027 .and_then(|delta| delta.get("rawArgsPreview"))
5028 .and_then(|value| value.as_str())
5029 .map(str::trim)
5030 .filter(|value| !value.is_empty())
5031 {
5032 args = Value::String(raw_preview.to_string());
5033 }
5034 }
5035 }
5036 if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
5037 {
5038 tracing::info!(
5039 message_id = %message_id,
5040 has_tool_call_delta = properties.get("toolCallDelta").is_some(),
5041 part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
5042 has_result = part.get("result").is_some(),
5043 has_error = part.get("error").is_some(),
5044 "persistable write tool part still has empty args"
5045 );
5046 }
5047 let result = part.get("result").cloned().filter(|value| !value.is_null());
5048 let error = part
5049 .get("error")
5050 .and_then(|v| v.as_str())
5051 .map(|value| value.to_string());
5052 Some((
5053 message_id,
5054 MessagePart::ToolInvocation {
5055 tool,
5056 args,
5057 result,
5058 error,
5059 },
5060 ))
5061}
5062
5063pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
5064 let session_id = extract_event_session_id(&event.properties)?;
5065 let run_id = extract_event_run_id(&event.properties);
5066 let key = format!("run/{session_id}/status");
5067
5068 let mut base = serde_json::Map::new();
5069 base.insert("sessionID".to_string(), Value::String(session_id));
5070 if let Some(run_id) = run_id {
5071 base.insert("runID".to_string(), Value::String(run_id));
5072 }
5073
5074 match event.event_type.as_str() {
5075 "session.run.started" => {
5076 base.insert("state".to_string(), Value::String("running".to_string()));
5077 base.insert("phase".to_string(), Value::String("run".to_string()));
5078 base.insert(
5079 "eventType".to_string(),
5080 Value::String("session.run.started".to_string()),
5081 );
5082 Some(StatusIndexUpdate {
5083 key,
5084 value: Value::Object(base),
5085 })
5086 }
5087 "session.run.finished" => {
5088 base.insert("state".to_string(), Value::String("finished".to_string()));
5089 base.insert("phase".to_string(), Value::String("run".to_string()));
5090 if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
5091 base.insert("result".to_string(), Value::String(status.to_string()));
5092 }
5093 base.insert(
5094 "eventType".to_string(),
5095 Value::String("session.run.finished".to_string()),
5096 );
5097 Some(StatusIndexUpdate {
5098 key,
5099 value: Value::Object(base),
5100 })
5101 }
5102 "message.part.updated" => {
5103 let part = event.properties.get("part")?;
5104 let part_type = part.get("type").and_then(|v| v.as_str())?;
5105 let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
5106 let (phase, tool_active) = match (part_type, part_state) {
5107 ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
5108 ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
5109 _ => return None,
5110 };
5111 base.insert("state".to_string(), Value::String("running".to_string()));
5112 base.insert("phase".to_string(), Value::String(phase.to_string()));
5113 base.insert("toolActive".to_string(), Value::Bool(tool_active));
5114 if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
5115 base.insert("tool".to_string(), Value::String(tool_name.to_string()));
5116 }
5117 if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
5118 base.insert(
5119 "toolState".to_string(),
5120 Value::String(tool_state.to_string()),
5121 );
5122 }
5123 if let Some(tool_error) = part
5124 .get("error")
5125 .and_then(|v| v.as_str())
5126 .map(str::trim)
5127 .filter(|value| !value.is_empty())
5128 {
5129 base.insert(
5130 "toolError".to_string(),
5131 Value::String(tool_error.to_string()),
5132 );
5133 }
5134 if let Some(tool_call_id) = part
5135 .get("id")
5136 .and_then(|v| v.as_str())
5137 .map(str::trim)
5138 .filter(|value| !value.is_empty())
5139 {
5140 base.insert(
5141 "toolCallID".to_string(),
5142 Value::String(tool_call_id.to_string()),
5143 );
5144 }
5145 if let Some(args_preview) = part
5146 .get("args")
5147 .filter(|value| {
5148 !value.is_null()
5149 && !value.as_object().is_some_and(|map| map.is_empty())
5150 && !value
5151 .as_str()
5152 .map(|text| text.trim().is_empty())
5153 .unwrap_or(false)
5154 })
5155 .map(|value| truncate_text(&value.to_string(), 500))
5156 {
5157 base.insert(
5158 "toolArgsPreview".to_string(),
5159 Value::String(args_preview.to_string()),
5160 );
5161 }
5162 base.insert(
5163 "eventType".to_string(),
5164 Value::String("message.part.updated".to_string()),
5165 );
5166 Some(StatusIndexUpdate {
5167 key,
5168 value: Value::Object(base),
5169 })
5170 }
5171 _ => None,
5172 }
5173}
5174
/// Background-task entry point; delegates to
/// `crate::app::tasks::run_session_part_persister`.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
5178
/// Background-task entry point; delegates to
/// `crate::app::tasks::run_status_indexer`.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
5182
/// Background-task entry point; delegates to
/// `crate::app::tasks::run_agent_team_supervisor`.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
5186
/// Background-task entry point; delegates to
/// `crate::app::tasks::run_bug_monitor`.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
5190
/// Background-task entry point; delegates to
/// `crate::app::tasks::run_usage_aggregator`.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
5194
/// Background-task entry point; delegates to
/// `crate::app::tasks::run_optimization_scheduler`.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
5198
/// Processes a failure event detected by the bug monitor.
///
/// Pipeline: build a submission from the event, record (or bump) the incident
/// matching its fingerprint, suppress known duplicate failure patterns, create
/// a bug draft, queue a triage run, and attempt to auto-publish the draft to
/// GitHub. The incident record is persisted after each state transition so
/// partial progress survives a failure mid-pipeline, and a
/// `bug_monitor.incident.*` event is published describing the outcome.
///
/// Returns the final incident record; errors only on submission-build,
/// missing-fingerprint, or incident-persistence failures — downstream draft /
/// triage / publish failures are captured in the incident's status fields.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    // Build the draft submission payload from the raw engine event.
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // Look up previously-recorded failure patterns matching this submission
    // (up to 3 matches) to decide whether this is a duplicate.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    // A fingerprint is mandatory: it is the dedup key for incidents.
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // Prefer the configured workspace root; fall back to the indexed one.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Reuse an existing incident with the same fingerprint, if any.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump the counter, refresh timestamps, and
        // backfill the excerpt if the stored one is empty.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence: create a fresh "queued" incident record.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    // Persist the occurrence immediately, before any downstream step can fail.
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known duplicate: mark it suppressed, publish, and stop here.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Create the bug draft; on failure, record the error and stop here.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Queue a triage run for the new draft; a failure here leaves the
    // incident in "draft_created" with the error recorded.
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    // Attempt to auto-publish the draft to GitHub; a post failure is recorded
    // on both the draft and the incident (best-effort, errors ignored).
    if let Some(draft_id) = incident.draft_id.clone() {
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    // Final persistence + notification of the incident's end state.
    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
5400
5401pub fn sha256_hex(parts: &[&str]) -> String {
5402 let mut hasher = Sha256::new();
5403 for part in parts {
5404 hasher.update(part.as_bytes());
5405 hasher.update([0u8]);
5406 }
5407 format!("{:x}", hasher.finalize())
5408}
5409
/// Background-task entry point; delegates to
/// `crate::app::tasks::run_routine_scheduler`.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
5413
/// Background-task entry point; delegates to
/// `crate::app::tasks::run_routine_executor`.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
5417
/// Builds the prompt for a routine run; delegates to
/// `crate::app::routines::build_routine_prompt`.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
5421
/// Truncates `input` to at most `max_len` bytes, appending `...<truncated>`
/// when anything was cut. Input at or under the limit is returned unchanged.
///
/// The cut point is moved back to the nearest UTF-8 character boundary at or
/// below `max_len`, so multi-byte characters never cause a slice panic (the
/// previous `input[..max_len]` slice panicked mid-character).
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Walk back from `max_len` to a valid char boundary (0 is always one).
    let mut cut = max_len;
    while cut > 0 && !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
5430
/// Appends configured output artifacts for a routine run; delegates to
/// `crate::app::routines::append_configured_output_artifacts`.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
5434
5435pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
5436 let provider_id = config
5437 .get("default_provider")
5438 .and_then(|v| v.as_str())
5439 .map(str::trim)
5440 .filter(|v| !v.is_empty())?;
5441 let model_id = config
5442 .get("providers")
5443 .and_then(|v| v.get(provider_id))
5444 .and_then(|v| v.get("default_model"))
5445 .and_then(|v| v.as_str())
5446 .map(str::trim)
5447 .filter(|v| !v.is_empty())?;
5448 Some(ModelSpec {
5449 provider_id: provider_id.to_string(),
5450 model_id: model_id.to_string(),
5451 })
5452}
5453
/// Resolves the model spec to use for a routine run; delegates to
/// `crate::app::routines::resolve_routine_model_spec_for_run`.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
5460
/// Trims each entry, drops blanks, and de-duplicates while preserving the
/// position of each value's first occurrence.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|item| item.trim().to_string())
        // `HashSet::insert` returns false on repeats, filtering duplicates.
        .filter(|item| !item.is_empty() && seen.insert(item.clone()))
        .collect()
}
5475
// Fallback implementations used when the binary is built without the
// `browser` feature: session management becomes a no-op, status endpoints
// report "disabled", and actions fail with an explicit error.
#[cfg(not(feature = "browser"))]
impl AppState {
    /// No browser sessions exist without the feature; always reports 0 closed.
    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
        0
    }

    /// No browser sessions exist without the feature; always reports 0 closed.
    pub async fn close_all_browser_sessions(&self) -> usize {
        0
    }

    /// Static status payload indicating browser support is disabled and
    /// neither the sidecar nor a browser binary was found.
    pub async fn browser_status(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
    }

    /// Always errors: smoke tests require the `browser` feature.
    pub async fn browser_smoke_test(
        &self,
        _url: Option<String>,
    ) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Always errors: the sidecar cannot be installed without the `browser`
    /// feature.
    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Minimal health payload indicating browser support is disabled.
    pub async fn browser_health_summary(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false })
    }
}
5505
5506pub mod automation;
5507pub use automation::*;
5508
5509#[cfg(test)]
5510mod tests;