1use crate::config::channels::normalize_allowed_tools;
2use std::ops::Deref;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, OnceLock};
7
8use chrono::{TimeZone, Utc};
9use chrono_tz::Tz;
10use cron::Schedule;
11use futures::future::BoxFuture;
12use serde::{Deserialize, Serialize};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use tandem_memory::types::MemoryTier;
16use tandem_orchestrator::MissionState;
17use tandem_types::{EngineEvent, HostRuntimeContext, MessagePart, ModelSpec};
18use tokio::fs;
19use tokio::sync::RwLock;
20
21use tandem_channels::config::{ChannelsConfig, DiscordConfig, SlackConfig, TelegramConfig};
22use tandem_core::{resolve_shared_paths, PromptContextHook, PromptContextHookContext};
23use tandem_memory::db::MemoryDatabase;
24use tandem_providers::ChatMessage;
25use tandem_workflows::{
26 load_registry as load_workflow_registry, validate_registry as validate_workflow_registry,
27 WorkflowHookBinding, WorkflowLoadSource, WorkflowRegistry, WorkflowRunRecord,
28 WorkflowRunStatus, WorkflowSourceKind, WorkflowSpec, WorkflowValidationMessage,
29};
30
31use crate::agent_teams::AgentTeamRuntime;
32use crate::app::startup::{StartupSnapshot, StartupState, StartupStatus};
33use crate::automation_v2::types::*;
34use crate::bug_monitor::types::*;
35use crate::capability_resolver::CapabilityResolver;
36use crate::config::{self, channels::ChannelsConfigFile, webui::WebUiConfig};
37use crate::memory::types::{GovernedMemoryRecord, MemoryAuditEvent};
38use crate::pack_manager::PackManager;
39use crate::preset_registry::PresetRegistry;
40use crate::routines::{errors::RoutineStoreError, types::*};
41use crate::runtime::{
42 lease::EngineLease, runs::RunRegistry, state::RuntimeState, worktrees::ManagedWorktreeRecord,
43};
44use crate::shared_resources::types::{ResourceConflict, ResourceStoreError, SharedResourceRecord};
45use crate::util::{host::detect_host_runtime_context, time::now_ms};
46use crate::{
47 derive_phase1_metrics_from_run, derive_phase1_validator_case_outcomes_from_run,
48 establish_phase1_baseline, evaluate_phase1_promotion, optimization_snapshot_hash,
49 parse_phase1_metrics, phase1_baseline_replay_due, validate_phase1_candidate_mutation,
50 OptimizationBaselineReplayRecord, OptimizationCampaignRecord, OptimizationCampaignStatus,
51 OptimizationExperimentRecord, OptimizationExperimentStatus, OptimizationMutableField,
52 OptimizationPromotionDecisionKind,
53};
54
/// Shared, cheaply-clonable server-wide state.
///
/// Every field is either an `Arc` handle, a `PathBuf`, or a plain scalar, so
/// cloning `AppState` shares the underlying stores rather than copying them.
/// Mutable collections are guarded by `tokio::sync::RwLock` (async) except the
/// two web-UI strings, which use `std::sync::RwLock` for sync access.
#[derive(Clone)]
pub struct AppState {
    /// Engine runtime; set exactly once by `mark_ready`, empty while booting.
    pub runtime: Arc<OnceLock<RuntimeState>>,
    /// Current startup phase/status, snapshotted by `startup_snapshot`.
    pub startup: Arc<RwLock<StartupState>>,
    /// True => "in-process" mode, false => "sidecar" (see `mode_label`).
    pub in_process_mode: Arc<AtomicBool>,
    /// Optional bearer token for the HTTP API.
    pub api_token: Arc<RwLock<Option<String>>>,
    pub engine_leases: Arc<RwLock<std::collections::HashMap<String, EngineLease>>>,
    pub managed_worktrees: Arc<RwLock<std::collections::HashMap<String, ManagedWorktreeRecord>>>,
    pub run_registry: RunRegistry,
    /// Age in ms after which a run is considered stale (from env config).
    pub run_stale_ms: u64,
    pub memory_records: Arc<RwLock<std::collections::HashMap<String, GovernedMemoryRecord>>>,
    pub memory_audit_log: Arc<RwLock<Vec<MemoryAuditEvent>>>,
    pub missions: Arc<RwLock<std::collections::HashMap<String, MissionState>>>,
    /// Key/value resource store persisted at `shared_resources_path`.
    pub shared_resources: Arc<RwLock<std::collections::HashMap<String, SharedResourceRecord>>>,
    pub shared_resources_path: PathBuf,
    /// Scheduled routines keyed by routine_id, persisted at `routines_path`.
    pub routines: Arc<RwLock<std::collections::HashMap<String, RoutineSpec>>>,
    /// Per-routine fire history, persisted at `routine_history_path`.
    pub routine_history: Arc<RwLock<std::collections::HashMap<String, Vec<RoutineHistoryEvent>>>>,
    /// Routine run records keyed by run_id, persisted at `routine_runs_path`.
    pub routine_runs: Arc<RwLock<std::collections::HashMap<String, RoutineRunRecord>>>,
    pub automations_v2: Arc<RwLock<std::collections::HashMap<String, AutomationV2Spec>>>,
    pub automation_v2_runs: Arc<RwLock<std::collections::HashMap<String, AutomationV2RunRecord>>>,
    pub automation_scheduler: Arc<RwLock<automation::AutomationScheduler>>,
    /// Signals the scheduler loop to wind down.
    pub automation_scheduler_stopping: Arc<AtomicBool>,
    pub workflow_plans: Arc<RwLock<std::collections::HashMap<String, WorkflowPlan>>>,
    pub workflow_plan_drafts:
        Arc<RwLock<std::collections::HashMap<String, WorkflowPlanDraftRecord>>>,
    pub optimization_campaigns:
        Arc<RwLock<std::collections::HashMap<String, OptimizationCampaignRecord>>>,
    pub optimization_experiments:
        Arc<RwLock<std::collections::HashMap<String, OptimizationExperimentRecord>>>,
    pub bug_monitor_config: Arc<RwLock<BugMonitorConfig>>,
    pub bug_monitor_drafts: Arc<RwLock<std::collections::HashMap<String, BugMonitorDraftRecord>>>,
    pub bug_monitor_incidents:
        Arc<RwLock<std::collections::HashMap<String, BugMonitorIncidentRecord>>>,
    pub bug_monitor_posts: Arc<RwLock<std::collections::HashMap<String, BugMonitorPostRecord>>>,
    pub external_actions: Arc<RwLock<std::collections::HashMap<String, ExternalActionRecord>>>,
    pub bug_monitor_runtime_status: Arc<RwLock<BugMonitorRuntimeStatus>>,
    pub workflows: Arc<RwLock<WorkflowRegistry>>,
    pub workflow_runs: Arc<RwLock<std::collections::HashMap<String, WorkflowRunRecord>>>,
    /// Per-hook enable/disable overrides, persisted at `workflow_hook_overrides_path`.
    pub workflow_hook_overrides: Arc<RwLock<std::collections::HashMap<String, bool>>>,
    /// Dedup map for workflow dispatches (key -> timestamp ms).
    pub workflow_dispatch_seen: Arc<RwLock<std::collections::HashMap<String, u64>>>,
    pub routine_session_policies:
        Arc<RwLock<std::collections::HashMap<String, RoutineSessionPolicy>>>,
    /// Maps session ids to automation-v2 run ids.
    pub automation_v2_session_runs: Arc<RwLock<std::collections::HashMap<String, String>>>,
    /// USD cost per 1k tokens used for run cost estimates.
    pub token_cost_per_1k_usd: f64,
    // On-disk locations for the persisted stores above (resolved at startup).
    pub routines_path: PathBuf,
    pub routine_history_path: PathBuf,
    pub routine_runs_path: PathBuf,
    pub automations_v2_path: PathBuf,
    pub automation_v2_runs_path: PathBuf,
    pub optimization_campaigns_path: PathBuf,
    pub optimization_experiments_path: PathBuf,
    pub bug_monitor_config_path: PathBuf,
    pub bug_monitor_drafts_path: PathBuf,
    pub bug_monitor_incidents_path: PathBuf,
    pub bug_monitor_posts_path: PathBuf,
    pub external_actions_path: PathBuf,
    pub workflow_runs_path: PathBuf,
    pub workflow_hook_overrides_path: PathBuf,
    pub agent_teams: AgentTeamRuntime,
    // Web UI toggles use std (sync) locks: accessed from non-async contexts.
    pub web_ui_enabled: Arc<AtomicBool>,
    pub web_ui_prefix: Arc<std::sync::RwLock<String>>,
    pub server_base_url: Arc<std::sync::RwLock<String>>,
    /// Channel listener tasks + published statuses (see `restart_channel_listeners`).
    pub channels_runtime: Arc<tokio::sync::Mutex<ChannelRuntime>>,
    /// Host context detected at construction; superseded by the runtime's copy
    /// once ready (see `host_runtime_context()`).
    pub host_runtime_context: HostRuntimeContext,
    pub pack_manager: Arc<PackManager>,
    pub capability_resolver: Arc<CapabilityResolver>,
    pub preset_registry: Arc<PresetRegistry>,
}
/// Externally visible status of one chat channel (telegram/discord/slack),
/// published on the event bus as `channel.status.changed`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelStatus {
    /// Whether the channel is configured in the effective config.
    pub enabled: bool,
    /// Whether a listener was actually started for it.
    pub connected: bool,
    pub last_error: Option<String>,
    pub active_sessions: u64,
    /// Free-form channel metadata (empty object by default).
    pub meta: Value,
}
/// Deserialized view of the effective (merged) app configuration, used when
/// (re)starting channel listeners and configuring the web UI. Every section
/// defaults, so a partial or unparsable config degrades to defaults rather
/// than failing.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct EffectiveAppConfig {
    #[serde(default)]
    pub channels: ChannelsConfigFile,
    #[serde(default)]
    pub web_ui: WebUiConfig,
    #[serde(default)]
    pub browser: tandem_core::BrowserConfig,
    #[serde(default)]
    pub memory_consolidation: tandem_providers::MemoryConsolidationConfig,
}
142
/// Live channel-listener state, guarded by the `channels_runtime` mutex:
/// the `JoinSet` of listener tasks (`None` when no listeners are running)
/// plus the most recently published per-channel statuses.
#[derive(Default)]
pub struct ChannelRuntime {
    pub listeners: Option<tokio::task::JoinSet<()>>,
    pub statuses: std::collections::HashMap<String, ChannelStatus>,
}
148
/// A single key/value patch for a status index.
/// NOTE(review): not referenced within this chunk — presumably consumed by a
/// status-index updater elsewhere in the crate; confirm at call sites.
#[derive(Debug, Clone)]
pub struct StatusIndexUpdate {
    pub key: String,
    pub value: Value,
}
154
155impl AppState {
    /// Build an `AppState` in the `Starting` phase, before any engine runtime
    /// exists.
    ///
    /// All in-memory collections start empty; store paths and tunables are
    /// resolved from config/env up front so later loads know where to read.
    /// `attempt_id` tags this boot attempt in startup snapshots; `in_process`
    /// selects the "in-process" vs "sidecar" mode label.
    pub fn new_starting(attempt_id: String, in_process: bool) -> Self {
        Self {
            runtime: Arc::new(OnceLock::new()),
            startup: Arc::new(RwLock::new(StartupState {
                status: StartupStatus::Starting,
                phase: "boot".to_string(),
                started_at_ms: now_ms(),
                attempt_id,
                last_error: None,
            })),
            in_process_mode: Arc::new(AtomicBool::new(in_process)),
            api_token: Arc::new(RwLock::new(None)),
            engine_leases: Arc::new(RwLock::new(std::collections::HashMap::new())),
            managed_worktrees: Arc::new(RwLock::new(std::collections::HashMap::new())),
            run_registry: RunRegistry::new(),
            run_stale_ms: config::env::resolve_run_stale_ms(),
            memory_records: Arc::new(RwLock::new(std::collections::HashMap::new())),
            memory_audit_log: Arc::new(RwLock::new(Vec::new())),
            missions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources: Arc::new(RwLock::new(std::collections::HashMap::new())),
            shared_resources_path: config::paths::resolve_shared_resources_path(),
            routines: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_history: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automations_v2: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_scheduler: Arc::new(RwLock::new(automation::AutomationScheduler::new(
                config::env::resolve_scheduler_max_concurrent_runs(),
            ))),
            automation_scheduler_stopping: Arc::new(AtomicBool::new(false)),
            workflow_plans: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_plan_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_campaigns: Arc::new(RwLock::new(std::collections::HashMap::new())),
            optimization_experiments: Arc::new(RwLock::new(std::collections::HashMap::new())),
            // Bug-monitor config seeds from env; `load_bug_monitor_config`
            // may replace it later during `mark_ready`.
            bug_monitor_config: Arc::new(
                RwLock::new(config::env::resolve_bug_monitor_env_config()),
            ),
            bug_monitor_drafts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_incidents: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_posts: Arc::new(RwLock::new(std::collections::HashMap::new())),
            external_actions: Arc::new(RwLock::new(std::collections::HashMap::new())),
            bug_monitor_runtime_status: Arc::new(RwLock::new(BugMonitorRuntimeStatus::default())),
            workflows: Arc::new(RwLock::new(WorkflowRegistry::default())),
            workflow_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_hook_overrides: Arc::new(RwLock::new(std::collections::HashMap::new())),
            workflow_dispatch_seen: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routine_session_policies: Arc::new(RwLock::new(std::collections::HashMap::new())),
            automation_v2_session_runs: Arc::new(RwLock::new(std::collections::HashMap::new())),
            routines_path: config::paths::resolve_routines_path(),
            routine_history_path: config::paths::resolve_routine_history_path(),
            routine_runs_path: config::paths::resolve_routine_runs_path(),
            automations_v2_path: config::paths::resolve_automations_v2_path(),
            automation_v2_runs_path: config::paths::resolve_automation_v2_runs_path(),
            optimization_campaigns_path: config::paths::resolve_optimization_campaigns_path(),
            optimization_experiments_path: config::paths::resolve_optimization_experiments_path(),
            bug_monitor_config_path: config::paths::resolve_bug_monitor_config_path(),
            bug_monitor_drafts_path: config::paths::resolve_bug_monitor_drafts_path(),
            bug_monitor_incidents_path: config::paths::resolve_bug_monitor_incidents_path(),
            bug_monitor_posts_path: config::paths::resolve_bug_monitor_posts_path(),
            external_actions_path: config::paths::resolve_external_actions_path(),
            workflow_runs_path: config::paths::resolve_workflow_runs_path(),
            workflow_hook_overrides_path: config::paths::resolve_workflow_hook_overrides_path(),
            agent_teams: AgentTeamRuntime::new(config::paths::resolve_agent_team_audit_path()),
            // Web UI is off until `configure_web_ui` is called.
            web_ui_enabled: Arc::new(AtomicBool::new(false)),
            web_ui_prefix: Arc::new(std::sync::RwLock::new("/admin".to_string())),
            server_base_url: Arc::new(std::sync::RwLock::new("http://127.0.0.1:39731".to_string())),
            channels_runtime: Arc::new(tokio::sync::Mutex::new(ChannelRuntime::default())),
            host_runtime_context: detect_host_runtime_context(),
            token_cost_per_1k_usd: config::env::resolve_token_cost_per_1k_usd(),
            pack_manager: Arc::new(PackManager::new(PackManager::default_root())),
            capability_resolver: Arc::new(CapabilityResolver::new(PackManager::default_root())),
            preset_registry: Arc::new(PresetRegistry::new(
                PackManager::default_root(),
                resolve_shared_paths()
                    .map(|paths| paths.canonical_root)
                    // Fall back to ~/.tandem (or ./.tandem with no home dir).
                    .unwrap_or_else(|_| {
                        dirs::home_dir()
                            .unwrap_or_else(|| PathBuf::from("."))
                            .join(".tandem")
                    }),
            )),
        }
    }
239
240 pub fn is_ready(&self) -> bool {
241 self.runtime.get().is_some()
242 }
243
244 pub async fn wait_until_ready_or_failed(&self, attempts: usize, sleep_ms: u64) -> bool {
245 for _ in 0..attempts {
246 if self.is_ready() {
247 return true;
248 }
249 let startup = self.startup_snapshot().await;
250 if matches!(startup.status, StartupStatus::Failed) {
251 return false;
252 }
253 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
254 }
255 self.is_ready()
256 }
257
258 pub fn mode_label(&self) -> &'static str {
259 if self.in_process_mode.load(Ordering::Relaxed) {
260 "in-process"
261 } else {
262 "sidecar"
263 }
264 }
265
266 pub fn configure_web_ui(&self, enabled: bool, prefix: String) {
267 self.web_ui_enabled.store(enabled, Ordering::Relaxed);
268 if let Ok(mut guard) = self.web_ui_prefix.write() {
269 *guard = config::webui::normalize_web_ui_prefix(&prefix);
270 }
271 }
272
    /// Whether the embedded web UI is currently switched on
    /// (set via `configure_web_ui`).
    pub fn web_ui_enabled(&self) -> bool {
        self.web_ui_enabled.load(Ordering::Relaxed)
    }
276
277 pub fn web_ui_prefix(&self) -> String {
278 self.web_ui_prefix
279 .read()
280 .map(|v| v.clone())
281 .unwrap_or_else(|_| "/admin".to_string())
282 }
283
284 pub fn set_server_base_url(&self, base_url: String) {
285 if let Ok(mut guard) = self.server_base_url.write() {
286 *guard = base_url;
287 }
288 }
289
290 pub fn server_base_url(&self) -> String {
291 self.server_base_url
292 .read()
293 .map(|v| v.clone())
294 .unwrap_or_else(|_| "http://127.0.0.1:39731".to_string())
295 }
296
297 pub async fn api_token(&self) -> Option<String> {
298 self.api_token.read().await.clone()
299 }
300
301 pub async fn set_api_token(&self, token: Option<String>) {
302 *self.api_token.write().await = token;
303 }
304
305 pub async fn startup_snapshot(&self) -> StartupSnapshot {
306 let state = self.startup.read().await.clone();
307 StartupSnapshot {
308 elapsed_ms: now_ms().saturating_sub(state.started_at_ms),
309 status: state.status,
310 phase: state.phase,
311 started_at_ms: state.started_at_ms,
312 attempt_id: state.attempt_id,
313 last_error: state.last_error,
314 }
315 }
316
317 pub fn host_runtime_context(&self) -> HostRuntimeContext {
318 self.runtime
319 .get()
320 .map(|runtime| runtime.host_runtime_context.clone())
321 .unwrap_or_else(|| self.host_runtime_context.clone())
322 }
323
324 pub async fn set_phase(&self, phase: impl Into<String>) {
325 let mut startup = self.startup.write().await;
326 startup.phase = phase.into();
327 }
328
    /// Install the engine runtime, run the post-boot initialization sequence,
    /// then flip startup status to `Ready`.
    ///
    /// Errors if the runtime was already set, or if one of the required loads
    /// (`load_routines`, `load_automations_v2`) fails; every other load is
    /// best-effort (`let _ = ...`). NOTE(review): `self.tools`,
    /// `self.engine_loop`, `self.workspace_index`, and `self.event_bus` are not
    /// fields of `AppState` — presumably reached through a `Deref` impl to
    /// `RuntimeState` (note the `std::ops::Deref` import); confirm. If so,
    /// those calls must stay *after* `runtime.set(...)`.
    pub async fn mark_ready(&self, runtime: RuntimeState) -> anyhow::Result<()> {
        self.runtime
            .set(runtime)
            .map_err(|_| anyhow::anyhow!("runtime already initialized"))?;
        #[cfg(feature = "browser")]
        self.register_browser_tools().await?;
        // Register server-provided tools on the engine's tool registry.
        self.tools
            .register_tool(
                "pack_builder".to_string(),
                Arc::new(crate::pack_builder::PackBuilderTool::new(self.clone())),
            )
            .await;
        self.tools
            .register_tool(
                "mcp_list".to_string(),
                Arc::new(crate::http::mcp::McpListTool::new(self.clone())),
            )
            .await;
        // Wire agent-team and prompt-context hooks into the engine loop.
        self.engine_loop
            .set_spawn_agent_hook(std::sync::Arc::new(
                crate::agent_teams::ServerSpawnAgentHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_tool_policy_hook(std::sync::Arc::new(
                crate::agent_teams::ServerToolPolicyHook::new(self.clone()),
            ))
            .await;
        self.engine_loop
            .set_prompt_context_hook(std::sync::Arc::new(ServerPromptContextHook::new(
                self.clone(),
            )))
            .await;
        // Hydrate persisted stores. Only routines and automations are
        // load-critical; the rest are best-effort.
        let _ = self.load_shared_resources().await;
        self.load_routines().await?;
        let _ = self.load_routine_history().await;
        let _ = self.load_routine_runs().await;
        self.load_automations_v2().await?;
        let _ = self.load_automation_v2_runs().await;
        let _ = self.load_optimization_campaigns().await;
        let _ = self.load_optimization_experiments().await;
        let _ = self.load_bug_monitor_config().await;
        let _ = self.load_bug_monitor_drafts().await;
        let _ = self.load_bug_monitor_incidents().await;
        let _ = self.load_bug_monitor_posts().await;
        let _ = self.load_external_actions().await;
        let _ = self.load_workflow_runs().await;
        let _ = self.load_workflow_hook_overrides().await;
        let _ = self.reload_workflows().await;
        let workspace_root = self.workspace_index.snapshot().await.root;
        let _ = self
            .agent_teams
            .ensure_loaded_for_workspace(&workspace_root)
            .await;
        // All initialization done: publish the Ready status.
        let mut startup = self.startup.write().await;
        startup.status = StartupStatus::Ready;
        startup.phase = "ready".to_string();
        startup.last_error = None;
        Ok(())
    }
389
390 pub async fn mark_failed(&self, phase: impl Into<String>, error: impl Into<String>) {
391 let mut startup = self.startup.write().await;
392 startup.status = StartupStatus::Failed;
393 startup.phase = phase.into();
394 startup.last_error = Some(error.into());
395 }
396
397 pub async fn channel_statuses(&self) -> std::collections::HashMap<String, ChannelStatus> {
398 let runtime = self.channels_runtime.lock().await;
399 runtime.statuses.clone()
400 }
401
402 pub async fn restart_channel_listeners(&self) -> anyhow::Result<()> {
403 let effective = self.config.get_effective_value().await;
404 let parsed: EffectiveAppConfig = serde_json::from_value(effective).unwrap_or_default();
405 self.configure_web_ui(parsed.web_ui.enabled, parsed.web_ui.path_prefix.clone());
406
407 let mut runtime = self.channels_runtime.lock().await;
408 if let Some(listeners) = runtime.listeners.as_mut() {
409 listeners.abort_all();
410 }
411 runtime.listeners = None;
412 runtime.statuses.clear();
413
414 let mut status_map = std::collections::HashMap::new();
415 status_map.insert(
416 "telegram".to_string(),
417 ChannelStatus {
418 enabled: parsed.channels.telegram.is_some(),
419 connected: false,
420 last_error: None,
421 active_sessions: 0,
422 meta: serde_json::json!({}),
423 },
424 );
425 status_map.insert(
426 "discord".to_string(),
427 ChannelStatus {
428 enabled: parsed.channels.discord.is_some(),
429 connected: false,
430 last_error: None,
431 active_sessions: 0,
432 meta: serde_json::json!({}),
433 },
434 );
435 status_map.insert(
436 "slack".to_string(),
437 ChannelStatus {
438 enabled: parsed.channels.slack.is_some(),
439 connected: false,
440 last_error: None,
441 active_sessions: 0,
442 meta: serde_json::json!({}),
443 },
444 );
445
446 if let Some(channels_cfg) = build_channels_config(self, &parsed.channels).await {
447 let listeners = tandem_channels::start_channel_listeners(channels_cfg).await;
448 runtime.listeners = Some(listeners);
449 for status in status_map.values_mut() {
450 if status.enabled {
451 status.connected = true;
452 }
453 }
454 }
455
456 runtime.statuses = status_map.clone();
457 drop(runtime);
458
459 self.event_bus.publish(EngineEvent::new(
460 "channel.status.changed",
461 serde_json::json!({ "channels": status_map }),
462 ));
463 Ok(())
464 }
465
466 pub async fn load_shared_resources(&self) -> anyhow::Result<()> {
467 if !self.shared_resources_path.exists() {
468 return Ok(());
469 }
470 let raw = fs::read_to_string(&self.shared_resources_path).await?;
471 let parsed =
472 serde_json::from_str::<std::collections::HashMap<String, SharedResourceRecord>>(&raw)
473 .unwrap_or_default();
474 let mut guard = self.shared_resources.write().await;
475 *guard = parsed;
476 Ok(())
477 }
478
479 pub async fn persist_shared_resources(&self) -> anyhow::Result<()> {
480 if let Some(parent) = self.shared_resources_path.parent() {
481 fs::create_dir_all(parent).await?;
482 }
483 let payload = {
484 let guard = self.shared_resources.read().await;
485 serde_json::to_string_pretty(&*guard)?
486 };
487 fs::write(&self.shared_resources_path, payload).await?;
488 Ok(())
489 }
490
491 pub async fn get_shared_resource(&self, key: &str) -> Option<SharedResourceRecord> {
492 self.shared_resources.read().await.get(key).cloned()
493 }
494
495 pub async fn list_shared_resources(
496 &self,
497 prefix: Option<&str>,
498 limit: usize,
499 ) -> Vec<SharedResourceRecord> {
500 let limit = limit.clamp(1, 500);
501 let mut rows = self
502 .shared_resources
503 .read()
504 .await
505 .values()
506 .filter(|record| {
507 if let Some(prefix) = prefix {
508 record.key.starts_with(prefix)
509 } else {
510 true
511 }
512 })
513 .cloned()
514 .collect::<Vec<_>>();
515 rows.sort_by(|a, b| a.key.cmp(&b.key));
516 rows.truncate(limit);
517 rows
518 }
519
    /// Insert or update a shared-resource record with optimistic concurrency.
    ///
    /// * Keys must pass `is_valid_resource_key`.
    /// * `if_match_rev` gives compare-and-swap semantics: when provided, the
    ///   write only proceeds if the stored revision matches exactly (a missing
    ///   record has `current_rev: None`, which never matches `Some(_)`).
    /// * Revisions start at 1 for new records and increment (saturating).
    /// * On persist failure the in-memory map is rolled back (previous value
    ///   restored, or the key removed if newly inserted) so memory stays
    ///   consistent with disk, and `PersistFailed` is returned.
    pub async fn put_shared_resource(
        &self,
        key: String,
        value: Value,
        if_match_rev: Option<u64>,
        updated_by: String,
        ttl_ms: Option<u64>,
    ) -> Result<SharedResourceRecord, ResourceStoreError> {
        if !is_valid_resource_key(&key) {
            return Err(ResourceStoreError::InvalidKey { key });
        }

        let now = now_ms();
        let mut guard = self.shared_resources.write().await;
        let existing = guard.get(&key).cloned();

        if let Some(expected) = if_match_rev {
            let current = existing.as_ref().map(|row| row.rev);
            if current != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key,
                    expected_rev: Some(expected),
                    current_rev: current,
                }));
            }
        }

        let next_rev = existing
            .as_ref()
            .map(|row| row.rev.saturating_add(1))
            .unwrap_or(1);

        let record = SharedResourceRecord {
            key: key.clone(),
            value,
            rev: next_rev,
            updated_at_ms: now,
            updated_by,
            ttl_ms,
        };

        // Insert first, release the lock, then persist; `previous` is kept for
        // rollback if the persist fails.
        let previous = guard.insert(key.clone(), record.clone());
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            let mut rollback = self.shared_resources.write().await;
            if let Some(previous) = previous {
                rollback.insert(key, previous);
            } else {
                rollback.remove(&key);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(record)
    }
578
    /// Delete a shared-resource record, with the same optimistic-concurrency
    /// rules as `put_shared_resource`.
    ///
    /// Returns the removed record (or `None` if the key was absent). When
    /// `if_match_rev` is given, deletion only proceeds if the stored revision
    /// matches. On persist failure the removed record is re-inserted and
    /// `PersistFailed` returned so memory and disk stay consistent.
    pub async fn delete_shared_resource(
        &self,
        key: &str,
        if_match_rev: Option<u64>,
    ) -> Result<Option<SharedResourceRecord>, ResourceStoreError> {
        if !is_valid_resource_key(key) {
            return Err(ResourceStoreError::InvalidKey {
                key: key.to_string(),
            });
        }

        let mut guard = self.shared_resources.write().await;
        let current = guard.get(key).cloned();
        if let Some(expected) = if_match_rev {
            let current_rev = current.as_ref().map(|row| row.rev);
            if current_rev != Some(expected) {
                return Err(ResourceStoreError::RevisionConflict(ResourceConflict {
                    key: key.to_string(),
                    expected_rev: Some(expected),
                    current_rev,
                }));
            }
        }

        // Remove first, release the lock, then persist; roll back on failure.
        let removed = guard.remove(key);
        drop(guard);

        if let Err(error) = self.persist_shared_resources().await {
            if let Some(record) = removed.clone() {
                self.shared_resources
                    .write()
                    .await
                    .insert(record.key.clone(), record);
            }
            return Err(ResourceStoreError::PersistFailed {
                message: error.to_string(),
            });
        }

        Ok(removed)
    }
620
    /// Load the routines map from disk.
    ///
    /// A missing file means an empty store (Ok, no change). On a parse
    /// failure, the sibling backup file written by `persist_routines_inner`
    /// is tried before giving up; only when both fail is an error returned,
    /// and the in-memory map is left untouched in that case.
    pub async fn load_routines(&self) -> anyhow::Result<()> {
        if !self.routines_path.exists() {
            return Ok(());
        }
        let raw = fs::read_to_string(&self.routines_path).await?;
        match serde_json::from_str::<std::collections::HashMap<String, RoutineSpec>>(&raw) {
            Ok(parsed) => {
                let mut guard = self.routines.write().await;
                *guard = parsed;
                Ok(())
            }
            Err(primary_err) => {
                // Primary store is corrupt — fall back to the last backup.
                let backup_path = config::paths::sibling_backup_path(&self.routines_path);
                if backup_path.exists() {
                    let backup_raw = fs::read_to_string(&backup_path).await?;
                    if let Ok(parsed_backup) = serde_json::from_str::<
                        std::collections::HashMap<String, RoutineSpec>,
                    >(&backup_raw)
                    {
                        let mut guard = self.routines.write().await;
                        *guard = parsed_backup;
                        return Ok(());
                    }
                }
                Err(anyhow::anyhow!(
                    "failed to parse routines store {}: {primary_err}",
                    self.routines_path.display()
                ))
            }
        }
    }
652
653 pub async fn load_routine_history(&self) -> anyhow::Result<()> {
654 if !self.routine_history_path.exists() {
655 return Ok(());
656 }
657 let raw = fs::read_to_string(&self.routine_history_path).await?;
658 let parsed = serde_json::from_str::<
659 std::collections::HashMap<String, Vec<RoutineHistoryEvent>>,
660 >(&raw)
661 .unwrap_or_default();
662 let mut guard = self.routine_history.write().await;
663 *guard = parsed;
664 Ok(())
665 }
666
667 pub async fn load_routine_runs(&self) -> anyhow::Result<()> {
668 if !self.routine_runs_path.exists() {
669 return Ok(());
670 }
671 let raw = fs::read_to_string(&self.routine_runs_path).await?;
672 let parsed =
673 serde_json::from_str::<std::collections::HashMap<String, RoutineRunRecord>>(&raw)
674 .unwrap_or_default();
675 let mut guard = self.routine_runs.write().await;
676 *guard = parsed;
677 Ok(())
678 }
679
    /// Atomically write the routines map to `routines_path`.
    ///
    /// Safety rails, in order:
    /// 1. Unless `allow_empty_overwrite` is set, refuse to replace an on-disk
    ///    store that still has rows with an empty in-memory map — guards
    ///    against wiping persisted routines after a failed load. An unreadable
    ///    or unparsable existing file is conservatively treated as non-empty.
    /// 2. Best-effort copy of the current file to a sibling backup path.
    /// 3. Write to a sibling temp file, then rename over the target, so
    ///    readers never observe a half-written store.
    async fn persist_routines_inner(&self, allow_empty_overwrite: bool) -> anyhow::Result<()> {
        if let Some(parent) = self.routines_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let (payload, is_empty) = {
            let guard = self.routines.read().await;
            (serde_json::to_string_pretty(&*guard)?, guard.is_empty())
        };
        if is_empty && !allow_empty_overwrite && self.routines_path.exists() {
            let existing_raw = fs::read_to_string(&self.routines_path)
                .await
                .unwrap_or_default();
            // `unwrap_or(true)`: if the file can't be parsed, assume it has
            // data and refuse the overwrite.
            let existing_has_rows = serde_json::from_str::<
                std::collections::HashMap<String, RoutineSpec>,
            >(&existing_raw)
            .map(|rows| !rows.is_empty())
            .unwrap_or(true);
            if existing_has_rows {
                return Err(anyhow::anyhow!(
                    "refusing to overwrite non-empty routines store {} with empty in-memory state",
                    self.routines_path.display()
                ));
            }
        }
        let backup_path = config::paths::sibling_backup_path(&self.routines_path);
        if self.routines_path.exists() {
            let _ = fs::copy(&self.routines_path, &backup_path).await;
        }
        let tmp_path = config::paths::sibling_tmp_path(&self.routines_path);
        fs::write(&tmp_path, payload).await?;
        fs::rename(&tmp_path, &self.routines_path).await?;
        Ok(())
    }
713
    /// Persist the routines map, refusing to overwrite a non-empty on-disk
    /// store with an empty in-memory one (see `persist_routines_inner`).
    pub async fn persist_routines(&self) -> anyhow::Result<()> {
        self.persist_routines_inner(false).await
    }
717
718 pub async fn persist_routine_history(&self) -> anyhow::Result<()> {
719 if let Some(parent) = self.routine_history_path.parent() {
720 fs::create_dir_all(parent).await?;
721 }
722 let payload = {
723 let guard = self.routine_history.read().await;
724 serde_json::to_string_pretty(&*guard)?
725 };
726 fs::write(&self.routine_history_path, payload).await?;
727 Ok(())
728 }
729
730 pub async fn persist_routine_runs(&self) -> anyhow::Result<()> {
731 if let Some(parent) = self.routine_runs_path.parent() {
732 fs::create_dir_all(parent).await?;
733 }
734 let payload = {
735 let guard = self.routine_runs.read().await;
736 serde_json::to_string_pretty(&*guard)?
737 };
738 fs::write(&self.routine_runs_path, payload).await?;
739 Ok(())
740 }
741
742 pub async fn put_routine(
743 &self,
744 mut routine: RoutineSpec,
745 ) -> Result<RoutineSpec, RoutineStoreError> {
746 if routine.routine_id.trim().is_empty() {
747 return Err(RoutineStoreError::InvalidRoutineId {
748 routine_id: routine.routine_id,
749 });
750 }
751
752 routine.allowed_tools = config::channels::normalize_allowed_tools(routine.allowed_tools);
753 routine.output_targets = normalize_non_empty_list(routine.output_targets);
754
755 let now = now_ms();
756 let next_schedule_fire =
757 compute_next_schedule_fire_at_ms(&routine.schedule, &routine.timezone, now)
758 .ok_or_else(|| RoutineStoreError::InvalidSchedule {
759 detail: "invalid schedule or timezone".to_string(),
760 })?;
761 match routine.schedule {
762 RoutineSchedule::IntervalSeconds { seconds } => {
763 if seconds == 0 {
764 return Err(RoutineStoreError::InvalidSchedule {
765 detail: "interval_seconds must be > 0".to_string(),
766 });
767 }
768 let _ = seconds;
769 }
770 RoutineSchedule::Cron { .. } => {}
771 }
772 if routine.next_fire_at_ms.is_none() {
773 routine.next_fire_at_ms = Some(next_schedule_fire);
774 }
775
776 let mut guard = self.routines.write().await;
777 let previous = guard.insert(routine.routine_id.clone(), routine.clone());
778 drop(guard);
779
780 if let Err(error) = self.persist_routines().await {
781 let mut rollback = self.routines.write().await;
782 if let Some(previous) = previous {
783 rollback.insert(previous.routine_id.clone(), previous);
784 } else {
785 rollback.remove(&routine.routine_id);
786 }
787 return Err(RoutineStoreError::PersistFailed {
788 message: error.to_string(),
789 });
790 }
791
792 Ok(routine)
793 }
794
795 pub async fn list_routines(&self) -> Vec<RoutineSpec> {
796 let mut rows = self
797 .routines
798 .read()
799 .await
800 .values()
801 .cloned()
802 .collect::<Vec<_>>();
803 rows.sort_by(|a, b| a.routine_id.cmp(&b.routine_id));
804 rows
805 }
806
807 pub async fn get_routine(&self, routine_id: &str) -> Option<RoutineSpec> {
808 self.routines.read().await.get(routine_id).cloned()
809 }
810
    /// Remove a routine and flush the store.
    ///
    /// Empty-overwrite protection is only relaxed when the map is empty after
    /// the removal, so deleting the last routine can still persist an empty
    /// store. NOTE(review): the emptiness check re-acquires the lock after the
    /// write guard is dropped, so a concurrent `put_routine` could slip in
    /// between — confirm callers serialize mutations if that matters.
    /// On persist failure the removed routine is re-inserted and
    /// `PersistFailed` is returned.
    pub async fn delete_routine(
        &self,
        routine_id: &str,
    ) -> Result<Option<RoutineSpec>, RoutineStoreError> {
        let mut guard = self.routines.write().await;
        let removed = guard.remove(routine_id);
        drop(guard);

        let allow_empty_overwrite = self.routines.read().await.is_empty();
        if let Err(error) = self.persist_routines_inner(allow_empty_overwrite).await {
            if let Some(removed) = removed.clone() {
                self.routines
                    .write()
                    .await
                    .insert(removed.routine_id.clone(), removed);
            }
            return Err(RoutineStoreError::PersistFailed {
                message: error.to_string(),
            });
        }
        Ok(removed)
    }
833
    /// Scan active routines for schedules whose fire time has passed and build
    /// trigger plans for them.
    ///
    /// Note: the `now_ms` parameter shadows the imported `now_ms()` helper;
    /// inside this method it is the caller-supplied timestamp.
    /// For each misfired routine, `compute_misfire_plan_for_schedule` decides
    /// how many runs to enqueue (`run_count` may be 0 when the misfire policy
    /// says to skip) and the new next-fire time. `next_fire_at_ms` is always
    /// advanced — even when no run is produced — so the same misfire is not
    /// reported twice. The store flush at the end is best-effort.
    pub async fn evaluate_routine_misfires(&self, now_ms: u64) -> Vec<RoutineTriggerPlan> {
        let mut plans = Vec::new();
        let mut guard = self.routines.write().await;
        for routine in guard.values_mut() {
            if routine.status != RoutineStatus::Active {
                continue;
            }
            // Routines with no scheduled fire time are skipped entirely.
            let Some(next_fire_at_ms) = routine.next_fire_at_ms else {
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            let (run_count, next_fire_at_ms) = compute_misfire_plan_for_schedule(
                now_ms,
                next_fire_at_ms,
                &routine.schedule,
                &routine.timezone,
                &routine.misfire_policy,
            );
            routine.next_fire_at_ms = Some(next_fire_at_ms);
            if run_count == 0 {
                continue;
            }
            plans.push(RoutineTriggerPlan {
                routine_id: routine.routine_id.clone(),
                run_count,
                scheduled_at_ms: now_ms,
                next_fire_at_ms,
            });
        }
        drop(guard);
        let _ = self.persist_routines().await;
        plans
    }
869
870 pub async fn mark_routine_fired(
871 &self,
872 routine_id: &str,
873 fired_at_ms: u64,
874 ) -> Option<RoutineSpec> {
875 let mut guard = self.routines.write().await;
876 let routine = guard.get_mut(routine_id)?;
877 routine.last_fired_at_ms = Some(fired_at_ms);
878 let updated = routine.clone();
879 drop(guard);
880 let _ = self.persist_routines().await;
881 Some(updated)
882 }
883
884 pub async fn append_routine_history(&self, event: RoutineHistoryEvent) {
885 let mut history = self.routine_history.write().await;
886 history
887 .entry(event.routine_id.clone())
888 .or_default()
889 .push(event);
890 drop(history);
891 let _ = self.persist_routine_history().await;
892 }
893
894 pub async fn list_routine_history(
895 &self,
896 routine_id: &str,
897 limit: usize,
898 ) -> Vec<RoutineHistoryEvent> {
899 let limit = limit.clamp(1, 500);
900 let mut rows = self
901 .routine_history
902 .read()
903 .await
904 .get(routine_id)
905 .cloned()
906 .unwrap_or_default();
907 rows.sort_by(|a, b| b.fired_at_ms.cmp(&a.fired_at_ms));
908 rows.truncate(limit);
909 rows
910 }
911
    /// Construct, store, and persist a new run record for `routine`.
    ///
    /// The record snapshots the routine's entrypoint, args, allowed tools,
    /// and output targets at creation time, so later edits to the routine do
    /// not affect this run. `fired_at_ms` is set to the creation time;
    /// started/finished timestamps stay unset until the run progresses, and
    /// token/cost counters start at zero.
    pub async fn create_routine_run(
        &self,
        routine: &RoutineSpec,
        trigger_type: &str,
        run_count: u32,
        status: RoutineRunStatus,
        detail: Option<String>,
    ) -> RoutineRunRecord {
        let now = now_ms();
        let record = RoutineRunRecord {
            run_id: format!("routine-run-{}", uuid::Uuid::new_v4()),
            routine_id: routine.routine_id.clone(),
            trigger_type: trigger_type.to_string(),
            run_count,
            status,
            created_at_ms: now,
            updated_at_ms: now,
            fired_at_ms: Some(now),
            started_at_ms: None,
            finished_at_ms: None,
            requires_approval: routine.requires_approval,
            approval_reason: None,
            denial_reason: None,
            paused_reason: None,
            detail,
            entrypoint: routine.entrypoint.clone(),
            args: routine.args.clone(),
            allowed_tools: routine.allowed_tools.clone(),
            output_targets: routine.output_targets.clone(),
            artifacts: Vec::new(),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
        };
        self.routine_runs
            .write()
            .await
            .insert(record.run_id.clone(), record.clone());
        // Best effort: callers receive the record even if persistence fails.
        let _ = self.persist_routine_runs().await;
        record
    }
956
957 pub async fn get_routine_run(&self, run_id: &str) -> Option<RoutineRunRecord> {
958 self.routine_runs.read().await.get(run_id).cloned()
959 }
960
961 pub async fn list_routine_runs(
962 &self,
963 routine_id: Option<&str>,
964 limit: usize,
965 ) -> Vec<RoutineRunRecord> {
966 let mut rows = self
967 .routine_runs
968 .read()
969 .await
970 .values()
971 .filter(|row| {
972 if let Some(id) = routine_id {
973 row.routine_id == id
974 } else {
975 true
976 }
977 })
978 .cloned()
979 .collect::<Vec<_>>();
980 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
981 rows.truncate(limit.clamp(1, 500));
982 rows
983 }
984
985 pub async fn claim_next_queued_routine_run(&self) -> Option<RoutineRunRecord> {
986 let mut guard = self.routine_runs.write().await;
987 let next_run_id = guard
988 .values()
989 .filter(|row| row.status == RoutineRunStatus::Queued)
990 .min_by(|a, b| {
991 a.created_at_ms
992 .cmp(&b.created_at_ms)
993 .then_with(|| a.run_id.cmp(&b.run_id))
994 })
995 .map(|row| row.run_id.clone())?;
996 let now = now_ms();
997 let row = guard.get_mut(&next_run_id)?;
998 row.status = RoutineRunStatus::Running;
999 row.updated_at_ms = now;
1000 row.started_at_ms = Some(now);
1001 let claimed = row.clone();
1002 drop(guard);
1003 let _ = self.persist_routine_runs().await;
1004 Some(claimed)
1005 }
1006
1007 pub async fn set_routine_session_policy(
1008 &self,
1009 session_id: String,
1010 run_id: String,
1011 routine_id: String,
1012 allowed_tools: Vec<String>,
1013 ) {
1014 let policy = RoutineSessionPolicy {
1015 session_id: session_id.clone(),
1016 run_id,
1017 routine_id,
1018 allowed_tools: config::channels::normalize_allowed_tools(allowed_tools),
1019 };
1020 self.routine_session_policies
1021 .write()
1022 .await
1023 .insert(session_id, policy);
1024 }
1025
1026 pub async fn routine_session_policy(&self, session_id: &str) -> Option<RoutineSessionPolicy> {
1027 self.routine_session_policies
1028 .read()
1029 .await
1030 .get(session_id)
1031 .cloned()
1032 }
1033
1034 pub async fn clear_routine_session_policy(&self, session_id: &str) {
1035 self.routine_session_policies
1036 .write()
1037 .await
1038 .remove(session_id);
1039 }
1040
    /// Transition a routine run to `status`, routing `reason` into the
    /// status-appropriate field.
    ///
    /// - `PendingApproval` stores the reason as `approval_reason`.
    /// - `Running` backfills `started_at_ms` on the first transition only
    ///   and stores the reason, if any, as `detail`.
    /// - `Denied` / `Paused` store the reason in their dedicated fields.
    /// - Terminal states (`Completed` / `Failed` / `Cancelled`) stamp
    ///   `finished_at_ms` and store the reason as `detail`.
    /// - Any other status stores the reason as `detail`.
    ///
    /// Returns `None` for an unknown run id; otherwise persists the run set
    /// best-effort and returns the updated record.
    pub async fn update_routine_run_status(
        &self,
        run_id: &str,
        status: RoutineRunStatus,
        reason: Option<String>,
    ) -> Option<RoutineRunRecord> {
        let mut guard = self.routine_runs.write().await;
        let row = guard.get_mut(run_id)?;
        row.status = status.clone();
        row.updated_at_ms = now_ms();
        match status {
            RoutineRunStatus::PendingApproval => row.approval_reason = reason,
            RoutineRunStatus::Running => {
                // Keep the original start time on re-entry into Running.
                row.started_at_ms.get_or_insert_with(now_ms);
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            RoutineRunStatus::Denied => row.denial_reason = reason,
            RoutineRunStatus::Paused => row.paused_reason = reason,
            RoutineRunStatus::Completed
            | RoutineRunStatus::Failed
            | RoutineRunStatus::Cancelled => {
                row.finished_at_ms = Some(now_ms());
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
            _ => {
                if let Some(detail) = reason {
                    row.detail = Some(detail);
                }
            }
        }
        let updated = row.clone();
        drop(guard);
        let _ = self.persist_routine_runs().await;
        Some(updated)
    }
1080
1081 pub async fn append_routine_run_artifact(
1082 &self,
1083 run_id: &str,
1084 artifact: RoutineRunArtifact,
1085 ) -> Option<RoutineRunRecord> {
1086 let mut guard = self.routine_runs.write().await;
1087 let row = guard.get_mut(run_id)?;
1088 row.updated_at_ms = now_ms();
1089 row.artifacts.push(artifact);
1090 let updated = row.clone();
1091 drop(guard);
1092 let _ = self.persist_routine_runs().await;
1093 Some(updated)
1094 }
1095
1096 pub async fn add_active_session_id(
1097 &self,
1098 run_id: &str,
1099 session_id: String,
1100 ) -> Option<RoutineRunRecord> {
1101 let mut guard = self.routine_runs.write().await;
1102 let row = guard.get_mut(run_id)?;
1103 if !row.active_session_ids.iter().any(|id| id == &session_id) {
1104 row.active_session_ids.push(session_id);
1105 }
1106 row.latest_session_id = row.active_session_ids.last().cloned();
1107 row.updated_at_ms = now_ms();
1108 let updated = row.clone();
1109 drop(guard);
1110 let _ = self.persist_routine_runs().await;
1111 Some(updated)
1112 }
1113
1114 pub async fn clear_active_session_id(
1115 &self,
1116 run_id: &str,
1117 session_id: &str,
1118 ) -> Option<RoutineRunRecord> {
1119 let mut guard = self.routine_runs.write().await;
1120 let row = guard.get_mut(run_id)?;
1121 row.active_session_ids.retain(|id| id != session_id);
1122 row.updated_at_ms = now_ms();
1123 let updated = row.clone();
1124 drop(guard);
1125 let _ = self.persist_routine_runs().await;
1126 Some(updated)
1127 }
1128
    /// Load automation v2 definitions, preferring the canonical path and
    /// falling back to merging from alternate candidate paths when the
    /// canonical file is missing or effectively empty.
    ///
    /// Merge rule: an entry from an alternate path only overwrites an
    /// already-merged one when the existing copy is NOT strictly newer
    /// (`updated_at_ms`). After loading, the bundled studio-research split
    /// migration is applied in memory. The merged set is persisted back when
    /// anything came from an alternate path or was migrated; otherwise, when
    /// the canonical file was usable, stale legacy files are cleaned up
    /// best-effort. Per-path counts are logged either way.
    pub async fn load_automations_v2(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2Spec>::new();
        let mut loaded_from_alternate = false;
        let mut migrated = false;
        let mut path_counts = Vec::new();
        let mut canonical_loaded = false;
        if self.automations_v2_path.exists() {
            let raw = fs::read_to_string(&self.automations_v2_path).await?;
            // "{}" and blank files count as empty, triggering the fallback.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((self.automations_v2_path.clone(), 0usize));
            } else {
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((self.automations_v2_path.clone(), parsed.len()));
                canonical_loaded = !parsed.is_empty();
                merged = parsed;
            }
        } else {
            path_counts.push((self.automations_v2_path.clone(), 0usize));
        }
        if !canonical_loaded {
            // Canonical file empty or missing: merge from alternates.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                if raw.trim().is_empty() || raw.trim() == "{}" {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let parsed = parse_automation_v2_file(&raw);
                path_counts.push((path.clone(), parsed.len()));
                if !parsed.is_empty() {
                    loaded_from_alternate = true;
                }
                for (automation_id, automation) in parsed {
                    match merged.get(&automation_id) {
                        // Keep the strictly newer existing copy.
                        Some(existing) if existing.updated_at_ms > automation.updated_at_ms => {}
                        _ => {
                            merged.insert(automation_id, automation);
                        }
                    }
                }
            }
        } else {
            // Canonical file was usable: only record alternate counts for
            // the diagnostic log; nothing is merged from them.
            for path in candidate_automations_v2_paths(&self.automations_v2_path) {
                if path == self.automations_v2_path {
                    continue;
                }
                if !path.exists() {
                    path_counts.push((path, 0usize));
                    continue;
                }
                let raw = fs::read_to_string(&path).await?;
                let count = if raw.trim().is_empty() || raw.trim() == "{}" {
                    0usize
                } else {
                    parse_automation_v2_file(&raw).len()
                };
                path_counts.push((path, count));
            }
        }
        let active_path = self.automations_v2_path.display().to_string();
        let path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path,
            canonical_loaded,
            path_counts = ?path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 definitions"
        );
        for automation in merged.values_mut() {
            // `migrated` on the right so the migration runs for every entry.
            migrated = migrate_bundled_studio_research_split_automation(automation) || migrated;
        }
        *self.automations_v2.write().await = merged;
        if loaded_from_alternate || migrated {
            let _ = self.persist_automations_v2().await;
        } else if canonical_loaded {
            let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
        }
        Ok(())
    }
1217
1218 pub async fn persist_automations_v2(&self) -> anyhow::Result<()> {
1219 let payload = {
1220 let guard = self.automations_v2.read().await;
1221 serde_json::to_string_pretty(&*guard)?
1222 };
1223 if let Some(parent) = self.automations_v2_path.parent() {
1224 fs::create_dir_all(parent).await?;
1225 }
1226 fs::write(&self.automations_v2_path, &payload).await?;
1227 let _ = cleanup_stale_legacy_automations_v2_file(&self.automations_v2_path).await;
1228 Ok(())
1229 }
1230
    /// Load automation v2 run records from every candidate path, keeping the
    /// copy of each run id whose `updated_at_ms` is not older than what was
    /// already merged.
    ///
    /// After loading, automation definitions are recovered from the run
    /// snapshots where missing/stale, and a warning is logged when run
    /// history exists but no definitions do. The merged run set is persisted
    /// back when anything came from a non-canonical path or was recovered.
    pub async fn load_automation_v2_runs(&self) -> anyhow::Result<()> {
        let mut merged = std::collections::HashMap::<String, AutomationV2RunRecord>::new();
        let mut loaded_from_alternate = false;
        let mut path_counts = Vec::new();
        for path in candidate_automation_v2_runs_paths(&self.automation_v2_runs_path) {
            if !path.exists() {
                path_counts.push((path, 0usize));
                continue;
            }
            let raw = fs::read_to_string(&path).await?;
            // Blank or "{}" files are treated as empty.
            if raw.trim().is_empty() || raw.trim() == "{}" {
                path_counts.push((path, 0usize));
                continue;
            }
            let parsed = parse_automation_v2_runs_file(&raw);
            path_counts.push((path.clone(), parsed.len()));
            if path != self.automation_v2_runs_path {
                loaded_from_alternate = loaded_from_alternate || !parsed.is_empty();
            }
            for (run_id, run) in parsed {
                match merged.get(&run_id) {
                    // Keep the strictly newer existing copy.
                    Some(existing) if existing.updated_at_ms > run.updated_at_ms => {}
                    _ => {
                        merged.insert(run_id, run);
                    }
                }
            }
        }
        let active_runs_path = self.automation_v2_runs_path.display().to_string();
        let run_path_count_summary = path_counts
            .iter()
            .map(|(path, count)| format!("{}={count}", path.display()))
            .collect::<Vec<_>>();
        tracing::info!(
            active_path = active_runs_path,
            path_counts = ?run_path_count_summary,
            merged_count = merged.len(),
            "loaded automation v2 runs"
        );
        *self.automation_v2_runs.write().await = merged;
        let recovered = self
            .recover_automation_definitions_from_run_snapshots()
            .await?;
        let automation_count = self.automations_v2.read().await.len();
        let run_count = self.automation_v2_runs.read().await.len();
        if automation_count == 0 && run_count > 0 {
            // Definitions missing while history exists suggests data loss.
            let active_automations_path = self.automations_v2_path.display().to_string();
            let active_runs_path = self.automation_v2_runs_path.display().to_string();
            tracing::warn!(
                active_automations_path,
                active_runs_path,
                run_count,
                "automation v2 definitions are empty while run history exists"
            );
        }
        if loaded_from_alternate || recovered > 0 {
            let _ = self.persist_automation_v2_runs().await;
        }
        Ok(())
    }
1291
1292 pub async fn persist_automation_v2_runs(&self) -> anyhow::Result<()> {
1293 let payload = {
1294 let guard = self.automation_v2_runs.read().await;
1295 serde_json::to_string_pretty(&*guard)?
1296 };
1297 if let Some(parent) = self.automation_v2_runs_path.parent() {
1298 fs::create_dir_all(parent).await?;
1299 }
1300 fs::write(&self.automation_v2_runs_path, &payload).await?;
1301 Ok(())
1302 }
1303
1304 pub async fn load_optimization_campaigns(&self) -> anyhow::Result<()> {
1305 if !self.optimization_campaigns_path.exists() {
1306 return Ok(());
1307 }
1308 let raw = fs::read_to_string(&self.optimization_campaigns_path).await?;
1309 let parsed = parse_optimization_campaigns_file(&raw);
1310 *self.optimization_campaigns.write().await = parsed;
1311 Ok(())
1312 }
1313
1314 pub async fn persist_optimization_campaigns(&self) -> anyhow::Result<()> {
1315 let payload = {
1316 let guard = self.optimization_campaigns.read().await;
1317 serde_json::to_string_pretty(&*guard)?
1318 };
1319 if let Some(parent) = self.optimization_campaigns_path.parent() {
1320 fs::create_dir_all(parent).await?;
1321 }
1322 fs::write(&self.optimization_campaigns_path, payload).await?;
1323 Ok(())
1324 }
1325
1326 pub async fn load_optimization_experiments(&self) -> anyhow::Result<()> {
1327 if !self.optimization_experiments_path.exists() {
1328 return Ok(());
1329 }
1330 let raw = fs::read_to_string(&self.optimization_experiments_path).await?;
1331 let parsed = parse_optimization_experiments_file(&raw);
1332 *self.optimization_experiments.write().await = parsed;
1333 Ok(())
1334 }
1335
1336 pub async fn persist_optimization_experiments(&self) -> anyhow::Result<()> {
1337 let payload = {
1338 let guard = self.optimization_experiments.read().await;
1339 serde_json::to_string_pretty(&*guard)?
1340 };
1341 if let Some(parent) = self.optimization_experiments_path.parent() {
1342 fs::create_dir_all(parent).await?;
1343 }
1344 fs::write(&self.optimization_experiments_path, payload).await?;
1345 Ok(())
1346 }
1347
    /// Verify on disk that `automation_id` is present (or absent, per
    /// `expected_present`) in the canonical automations file.
    ///
    /// A mismatch on the canonical path is logged at error level and
    /// returned as an error. Mismatches on the alternate candidate paths are
    /// only logged as stale, since the canonical file is authoritative; a
    /// missing file is treated the same as an empty one.
    async fn verify_automation_v2_persisted(
        &self,
        automation_id: &str,
        expected_present: bool,
    ) -> anyhow::Result<()> {
        let active_raw = if self.automations_v2_path.exists() {
            fs::read_to_string(&self.automations_v2_path).await?
        } else {
            String::new()
        };
        let active_parsed = parse_automation_v2_file(&active_raw);
        let active_present = active_parsed.contains_key(automation_id);
        if active_present != expected_present {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::error!(
                automation_id,
                expected_present,
                actual_present = active_present,
                count = active_parsed.len(),
                active_path,
                "automation v2 persistence verification failed"
            );
            anyhow::bail!(
                "automation v2 persistence verification failed for `{}`",
                automation_id
            );
        }
        // Alternate paths are advisory only: collect mismatches and warn.
        let mut alternate_mismatches = Vec::new();
        for path in candidate_automations_v2_paths(&self.automations_v2_path) {
            if path == self.automations_v2_path {
                continue;
            }
            let raw = if path.exists() {
                fs::read_to_string(&path).await?
            } else {
                String::new()
            };
            let parsed = parse_automation_v2_file(&raw);
            let present = parsed.contains_key(automation_id);
            if present != expected_present {
                alternate_mismatches.push(format!(
                    "{} expected_present={} actual_present={} count={}",
                    path.display(),
                    expected_present,
                    present,
                    parsed.len()
                ));
            }
        }
        if !alternate_mismatches.is_empty() {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                automation_id,
                expected_present,
                mismatches = ?alternate_mismatches,
                active_path,
                "automation v2 alternate persistence paths are stale"
            );
        }
        Ok(())
    }
1409
    /// Rebuild missing or stale automation definitions from the automation
    /// snapshots embedded in run records.
    ///
    /// A snapshot replaces the in-memory definition when no definition
    /// exists for that automation id or when the snapshot is strictly newer
    /// (`updated_at_ms`). Only definitions that were entirely absent count
    /// toward the returned total, which also gates the warning log and the
    /// persistence of the recovered set.
    async fn recover_automation_definitions_from_run_snapshots(&self) -> anyhow::Result<usize> {
        // Snapshot the run list first so the runs lock is not held while the
        // definitions lock is taken.
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut guard = self.automations_v2.write().await;
        let mut recovered = 0usize;
        for run in runs {
            let Some(snapshot) = run.automation_snapshot.clone() else {
                continue;
            };
            let should_replace = match guard.get(&run.automation_id) {
                Some(existing) => existing.updated_at_ms < snapshot.updated_at_ms,
                None => true,
            };
            if should_replace {
                // Refreshing an existing (stale) definition is not counted
                // as a recovery; only brand-new entries are.
                if !guard.contains_key(&run.automation_id) {
                    recovered += 1;
                }
                guard.insert(run.automation_id.clone(), snapshot);
            }
        }
        drop(guard);
        if recovered > 0 {
            let active_path = self.automations_v2_path.display().to_string();
            tracing::warn!(
                recovered,
                active_path,
                "recovered automation v2 definitions from run snapshots"
            );
            self.persist_automations_v2().await?;
        }
        Ok(recovered)
    }
1447
1448 pub async fn load_bug_monitor_config(&self) -> anyhow::Result<()> {
1449 let path = if self.bug_monitor_config_path.exists() {
1450 self.bug_monitor_config_path.clone()
1451 } else if config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1452 .exists()
1453 {
1454 config::paths::legacy_failure_reporter_path("failure_reporter_config.json")
1455 } else {
1456 return Ok(());
1457 };
1458 let raw = fs::read_to_string(path).await?;
1459 let parsed = serde_json::from_str::<BugMonitorConfig>(&raw)
1460 .unwrap_or_else(|_| config::env::resolve_bug_monitor_env_config());
1461 *self.bug_monitor_config.write().await = parsed;
1462 Ok(())
1463 }
1464
1465 pub async fn persist_bug_monitor_config(&self) -> anyhow::Result<()> {
1466 if let Some(parent) = self.bug_monitor_config_path.parent() {
1467 fs::create_dir_all(parent).await?;
1468 }
1469 let payload = {
1470 let guard = self.bug_monitor_config.read().await;
1471 serde_json::to_string_pretty(&*guard)?
1472 };
1473 fs::write(&self.bug_monitor_config_path, payload).await?;
1474 Ok(())
1475 }
1476
1477 pub async fn bug_monitor_config(&self) -> BugMonitorConfig {
1478 self.bug_monitor_config.read().await.clone()
1479 }
1480
    /// Validate and store a new bug monitor configuration, then persist it.
    ///
    /// Normalization/validation before the write:
    /// - `workspace_root` is trimmed; an empty value becomes `None`.
    /// - `repo`, when non-empty, must be an `owner/repo` slug.
    /// - `mcp_server`, when set, must name a registered MCP server.
    /// - `model_policy`, when set, must pass the shared routines/automations
    ///   model-policy validator.
    ///
    /// On success `updated_at_ms` is stamped and the stored config is
    /// returned; any validation failure bails before mutating state.
    pub async fn put_bug_monitor_config(
        &self,
        mut config: BugMonitorConfig,
    ) -> anyhow::Result<BugMonitorConfig> {
        config.workspace_root = config
            .workspace_root
            .as_ref()
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty());
        if let Some(repo) = config.repo.as_ref() {
            // Empty repo is allowed (meaning "unset"); a non-empty value
            // must be a valid slug.
            if !repo.is_empty() && !is_valid_owner_repo_slug(repo) {
                anyhow::bail!("repo must be in owner/repo format");
            }
        }
        if let Some(server) = config.mcp_server.as_ref() {
            let servers = self.mcp.list().await;
            if !servers.contains_key(server) {
                anyhow::bail!("unknown mcp server `{server}`");
            }
        }
        if let Some(model_policy) = config.model_policy.as_ref() {
            crate::http::routines_automations::validate_model_policy(model_policy)
                .map_err(anyhow::Error::msg)?;
        }
        config.updated_at_ms = now_ms();
        *self.bug_monitor_config.write().await = config.clone();
        self.persist_bug_monitor_config().await?;
        Ok(config)
    }
1510
1511 pub async fn load_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1512 let path = if self.bug_monitor_drafts_path.exists() {
1513 self.bug_monitor_drafts_path.clone()
1514 } else if config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1515 .exists()
1516 {
1517 config::paths::legacy_failure_reporter_path("failure_reporter_drafts.json")
1518 } else {
1519 return Ok(());
1520 };
1521 let raw = fs::read_to_string(path).await?;
1522 let parsed =
1523 serde_json::from_str::<std::collections::HashMap<String, BugMonitorDraftRecord>>(&raw)
1524 .unwrap_or_default();
1525 *self.bug_monitor_drafts.write().await = parsed;
1526 Ok(())
1527 }
1528
1529 pub async fn persist_bug_monitor_drafts(&self) -> anyhow::Result<()> {
1530 if let Some(parent) = self.bug_monitor_drafts_path.parent() {
1531 fs::create_dir_all(parent).await?;
1532 }
1533 let payload = {
1534 let guard = self.bug_monitor_drafts.read().await;
1535 serde_json::to_string_pretty(&*guard)?
1536 };
1537 fs::write(&self.bug_monitor_drafts_path, payload).await?;
1538 Ok(())
1539 }
1540
1541 pub async fn load_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1542 let path = if self.bug_monitor_incidents_path.exists() {
1543 self.bug_monitor_incidents_path.clone()
1544 } else if config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1545 .exists()
1546 {
1547 config::paths::legacy_failure_reporter_path("failure_reporter_incidents.json")
1548 } else {
1549 return Ok(());
1550 };
1551 let raw = fs::read_to_string(path).await?;
1552 let parsed = serde_json::from_str::<
1553 std::collections::HashMap<String, BugMonitorIncidentRecord>,
1554 >(&raw)
1555 .unwrap_or_default();
1556 *self.bug_monitor_incidents.write().await = parsed;
1557 Ok(())
1558 }
1559
1560 pub async fn persist_bug_monitor_incidents(&self) -> anyhow::Result<()> {
1561 if let Some(parent) = self.bug_monitor_incidents_path.parent() {
1562 fs::create_dir_all(parent).await?;
1563 }
1564 let payload = {
1565 let guard = self.bug_monitor_incidents.read().await;
1566 serde_json::to_string_pretty(&*guard)?
1567 };
1568 fs::write(&self.bug_monitor_incidents_path, payload).await?;
1569 Ok(())
1570 }
1571
1572 pub async fn load_bug_monitor_posts(&self) -> anyhow::Result<()> {
1573 let path = if self.bug_monitor_posts_path.exists() {
1574 self.bug_monitor_posts_path.clone()
1575 } else if config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1576 .exists()
1577 {
1578 config::paths::legacy_failure_reporter_path("failure_reporter_posts.json")
1579 } else {
1580 return Ok(());
1581 };
1582 let raw = fs::read_to_string(path).await?;
1583 let parsed =
1584 serde_json::from_str::<std::collections::HashMap<String, BugMonitorPostRecord>>(&raw)
1585 .unwrap_or_default();
1586 *self.bug_monitor_posts.write().await = parsed;
1587 Ok(())
1588 }
1589
1590 pub async fn persist_bug_monitor_posts(&self) -> anyhow::Result<()> {
1591 if let Some(parent) = self.bug_monitor_posts_path.parent() {
1592 fs::create_dir_all(parent).await?;
1593 }
1594 let payload = {
1595 let guard = self.bug_monitor_posts.read().await;
1596 serde_json::to_string_pretty(&*guard)?
1597 };
1598 fs::write(&self.bug_monitor_posts_path, payload).await?;
1599 Ok(())
1600 }
1601
1602 pub async fn load_external_actions(&self) -> anyhow::Result<()> {
1603 if !self.external_actions_path.exists() {
1604 return Ok(());
1605 }
1606 let raw = fs::read_to_string(&self.external_actions_path).await?;
1607 let parsed =
1608 serde_json::from_str::<std::collections::HashMap<String, ExternalActionRecord>>(&raw)
1609 .unwrap_or_default();
1610 *self.external_actions.write().await = parsed;
1611 Ok(())
1612 }
1613
1614 pub async fn persist_external_actions(&self) -> anyhow::Result<()> {
1615 if let Some(parent) = self.external_actions_path.parent() {
1616 fs::create_dir_all(parent).await?;
1617 }
1618 let payload = {
1619 let guard = self.external_actions.read().await;
1620 serde_json::to_string_pretty(&*guard)?
1621 };
1622 fs::write(&self.external_actions_path, payload).await?;
1623 Ok(())
1624 }
1625
1626 pub async fn list_bug_monitor_incidents(&self, limit: usize) -> Vec<BugMonitorIncidentRecord> {
1627 let mut rows = self
1628 .bug_monitor_incidents
1629 .read()
1630 .await
1631 .values()
1632 .cloned()
1633 .collect::<Vec<_>>();
1634 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1635 rows.truncate(limit.clamp(1, 200));
1636 rows
1637 }
1638
1639 pub async fn get_bug_monitor_incident(
1640 &self,
1641 incident_id: &str,
1642 ) -> Option<BugMonitorIncidentRecord> {
1643 self.bug_monitor_incidents
1644 .read()
1645 .await
1646 .get(incident_id)
1647 .cloned()
1648 }
1649
1650 pub async fn put_bug_monitor_incident(
1651 &self,
1652 incident: BugMonitorIncidentRecord,
1653 ) -> anyhow::Result<BugMonitorIncidentRecord> {
1654 self.bug_monitor_incidents
1655 .write()
1656 .await
1657 .insert(incident.incident_id.clone(), incident.clone());
1658 self.persist_bug_monitor_incidents().await?;
1659 Ok(incident)
1660 }
1661
1662 pub async fn list_bug_monitor_posts(&self, limit: usize) -> Vec<BugMonitorPostRecord> {
1663 let mut rows = self
1664 .bug_monitor_posts
1665 .read()
1666 .await
1667 .values()
1668 .cloned()
1669 .collect::<Vec<_>>();
1670 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1671 rows.truncate(limit.clamp(1, 200));
1672 rows
1673 }
1674
1675 pub async fn get_bug_monitor_post(&self, post_id: &str) -> Option<BugMonitorPostRecord> {
1676 self.bug_monitor_posts.read().await.get(post_id).cloned()
1677 }
1678
1679 pub async fn put_bug_monitor_post(
1680 &self,
1681 post: BugMonitorPostRecord,
1682 ) -> anyhow::Result<BugMonitorPostRecord> {
1683 self.bug_monitor_posts
1684 .write()
1685 .await
1686 .insert(post.post_id.clone(), post.clone());
1687 self.persist_bug_monitor_posts().await?;
1688 Ok(post)
1689 }
1690
1691 pub async fn list_external_actions(&self, limit: usize) -> Vec<ExternalActionRecord> {
1692 let mut rows = self
1693 .external_actions
1694 .read()
1695 .await
1696 .values()
1697 .cloned()
1698 .collect::<Vec<_>>();
1699 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
1700 rows.truncate(limit.clamp(1, 200));
1701 rows
1702 }
1703
1704 pub async fn get_external_action(&self, action_id: &str) -> Option<ExternalActionRecord> {
1705 self.external_actions.read().await.get(action_id).cloned()
1706 }
1707
1708 pub async fn get_external_action_by_idempotency_key(
1709 &self,
1710 idempotency_key: &str,
1711 ) -> Option<ExternalActionRecord> {
1712 let normalized = idempotency_key.trim();
1713 if normalized.is_empty() {
1714 return None;
1715 }
1716 self.external_actions
1717 .read()
1718 .await
1719 .values()
1720 .find(|action| {
1721 action
1722 .idempotency_key
1723 .as_deref()
1724 .map(str::trim)
1725 .filter(|value| !value.is_empty())
1726 == Some(normalized)
1727 })
1728 .cloned()
1729 }
1730
1731 pub async fn put_external_action(
1732 &self,
1733 action: ExternalActionRecord,
1734 ) -> anyhow::Result<ExternalActionRecord> {
1735 self.external_actions
1736 .write()
1737 .await
1738 .insert(action.action_id.clone(), action.clone());
1739 self.persist_external_actions().await?;
1740 Ok(action)
1741 }
1742
    /// Record an external action with idempotency, then fan receipts out to
    /// any linked routine run and context run.
    ///
    /// When the action carries a non-blank idempotency key that matches an
    /// already-stored action, that existing record is returned unchanged and
    /// nothing is persisted or published. Otherwise the action is inserted
    /// and persisted, and then:
    /// - if `routine_run_id` is set, a receipt artifact is appended to the
    ///   run and a `routine.run.artifact_added` event is published (when the
    ///   runtime is initialized) — both best effort;
    /// - if `context_run_id` is set, the full action is appended to the
    ///   context run as a JSON artifact; failures are logged, not fatal.
    pub async fn record_external_action(
        &self,
        action: ExternalActionRecord,
    ) -> anyhow::Result<ExternalActionRecord> {
        let action = {
            // Dedup check and insert happen under one write guard so a
            // concurrent caller cannot insert a duplicate in between.
            let mut guard = self.external_actions.write().await;
            if let Some(idempotency_key) = action
                .idempotency_key
                .as_deref()
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                if let Some(existing) = guard
                    .values()
                    .find(|existing| {
                        existing
                            .idempotency_key
                            .as_deref()
                            .map(str::trim)
                            .filter(|value| !value.is_empty())
                            == Some(idempotency_key)
                    })
                    .cloned()
                {
                    // Idempotent replay: return the original record as-is.
                    return Ok(existing);
                }
            }
            guard.insert(action.action_id.clone(), action.clone());
            action
        };
        self.persist_external_actions().await?;
        if let Some(run_id) = action.routine_run_id.as_deref() {
            let artifact = RoutineRunArtifact {
                artifact_id: format!("external-action-{}", action.action_id),
                uri: format!("external-action://{}", action.action_id),
                kind: "external_action_receipt".to_string(),
                label: Some(format!("external action receipt: {}", action.operation)),
                created_at_ms: action.updated_at_ms,
                metadata: Some(json!({
                    "actionID": action.action_id,
                    "operation": action.operation,
                    "status": action.status,
                    "sourceKind": action.source_kind,
                    "sourceID": action.source_id,
                    "capabilityID": action.capability_id,
                    "target": action.target,
                })),
            };
            let _ = self
                .append_routine_run_artifact(run_id, artifact.clone())
                .await;
            if let Some(runtime) = self.runtime.get() {
                runtime.event_bus.publish(EngineEvent::new(
                    "routine.run.artifact_added",
                    json!({
                        "runID": run_id,
                        "artifact": artifact,
                    }),
                ));
            }
        }
        if let Some(context_run_id) = action.context_run_id.as_deref() {
            let payload = serde_json::to_value(&action)?;
            if let Err(error) = crate::http::context_runs::append_json_artifact_to_context_run(
                self,
                context_run_id,
                &format!("external-action-{}", action.action_id),
                "external_action_receipt",
                &format!("external-actions/{}.json", action.action_id),
                &payload,
            )
            .await
            {
                tracing::warn!(
                    "failed to append external action artifact {} to context run {}: {}",
                    action.action_id,
                    context_run_id,
                    error
                );
            }
        }
        Ok(action)
    }
1826
1827 pub async fn update_bug_monitor_runtime_status(
1828 &self,
1829 update: impl FnOnce(&mut BugMonitorRuntimeStatus),
1830 ) -> BugMonitorRuntimeStatus {
1831 let mut guard = self.bug_monitor_runtime_status.write().await;
1832 update(&mut guard);
1833 guard.clone()
1834 }
1835
1836 pub async fn list_bug_monitor_drafts(&self, limit: usize) -> Vec<BugMonitorDraftRecord> {
1837 let mut rows = self
1838 .bug_monitor_drafts
1839 .read()
1840 .await
1841 .values()
1842 .cloned()
1843 .collect::<Vec<_>>();
1844 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
1845 rows.truncate(limit.clamp(1, 200));
1846 rows
1847 }
1848
1849 pub async fn get_bug_monitor_draft(&self, draft_id: &str) -> Option<BugMonitorDraftRecord> {
1850 self.bug_monitor_drafts.read().await.get(draft_id).cloned()
1851 }
1852
1853 pub async fn put_bug_monitor_draft(
1854 &self,
1855 draft: BugMonitorDraftRecord,
1856 ) -> anyhow::Result<BugMonitorDraftRecord> {
1857 self.bug_monitor_drafts
1858 .write()
1859 .await
1860 .insert(draft.draft_id.clone(), draft.clone());
1861 self.persist_bug_monitor_drafts().await?;
1862 Ok(draft)
1863 }
1864
1865 pub async fn submit_bug_monitor_draft(
1866 &self,
1867 mut submission: BugMonitorSubmission,
1868 ) -> anyhow::Result<BugMonitorDraftRecord> {
1869 fn normalize_optional(value: Option<String>) -> Option<String> {
1870 value
1871 .map(|v| v.trim().to_string())
1872 .filter(|v| !v.is_empty())
1873 }
1874
1875 fn compute_fingerprint(parts: &[&str]) -> String {
1876 use std::hash::{Hash, Hasher};
1877
1878 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1879 for part in parts {
1880 part.hash(&mut hasher);
1881 }
1882 format!("{:016x}", hasher.finish())
1883 }
1884
1885 submission.repo = normalize_optional(submission.repo);
1886 submission.title = normalize_optional(submission.title);
1887 submission.detail = normalize_optional(submission.detail);
1888 submission.source = normalize_optional(submission.source);
1889 submission.run_id = normalize_optional(submission.run_id);
1890 submission.session_id = normalize_optional(submission.session_id);
1891 submission.correlation_id = normalize_optional(submission.correlation_id);
1892 submission.file_name = normalize_optional(submission.file_name);
1893 submission.process = normalize_optional(submission.process);
1894 submission.component = normalize_optional(submission.component);
1895 submission.event = normalize_optional(submission.event);
1896 submission.level = normalize_optional(submission.level);
1897 submission.fingerprint = normalize_optional(submission.fingerprint);
1898 submission.excerpt = submission
1899 .excerpt
1900 .into_iter()
1901 .map(|line| line.trim_end().to_string())
1902 .filter(|line| !line.is_empty())
1903 .take(50)
1904 .collect();
1905
1906 let config = self.bug_monitor_config().await;
1907 let repo = submission
1908 .repo
1909 .clone()
1910 .or(config.repo.clone())
1911 .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
1912 if !is_valid_owner_repo_slug(&repo) {
1913 anyhow::bail!("Bug Monitor repo must be in owner/repo format");
1914 }
1915
1916 let title = submission.title.clone().unwrap_or_else(|| {
1917 if let Some(event) = submission.event.as_ref() {
1918 format!("Failure detected in {event}")
1919 } else if let Some(component) = submission.component.as_ref() {
1920 format!("Failure detected in {component}")
1921 } else if let Some(process) = submission.process.as_ref() {
1922 format!("Failure detected in {process}")
1923 } else if let Some(source) = submission.source.as_ref() {
1924 format!("Failure report from {source}")
1925 } else {
1926 "Failure report".to_string()
1927 }
1928 });
1929
1930 let mut detail_lines = Vec::new();
1931 if let Some(source) = submission.source.as_ref() {
1932 detail_lines.push(format!("source: {source}"));
1933 }
1934 if let Some(file_name) = submission.file_name.as_ref() {
1935 detail_lines.push(format!("file: {file_name}"));
1936 }
1937 if let Some(level) = submission.level.as_ref() {
1938 detail_lines.push(format!("level: {level}"));
1939 }
1940 if let Some(process) = submission.process.as_ref() {
1941 detail_lines.push(format!("process: {process}"));
1942 }
1943 if let Some(component) = submission.component.as_ref() {
1944 detail_lines.push(format!("component: {component}"));
1945 }
1946 if let Some(event) = submission.event.as_ref() {
1947 detail_lines.push(format!("event: {event}"));
1948 }
1949 if let Some(run_id) = submission.run_id.as_ref() {
1950 detail_lines.push(format!("run_id: {run_id}"));
1951 }
1952 if let Some(session_id) = submission.session_id.as_ref() {
1953 detail_lines.push(format!("session_id: {session_id}"));
1954 }
1955 if let Some(correlation_id) = submission.correlation_id.as_ref() {
1956 detail_lines.push(format!("correlation_id: {correlation_id}"));
1957 }
1958 if let Some(detail) = submission.detail.as_ref() {
1959 detail_lines.push(String::new());
1960 detail_lines.push(detail.clone());
1961 }
1962 if !submission.excerpt.is_empty() {
1963 if !detail_lines.is_empty() {
1964 detail_lines.push(String::new());
1965 }
1966 detail_lines.push("excerpt:".to_string());
1967 detail_lines.extend(submission.excerpt.iter().map(|line| format!(" {line}")));
1968 }
1969 let detail = if detail_lines.is_empty() {
1970 None
1971 } else {
1972 Some(detail_lines.join("\n"))
1973 };
1974
1975 let fingerprint = submission.fingerprint.clone().unwrap_or_else(|| {
1976 compute_fingerprint(&[
1977 repo.as_str(),
1978 title.as_str(),
1979 detail.as_deref().unwrap_or(""),
1980 submission.source.as_deref().unwrap_or(""),
1981 submission.run_id.as_deref().unwrap_or(""),
1982 submission.session_id.as_deref().unwrap_or(""),
1983 submission.correlation_id.as_deref().unwrap_or(""),
1984 ])
1985 });
1986
1987 let mut drafts = self.bug_monitor_drafts.write().await;
1988 if let Some(existing) = drafts
1989 .values()
1990 .find(|row| row.repo == repo && row.fingerprint == fingerprint)
1991 .cloned()
1992 {
1993 return Ok(existing);
1994 }
1995
1996 let draft = BugMonitorDraftRecord {
1997 draft_id: format!("failure-draft-{}", uuid::Uuid::new_v4().simple()),
1998 fingerprint,
1999 repo,
2000 status: if config.require_approval_for_new_issues {
2001 "approval_required".to_string()
2002 } else {
2003 "draft_ready".to_string()
2004 },
2005 created_at_ms: now_ms(),
2006 triage_run_id: None,
2007 issue_number: None,
2008 title: Some(title),
2009 detail,
2010 github_status: None,
2011 github_issue_url: None,
2012 github_comment_url: None,
2013 github_posted_at_ms: None,
2014 matched_issue_number: None,
2015 matched_issue_state: None,
2016 evidence_digest: None,
2017 last_post_error: None,
2018 };
2019 drafts.insert(draft.draft_id.clone(), draft.clone());
2020 drop(drafts);
2021 self.persist_bug_monitor_drafts().await?;
2022 Ok(draft)
2023 }
2024
2025 pub async fn update_bug_monitor_draft_status(
2026 &self,
2027 draft_id: &str,
2028 next_status: &str,
2029 reason: Option<&str>,
2030 ) -> anyhow::Result<BugMonitorDraftRecord> {
2031 let normalized_status = next_status.trim().to_ascii_lowercase();
2032 if normalized_status != "draft_ready" && normalized_status != "denied" {
2033 anyhow::bail!("unsupported Bug Monitor draft status");
2034 }
2035
2036 let mut drafts = self.bug_monitor_drafts.write().await;
2037 let Some(draft) = drafts.get_mut(draft_id) else {
2038 anyhow::bail!("Bug Monitor draft not found");
2039 };
2040 if !draft.status.eq_ignore_ascii_case("approval_required") {
2041 anyhow::bail!("Bug Monitor draft is not waiting for approval");
2042 }
2043 draft.status = normalized_status.clone();
2044 if let Some(reason) = reason
2045 .map(|value| value.trim())
2046 .filter(|value| !value.is_empty())
2047 {
2048 let next_detail = if let Some(detail) = draft.detail.as_ref() {
2049 format!("{detail}\n\noperator_note: {reason}")
2050 } else {
2051 format!("operator_note: {reason}")
2052 };
2053 draft.detail = Some(next_detail);
2054 }
2055 let updated = draft.clone();
2056 drop(drafts);
2057 self.persist_bug_monitor_drafts().await?;
2058
2059 let event_name = if normalized_status == "draft_ready" {
2060 "bug_monitor.draft.approved"
2061 } else {
2062 "bug_monitor.draft.denied"
2063 };
2064 self.event_bus.publish(EngineEvent::new(
2065 event_name,
2066 serde_json::json!({
2067 "draft_id": updated.draft_id,
2068 "repo": updated.repo,
2069 "status": updated.status,
2070 "reason": reason,
2071 }),
2072 ));
2073 Ok(updated)
2074 }
2075
    /// Build a point-in-time status report for the Bug Monitor: queue
    /// counts, capability resolution against the selected MCP server, model
    /// availability, readiness flags, and a human-readable `last_error`
    /// describing the first blocking problem (when enabled but not ready).
    pub async fn bug_monitor_status(&self) -> BugMonitorStatus {
        // GitHub capabilities the monitor requires end-to-end.
        let required_capabilities = vec![
            "github.list_issues".to_string(),
            "github.get_issue".to_string(),
            "github.create_issue".to_string(),
            "github.comment_on_issue".to_string(),
        ];
        let config = self.bug_monitor_config().await;
        let drafts = self.bug_monitor_drafts.read().await;
        let incidents = self.bug_monitor_incidents.read().await;
        let posts = self.bug_monitor_posts.read().await;
        let total_incidents = incidents.len();
        // Incidents still in-flight: any of the queued/pending statuses.
        let pending_incidents = incidents
            .values()
            .filter(|row| {
                matches!(
                    row.status.as_str(),
                    "queued"
                        | "draft_created"
                        | "triage_queued"
                        | "analysis_queued"
                        | "triage_pending"
                        | "issue_draft_pending"
                )
            })
            .count();
        // Drafts blocked on operator approval.
        let pending_drafts = drafts
            .values()
            .filter(|row| row.status.eq_ignore_ascii_case("approval_required"))
            .count();
        // Posts counted as pending include "failed" ones.
        // NOTE(review): presumably failed posts are retried — confirm.
        let pending_posts = posts
            .values()
            .filter(|row| matches!(row.status.as_str(), "queued" | "failed"))
            .count();
        // Latest activity across draft creation and post updates.
        let last_activity_at_ms = drafts
            .values()
            .map(|row| row.created_at_ms)
            .chain(posts.values().map(|row| row.updated_at_ms))
            .max();
        // Release read locks before the awaits below.
        drop(drafts);
        drop(incidents);
        drop(posts);
        // Overlay the live counters onto a copy of the runtime status.
        let mut runtime = self.bug_monitor_runtime_status.read().await.clone();
        runtime.paused = config.paused;
        runtime.total_incidents = total_incidents;
        runtime.pending_incidents = pending_incidents;
        runtime.pending_posts = pending_posts;

        let mut status = BugMonitorStatus {
            config: config.clone(),
            runtime,
            pending_drafts,
            pending_posts,
            last_activity_at_ms,
            ..BugMonitorStatus::default()
        };
        // Repo must be present and in owner/repo form.
        let repo_valid = config
            .repo
            .as_ref()
            .map(|repo| is_valid_owner_repo_slug(repo))
            .unwrap_or(false);
        let servers = self.mcp.list().await;
        let selected_server = config
            .mcp_server
            .as_ref()
            .and_then(|name| servers.get(name))
            .cloned();
        let provider_catalog = self.providers.list().await;
        // The model policy's `default_model` must both parse and exist in
        // the provider catalog for the monitor to be model-ready.
        let selected_model = config
            .model_policy
            .as_ref()
            .and_then(|policy| policy.get("default_model"))
            .and_then(crate::app::routines::parse_model_spec);
        let selected_model_ready = selected_model
            .as_ref()
            .map(|spec| crate::app::routines::provider_catalog_has_model(&provider_catalog, spec))
            .unwrap_or(false);
        // Tool discovery is scoped to the configured MCP server, if any.
        let selected_server_tools = if let Some(server_name) = config.mcp_server.as_ref() {
            self.mcp.server_tools(server_name).await
        } else {
            Vec::new()
        };
        let discovered_tools = self
            .capability_resolver
            .discover_from_runtime(selected_server_tools, Vec::new())
            .await;
        status.discovered_mcp_tools = discovered_tools
            .iter()
            .map(|row| row.tool_name.clone())
            .collect();
        // Case-folded provider names seen among discovered tools.
        let discovered_providers = discovered_tools
            .iter()
            .map(|row| row.provider.to_ascii_lowercase())
            .collect::<std::collections::HashSet<_>>();
        // Provider search order; the preferred provider is tried first.
        // `Auto` and `OfficialGithub` share the mcp-first ordering.
        let provider_preference = match config.provider_preference {
            BugMonitorProviderPreference::OfficialGithub => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Composio => {
                vec![
                    "composio".to_string(),
                    "mcp".to_string(),
                    "arcade".to_string(),
                ]
            }
            BugMonitorProviderPreference::Arcade => {
                vec![
                    "arcade".to_string(),
                    "mcp".to_string(),
                    "composio".to_string(),
                ]
            }
            BugMonitorProviderPreference::Auto => {
                vec![
                    "mcp".to_string(),
                    "composio".to_string(),
                    "arcade".to_string(),
                ]
            }
        };
        // Resolve the required capabilities against discovered tools; a
        // resolver error degrades to `None` (treated as nothing resolved).
        let capability_resolution = self
            .capability_resolver
            .resolve(
                crate::capability_resolver::CapabilityResolveInput {
                    workflow_id: Some("bug_monitor".to_string()),
                    required_capabilities: required_capabilities.clone(),
                    optional_capabilities: Vec::new(),
                    provider_preference,
                    available_tools: discovered_tools,
                },
                Vec::new(),
            )
            .await
            .ok();
        let bindings_file = self.capability_resolver.list_bindings().await.ok();
        if let Some(bindings) = bindings_file.as_ref() {
            status.binding_source_version = bindings.builtin_version.clone();
            status.bindings_last_merged_at_ms = bindings.last_merged_at_ms;
            // Candidate bindings: must serve a required capability, and —
            // when any tools were discovered — the binding's provider must
            // be among the discovered providers.
            status.selected_server_binding_candidates = bindings
                .bindings
                .iter()
                .filter(|binding| required_capabilities.contains(&binding.capability_id))
                .filter(|binding| {
                    discovered_providers.is_empty()
                        || discovered_providers.contains(&binding.provider.to_ascii_lowercase())
                })
                .map(|binding| {
                    // Bindings are identified by capability + case-folded
                    // tool name; `matched` marks candidates the resolver
                    // actually selected.
                    let binding_key = format!(
                        "{}::{}",
                        binding.capability_id,
                        binding.tool_name.to_ascii_lowercase()
                    );
                    let matched = capability_resolution
                        .as_ref()
                        .map(|resolution| {
                            resolution.resolved.iter().any(|row| {
                                row.capability_id == binding.capability_id
                                    && format!(
                                        "{}::{}",
                                        row.capability_id,
                                        row.tool_name.to_ascii_lowercase()
                                    ) == binding_key
                            })
                        })
                        .unwrap_or(false);
                    BugMonitorBindingCandidate {
                        capability_id: binding.capability_id.clone(),
                        binding_tool_name: binding.tool_name.clone(),
                        aliases: binding.tool_name_aliases.clone(),
                        matched,
                    }
                })
                .collect();
            // Deterministic listing order for the UI/API.
            status.selected_server_binding_candidates.sort_by(|a, b| {
                a.capability_id
                    .cmp(&b.capability_id)
                    .then_with(|| a.binding_tool_name.cmp(&b.binding_tool_name))
            });
        }
        // True when the resolver bound some tool for the capability.
        let capability_ready = |capability_id: &str| -> bool {
            capability_resolution
                .as_ref()
                .map(|resolved| {
                    resolved
                        .resolved
                        .iter()
                        .any(|row| row.capability_id == capability_id)
                })
                .unwrap_or(false)
        };
        if let Some(resolution) = capability_resolution.as_ref() {
            status.missing_required_capabilities = resolution.missing_required.clone();
            status.resolved_capabilities = resolution
                .resolved
                .iter()
                .map(|row| BugMonitorCapabilityMatch {
                    capability_id: row.capability_id.clone(),
                    provider: row.provider.clone(),
                    tool_name: row.tool_name.clone(),
                    binding_index: row.binding_index,
                })
                .collect();
        } else {
            // Resolver failure: fail closed, report every capability missing.
            status.missing_required_capabilities = required_capabilities.clone();
        }
        status.required_capabilities = BugMonitorCapabilityReadiness {
            github_list_issues: capability_ready("github.list_issues"),
            github_get_issue: capability_ready("github.get_issue"),
            github_create_issue: capability_ready("github.create_issue"),
            github_comment_on_issue: capability_ready("github.comment_on_issue"),
        };
        status.selected_model = selected_model;
        // Readiness flags layer on each other: `config_valid` covers static
        // configuration; publish/runtime additionally require enabled,
        // unpaused, and a connected server.
        status.readiness = BugMonitorReadiness {
            config_valid: repo_valid
                && selected_server.is_some()
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            repo_valid,
            mcp_server_present: selected_server.is_some(),
            mcp_connected: selected_server
                .as_ref()
                .map(|row| row.connected)
                .unwrap_or(false),
            github_read_ready: status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue,
            github_write_ready: status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue,
            selected_model_ready,
            ingest_ready: config.enabled && !config.paused && repo_valid,
            publish_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
            runtime_ready: config.enabled
                && !config.paused
                && repo_valid
                && selected_server
                    .as_ref()
                    .map(|row| row.connected)
                    .unwrap_or(false)
                && status.required_capabilities.github_list_issues
                && status.required_capabilities.github_get_issue
                && status.required_capabilities.github_create_issue
                && status.required_capabilities.github_comment_on_issue
                && selected_model_ready,
        };
        // Surface the first blocking problem, checked in order of severity,
        // but only when the monitor is enabled at all.
        if config.enabled {
            if config.paused {
                status.last_error = Some("Bug monitor monitoring is paused.".to_string());
            } else if !repo_valid {
                status.last_error = Some("Target repo is missing or invalid.".to_string());
            } else if selected_server.is_none() {
                status.last_error = Some("Selected MCP server is missing.".to_string());
            } else if !status.readiness.mcp_connected {
                status.last_error = Some("Selected MCP server is disconnected.".to_string());
            } else if !selected_model_ready {
                status.last_error = Some(
                    "Selected provider/model is unavailable. Bug monitor is fail-closed."
                        .to_string(),
                );
            } else if !status.readiness.github_read_ready || !status.readiness.github_write_ready {
                let missing = if status.missing_required_capabilities.is_empty() {
                    "unknown".to_string()
                } else {
                    status.missing_required_capabilities.join(", ")
                };
                status.last_error = Some(format!(
                    "Selected MCP server is missing required GitHub capabilities: {missing}"
                ));
            }
        }
        status.runtime.monitoring_active = status.readiness.ingest_ready;
        status
    }
2365
2366 pub async fn load_workflow_runs(&self) -> anyhow::Result<()> {
2367 if !self.workflow_runs_path.exists() {
2368 return Ok(());
2369 }
2370 let raw = fs::read_to_string(&self.workflow_runs_path).await?;
2371 let parsed =
2372 serde_json::from_str::<std::collections::HashMap<String, WorkflowRunRecord>>(&raw)
2373 .unwrap_or_default();
2374 *self.workflow_runs.write().await = parsed;
2375 Ok(())
2376 }
2377
2378 pub async fn persist_workflow_runs(&self) -> anyhow::Result<()> {
2379 if let Some(parent) = self.workflow_runs_path.parent() {
2380 fs::create_dir_all(parent).await?;
2381 }
2382 let payload = {
2383 let guard = self.workflow_runs.read().await;
2384 serde_json::to_string_pretty(&*guard)?
2385 };
2386 fs::write(&self.workflow_runs_path, payload).await?;
2387 Ok(())
2388 }
2389
2390 pub async fn load_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2391 if !self.workflow_hook_overrides_path.exists() {
2392 return Ok(());
2393 }
2394 let raw = fs::read_to_string(&self.workflow_hook_overrides_path).await?;
2395 let parsed = serde_json::from_str::<std::collections::HashMap<String, bool>>(&raw)
2396 .unwrap_or_default();
2397 *self.workflow_hook_overrides.write().await = parsed;
2398 Ok(())
2399 }
2400
2401 pub async fn persist_workflow_hook_overrides(&self) -> anyhow::Result<()> {
2402 if let Some(parent) = self.workflow_hook_overrides_path.parent() {
2403 fs::create_dir_all(parent).await?;
2404 }
2405 let payload = {
2406 let guard = self.workflow_hook_overrides.read().await;
2407 serde_json::to_string_pretty(&*guard)?
2408 };
2409 fs::write(&self.workflow_hook_overrides_path, payload).await?;
2410 Ok(())
2411 }
2412
2413 pub async fn reload_workflows(&self) -> anyhow::Result<Vec<WorkflowValidationMessage>> {
2414 let mut sources = Vec::new();
2415 sources.push(WorkflowLoadSource {
2416 root: config::paths::resolve_builtin_workflows_dir(),
2417 kind: WorkflowSourceKind::BuiltIn,
2418 pack_id: None,
2419 });
2420
2421 let workspace_root = self.workspace_index.snapshot().await.root;
2422 sources.push(WorkflowLoadSource {
2423 root: PathBuf::from(workspace_root).join(".tandem"),
2424 kind: WorkflowSourceKind::Workspace,
2425 pack_id: None,
2426 });
2427
2428 if let Ok(packs) = self.pack_manager.list().await {
2429 for pack in packs {
2430 sources.push(WorkflowLoadSource {
2431 root: PathBuf::from(pack.install_path),
2432 kind: WorkflowSourceKind::Pack,
2433 pack_id: Some(pack.pack_id),
2434 });
2435 }
2436 }
2437
2438 let mut registry = load_workflow_registry(&sources)?;
2439 let overrides = self.workflow_hook_overrides.read().await.clone();
2440 for hook in &mut registry.hooks {
2441 if let Some(enabled) = overrides.get(&hook.binding_id) {
2442 hook.enabled = *enabled;
2443 }
2444 }
2445 for workflow in registry.workflows.values_mut() {
2446 workflow.hooks = registry
2447 .hooks
2448 .iter()
2449 .filter(|hook| hook.workflow_id == workflow.workflow_id)
2450 .cloned()
2451 .collect();
2452 }
2453 let messages = validate_workflow_registry(®istry);
2454 *self.workflows.write().await = registry;
2455 Ok(messages)
2456 }
2457
2458 pub async fn workflow_registry(&self) -> WorkflowRegistry {
2459 self.workflows.read().await.clone()
2460 }
2461
2462 pub async fn list_workflows(&self) -> Vec<WorkflowSpec> {
2463 let mut rows = self
2464 .workflows
2465 .read()
2466 .await
2467 .workflows
2468 .values()
2469 .cloned()
2470 .collect::<Vec<_>>();
2471 rows.sort_by(|a, b| a.workflow_id.cmp(&b.workflow_id));
2472 rows
2473 }
2474
2475 pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowSpec> {
2476 self.workflows
2477 .read()
2478 .await
2479 .workflows
2480 .get(workflow_id)
2481 .cloned()
2482 }
2483
2484 pub async fn list_workflow_hooks(&self, workflow_id: Option<&str>) -> Vec<WorkflowHookBinding> {
2485 let mut rows = self
2486 .workflows
2487 .read()
2488 .await
2489 .hooks
2490 .iter()
2491 .filter(|hook| workflow_id.map(|id| hook.workflow_id == id).unwrap_or(true))
2492 .cloned()
2493 .collect::<Vec<_>>();
2494 rows.sort_by(|a, b| a.binding_id.cmp(&b.binding_id));
2495 rows
2496 }
2497
2498 pub async fn set_workflow_hook_enabled(
2499 &self,
2500 binding_id: &str,
2501 enabled: bool,
2502 ) -> anyhow::Result<Option<WorkflowHookBinding>> {
2503 self.workflow_hook_overrides
2504 .write()
2505 .await
2506 .insert(binding_id.to_string(), enabled);
2507 self.persist_workflow_hook_overrides().await?;
2508 let _ = self.reload_workflows().await?;
2509 Ok(self
2510 .workflows
2511 .read()
2512 .await
2513 .hooks
2514 .iter()
2515 .find(|hook| hook.binding_id == binding_id)
2516 .cloned())
2517 }
2518
2519 pub async fn put_workflow_run(&self, run: WorkflowRunRecord) -> anyhow::Result<()> {
2520 self.workflow_runs
2521 .write()
2522 .await
2523 .insert(run.run_id.clone(), run);
2524 self.persist_workflow_runs().await
2525 }
2526
2527 pub async fn update_workflow_run(
2528 &self,
2529 run_id: &str,
2530 update: impl FnOnce(&mut WorkflowRunRecord),
2531 ) -> Option<WorkflowRunRecord> {
2532 let mut guard = self.workflow_runs.write().await;
2533 let row = guard.get_mut(run_id)?;
2534 update(row);
2535 row.updated_at_ms = now_ms();
2536 if matches!(
2537 row.status,
2538 WorkflowRunStatus::Completed | WorkflowRunStatus::Failed
2539 ) {
2540 row.finished_at_ms.get_or_insert_with(now_ms);
2541 }
2542 let out = row.clone();
2543 drop(guard);
2544 let _ = self.persist_workflow_runs().await;
2545 Some(out)
2546 }
2547
2548 pub async fn list_workflow_runs(
2549 &self,
2550 workflow_id: Option<&str>,
2551 limit: usize,
2552 ) -> Vec<WorkflowRunRecord> {
2553 let mut rows = self
2554 .workflow_runs
2555 .read()
2556 .await
2557 .values()
2558 .filter(|row| workflow_id.map(|id| row.workflow_id == id).unwrap_or(true))
2559 .cloned()
2560 .collect::<Vec<_>>();
2561 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
2562 rows.truncate(limit.clamp(1, 500));
2563 rows
2564 }
2565
2566 pub async fn get_workflow_run(&self, run_id: &str) -> Option<WorkflowRunRecord> {
2567 self.workflow_runs.read().await.get(run_id).cloned()
2568 }
2569
    /// Validate, normalize, and store an Automation V2 spec, then persist
    /// the store and verify the write landed.
    ///
    /// Per-agent normalization: blank display names are auto-generated from
    /// the agent id; tool allow/deny lists and MCP server/tool lists are
    /// normalized. `created_at_ms` is backfilled on first insert and
    /// `updated_at_ms` always refreshed; a missing `next_fire_at_ms` is
    /// derived from the schedule. The bundled studio-research split
    /// migration runs before storing.
    pub async fn put_automation_v2(
        &self,
        mut automation: AutomationV2Spec,
    ) -> anyhow::Result<AutomationV2Spec> {
        if automation.automation_id.trim().is_empty() {
            anyhow::bail!("automation_id is required");
        }
        for agent in &mut automation.agents {
            // Blank display names get a generated one from the agent id.
            if agent.display_name.trim().is_empty() {
                agent.display_name = auto_generated_agent_name(&agent.agent_id);
            }
            agent.tool_policy.allowlist =
                config::channels::normalize_allowed_tools(agent.tool_policy.allowlist.clone());
            agent.tool_policy.denylist =
                config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
            agent.mcp_policy.allowed_servers =
                normalize_non_empty_list(agent.mcp_policy.allowed_servers.clone());
            // `allowed_tools` is optional; normalize only when present.
            agent.mcp_policy.allowed_tools = agent
                .mcp_policy
                .allowed_tools
                .take()
                .map(normalize_allowed_tools);
        }
        let now = now_ms();
        if automation.created_at_ms == 0 {
            automation.created_at_ms = now;
        }
        automation.updated_at_ms = now;
        if automation.next_fire_at_ms.is_none() {
            automation.next_fire_at_ms =
                automation_schedule_next_fire_at_ms(&automation.schedule, now);
        }
        migrate_bundled_studio_research_split_automation(&mut automation);
        self.automations_v2
            .write()
            .await
            .insert(automation.automation_id.clone(), automation.clone());
        self.persist_automations_v2().await?;
        // NOTE(review): presumably confirms the persisted store contains
        // this automation id and errors otherwise — confirm semantics.
        self.verify_automation_v2_persisted(&automation.automation_id, true)
            .await?;
        Ok(automation)
    }
2612
2613 pub async fn get_automation_v2(&self, automation_id: &str) -> Option<AutomationV2Spec> {
2614 self.automations_v2.read().await.get(automation_id).cloned()
2615 }
2616
2617 pub fn automation_v2_runtime_context(
2618 &self,
2619 run: &AutomationV2RunRecord,
2620 ) -> Option<AutomationRuntimeContextMaterialization> {
2621 run.runtime_context.clone().or_else(|| {
2622 run.automation_snapshot.as_ref().and_then(|automation| {
2623 automation
2624 .runtime_context_materialization()
2625 .or_else(|| automation.approved_plan_runtime_context_materialization())
2626 })
2627 })
2628 }
2629
2630 pub(crate) fn automation_v2_approved_plan_materialization(
2631 &self,
2632 run: &AutomationV2RunRecord,
2633 ) -> Option<tandem_plan_compiler::api::ApprovedPlanMaterialization> {
2634 run.automation_snapshot
2635 .as_ref()
2636 .and_then(AutomationV2Spec::approved_plan_materialization)
2637 }
2638
2639 pub async fn put_workflow_plan(&self, plan: WorkflowPlan) {
2640 self.workflow_plans
2641 .write()
2642 .await
2643 .insert(plan.plan_id.clone(), plan);
2644 }
2645
2646 pub async fn get_workflow_plan(&self, plan_id: &str) -> Option<WorkflowPlan> {
2647 self.workflow_plans.read().await.get(plan_id).cloned()
2648 }
2649
2650 pub async fn put_workflow_plan_draft(&self, draft: WorkflowPlanDraftRecord) {
2651 self.workflow_plan_drafts
2652 .write()
2653 .await
2654 .insert(draft.current_plan.plan_id.clone(), draft.clone());
2655 self.put_workflow_plan(draft.current_plan).await;
2656 }
2657
2658 pub async fn get_workflow_plan_draft(&self, plan_id: &str) -> Option<WorkflowPlanDraftRecord> {
2659 self.workflow_plan_drafts.read().await.get(plan_id).cloned()
2660 }
2661
2662 pub async fn put_optimization_campaign(
2663 &self,
2664 mut campaign: OptimizationCampaignRecord,
2665 ) -> anyhow::Result<OptimizationCampaignRecord> {
2666 if campaign.optimization_id.trim().is_empty() {
2667 anyhow::bail!("optimization_id is required");
2668 }
2669 if campaign.source_workflow_id.trim().is_empty() {
2670 anyhow::bail!("source_workflow_id is required");
2671 }
2672 if campaign.name.trim().is_empty() {
2673 anyhow::bail!("name is required");
2674 }
2675 let now = now_ms();
2676 if campaign.created_at_ms == 0 {
2677 campaign.created_at_ms = now;
2678 }
2679 campaign.updated_at_ms = now;
2680 campaign.source_workflow_snapshot_hash =
2681 optimization_snapshot_hash(&campaign.source_workflow_snapshot);
2682 campaign.baseline_snapshot_hash = optimization_snapshot_hash(&campaign.baseline_snapshot);
2683 self.optimization_campaigns
2684 .write()
2685 .await
2686 .insert(campaign.optimization_id.clone(), campaign.clone());
2687 self.persist_optimization_campaigns().await?;
2688 Ok(campaign)
2689 }
2690
2691 pub async fn get_optimization_campaign(
2692 &self,
2693 optimization_id: &str,
2694 ) -> Option<OptimizationCampaignRecord> {
2695 self.optimization_campaigns
2696 .read()
2697 .await
2698 .get(optimization_id)
2699 .cloned()
2700 }
2701
2702 pub async fn list_optimization_campaigns(&self) -> Vec<OptimizationCampaignRecord> {
2703 let mut rows = self
2704 .optimization_campaigns
2705 .read()
2706 .await
2707 .values()
2708 .cloned()
2709 .collect::<Vec<_>>();
2710 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2711 rows
2712 }
2713
2714 pub async fn put_optimization_experiment(
2715 &self,
2716 mut experiment: OptimizationExperimentRecord,
2717 ) -> anyhow::Result<OptimizationExperimentRecord> {
2718 if experiment.experiment_id.trim().is_empty() {
2719 anyhow::bail!("experiment_id is required");
2720 }
2721 if experiment.optimization_id.trim().is_empty() {
2722 anyhow::bail!("optimization_id is required");
2723 }
2724 let now = now_ms();
2725 if experiment.created_at_ms == 0 {
2726 experiment.created_at_ms = now;
2727 }
2728 experiment.updated_at_ms = now;
2729 experiment.candidate_snapshot_hash =
2730 optimization_snapshot_hash(&experiment.candidate_snapshot);
2731 self.optimization_experiments
2732 .write()
2733 .await
2734 .insert(experiment.experiment_id.clone(), experiment.clone());
2735 self.persist_optimization_experiments().await?;
2736 Ok(experiment)
2737 }
2738
2739 pub async fn get_optimization_experiment(
2740 &self,
2741 optimization_id: &str,
2742 experiment_id: &str,
2743 ) -> Option<OptimizationExperimentRecord> {
2744 self.optimization_experiments
2745 .read()
2746 .await
2747 .get(experiment_id)
2748 .filter(|row| row.optimization_id == optimization_id)
2749 .cloned()
2750 }
2751
2752 pub async fn list_optimization_experiments(
2753 &self,
2754 optimization_id: &str,
2755 ) -> Vec<OptimizationExperimentRecord> {
2756 let mut rows = self
2757 .optimization_experiments
2758 .read()
2759 .await
2760 .values()
2761 .filter(|row| row.optimization_id == optimization_id)
2762 .cloned()
2763 .collect::<Vec<_>>();
2764 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
2765 rows
2766 }
2767
2768 pub async fn count_optimization_experiments(&self, optimization_id: &str) -> usize {
2769 self.optimization_experiments
2770 .read()
2771 .await
2772 .values()
2773 .filter(|row| row.optimization_id == optimization_id)
2774 .count()
2775 }
2776
2777 fn automation_run_is_terminal(status: &crate::AutomationRunStatus) -> bool {
2778 matches!(
2779 status,
2780 crate::AutomationRunStatus::Completed
2781 | crate::AutomationRunStatus::Blocked
2782 | crate::AutomationRunStatus::Failed
2783 | crate::AutomationRunStatus::Cancelled
2784 )
2785 }
2786
2787 fn optimization_consecutive_failure_count(
2788 experiments: &[OptimizationExperimentRecord],
2789 ) -> usize {
2790 let mut ordered = experiments.to_vec();
2791 ordered.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
2792 ordered
2793 .iter()
2794 .rev()
2795 .take_while(|experiment| experiment.status == OptimizationExperimentStatus::Failed)
2796 .count()
2797 }
2798
2799 fn optimization_mutation_field_path(field: OptimizationMutableField) -> &'static str {
2800 match field {
2801 OptimizationMutableField::Objective => "objective",
2802 OptimizationMutableField::OutputContractSummaryGuidance => {
2803 "output_contract.summary_guidance"
2804 }
2805 OptimizationMutableField::TimeoutMs => "timeout_ms",
2806 OptimizationMutableField::RetryPolicyMaxAttempts => "retry_policy.max_attempts",
2807 OptimizationMutableField::RetryPolicyRetries => "retry_policy.retries",
2808 }
2809 }
2810
    /// Read the current value of a mutable optimization field from a flow
    /// node, as a JSON value. Returns `Err` with a descriptive message when
    /// the node does not carry the requested field.
    fn optimization_node_field_value(
        node: &crate::AutomationFlowNode,
        field: OptimizationMutableField,
    ) -> Result<Value, String> {
        match field {
            // Objective is always present on the node.
            OptimizationMutableField::Objective => Ok(Value::String(node.objective.clone())),
            // Optional nested string: output_contract.summary_guidance.
            OptimizationMutableField::OutputContractSummaryGuidance => node
                .output_contract
                .as_ref()
                .and_then(|contract| contract.summary_guidance.clone())
                .map(Value::String)
                .ok_or_else(|| {
                    format!(
                        "node `{}` is missing output_contract.summary_guidance",
                        node.node_id
                    )
                }),
            OptimizationMutableField::TimeoutMs => node
                .timeout_ms
                .map(|value| json!(value))
                .ok_or_else(|| format!("node `{}` is missing timeout_ms", node.node_id)),
            // retry_policy is free-form JSON; both retry fields are read as
            // raw entries of the policy object.
            OptimizationMutableField::RetryPolicyMaxAttempts => node
                .retry_policy
                .as_ref()
                .and_then(Value::as_object)
                .and_then(|policy| policy.get("max_attempts"))
                .cloned()
                .ok_or_else(|| {
                    format!(
                        "node `{}` is missing retry_policy.max_attempts",
                        node.node_id
                    )
                }),
            OptimizationMutableField::RetryPolicyRetries => node
                .retry_policy
                .as_ref()
                .and_then(Value::as_object)
                .and_then(|policy| policy.get("retries"))
                .cloned()
                .ok_or_else(|| format!("node `{}` is missing retry_policy.retries", node.node_id)),
        }
    }
2853
    /// Write a mutated value into the given flow-node field.
    ///
    /// Values are type-checked (`String` for text fields, integers for
    /// timeout/retry fields). Retry fields are written into the node's
    /// free-form `retry_policy` JSON object, which is created on demand but
    /// must be an object if already present.
    fn set_optimization_node_field_value(
        node: &mut crate::AutomationFlowNode,
        field: OptimizationMutableField,
        value: &Value,
    ) -> Result<(), String> {
        match field {
            OptimizationMutableField::Objective => {
                node.objective = value
                    .as_str()
                    .ok_or_else(|| "objective apply value must be a string".to_string())?
                    .to_string();
            }
            OptimizationMutableField::OutputContractSummaryGuidance => {
                let guidance = value
                    .as_str()
                    .ok_or_else(|| {
                        "output_contract.summary_guidance apply value must be a string".to_string()
                    })?
                    .to_string();
                // The contract container must already exist; only its
                // summary_guidance is replaced.
                let contract = node.output_contract.as_mut().ok_or_else(|| {
                    format!(
                        "node `{}` is missing output_contract for apply",
                        node.node_id
                    )
                })?;
                contract.summary_guidance = Some(guidance);
            }
            OptimizationMutableField::TimeoutMs => {
                node.timeout_ms = Some(
                    value
                        .as_u64()
                        .ok_or_else(|| "timeout_ms apply value must be an integer".to_string())?,
                );
            }
            OptimizationMutableField::RetryPolicyMaxAttempts => {
                let next = value.as_i64().ok_or_else(|| {
                    "retry_policy.max_attempts apply value must be an integer".to_string()
                })?;
                // Create an empty policy object on demand; reject non-object
                // policies rather than silently overwriting them.
                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
                let object = policy.as_object_mut().ok_or_else(|| {
                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
                })?;
                object.insert("max_attempts".to_string(), json!(next));
            }
            OptimizationMutableField::RetryPolicyRetries => {
                let next = value.as_i64().ok_or_else(|| {
                    "retry_policy.retries apply value must be an integer".to_string()
                })?;
                let policy = node.retry_policy.get_or_insert_with(|| json!({}));
                let object = policy.as_object_mut().ok_or_else(|| {
                    format!("node `{}` retry_policy must be a JSON object", node.node_id)
                })?;
                object.insert("retries".to_string(), json!(next));
            }
        }
        Ok(())
    }
2911
2912 fn append_optimization_apply_metadata(
2913 metadata: Option<Value>,
2914 record: Value,
2915 ) -> Result<Option<Value>, String> {
2916 let mut root = match metadata {
2917 Some(Value::Object(map)) => map,
2918 Some(_) => return Err("automation metadata must be a JSON object".to_string()),
2919 None => serde_json::Map::new(),
2920 };
2921 let history = root
2922 .entry("optimization_apply_history".to_string())
2923 .or_insert_with(|| Value::Array(Vec::new()));
2924 let Some(entries) = history.as_array_mut() else {
2925 return Err("optimization_apply_history metadata must be an array".to_string());
2926 };
2927 entries.push(record.clone());
2928 root.insert("last_optimization_apply".to_string(), record);
2929 Ok(Some(Value::Object(root)))
2930 }
2931
2932 fn build_optimization_apply_patch(
2933 baseline: &crate::AutomationV2Spec,
2934 candidate: &crate::AutomationV2Spec,
2935 mutation: &crate::OptimizationValidatedMutation,
2936 approved_at_ms: u64,
2937 ) -> Result<Value, String> {
2938 let baseline_node = baseline
2939 .flow
2940 .nodes
2941 .iter()
2942 .find(|node| node.node_id == mutation.node_id)
2943 .ok_or_else(|| format!("baseline node `{}` not found", mutation.node_id))?;
2944 let candidate_node = candidate
2945 .flow
2946 .nodes
2947 .iter()
2948 .find(|node| node.node_id == mutation.node_id)
2949 .ok_or_else(|| format!("candidate node `{}` not found", mutation.node_id))?;
2950 let before = Self::optimization_node_field_value(baseline_node, mutation.field)?;
2951 let after = Self::optimization_node_field_value(candidate_node, mutation.field)?;
2952 Ok(json!({
2953 "node_id": mutation.node_id,
2954 "field": mutation.field,
2955 "field_path": Self::optimization_mutation_field_path(mutation.field),
2956 "expected_before": before,
2957 "apply_value": after,
2958 "approved_at_ms": approved_at_ms,
2959 }))
2960 }
2961
    /// Apply an approved optimization winner's mutation to the live workflow.
    ///
    /// Loads the campaign and experiment, re-validates the stored
    /// `apply_patch` metadata, checks the live workflow has not drifted from
    /// the value recorded at approval time, writes the new value, and records
    /// the apply both in the workflow metadata and on the experiment.
    ///
    /// Returns the (campaign, updated experiment, updated live workflow)
    /// triple, or a `String` error on any validation failure.
    pub async fn apply_optimization_winner(
        &self,
        optimization_id: &str,
        experiment_id: &str,
    ) -> Result<
        (
            OptimizationCampaignRecord,
            OptimizationExperimentRecord,
            crate::AutomationV2Spec,
        ),
        String,
    > {
        let campaign = self
            .get_optimization_campaign(optimization_id)
            .await
            .ok_or_else(|| "optimization not found".to_string())?;
        let mut experiment = self
            .get_optimization_experiment(optimization_id, experiment_id)
            .await
            .ok_or_else(|| "experiment not found".to_string())?;
        // Only experiments that went through the promotion-approval flow may
        // touch the live workflow.
        if experiment.status != OptimizationExperimentStatus::PromotionApproved {
            return Err("only approved winner experiments may be applied".to_string());
        }
        // NOTE(review): this compares the campaign's *baseline* hash to the
        // experiment's *candidate* hash — presumably promotion approval
        // advances the campaign baseline to the winner's snapshot, making
        // only the latest winner match. Confirm against the approval path.
        if campaign.baseline_snapshot_hash != experiment.candidate_snapshot_hash {
            return Err(
                "only the latest approved winner may be applied to the live workflow".to_string(),
            );
        }
        // The apply_patch metadata is written at approval time by
        // build_optimization_apply_patch; all four fields are required.
        let patch = experiment
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("apply_patch"))
            .cloned()
            .ok_or_else(|| "approved experiment is missing apply_patch metadata".to_string())?;
        let node_id = patch
            .get("node_id")
            .and_then(Value::as_str)
            .map(str::to_string)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| "apply_patch.node_id is required".to_string())?;
        let field: OptimizationMutableField = serde_json::from_value(
            patch
                .get("field")
                .cloned()
                .ok_or_else(|| "apply_patch.field is required".to_string())?,
        )
        .map_err(|error| format!("invalid apply_patch.field: {error}"))?;
        let expected_before = patch
            .get("expected_before")
            .cloned()
            .ok_or_else(|| "apply_patch.expected_before is required".to_string())?;
        let apply_value = patch
            .get("apply_value")
            .cloned()
            .ok_or_else(|| "apply_patch.apply_value is required".to_string())?;
        let mut live = self
            .get_automation_v2(&campaign.source_workflow_id)
            .await
            .ok_or_else(|| "source workflow not found".to_string())?;
        // Drift check: the live workflow's current value must still equal
        // the value recorded when the winner was approved. The immutable
        // borrow is scoped so `live` can be mutably borrowed below.
        let current_value = {
            let live_node = live
                .flow
                .nodes
                .iter()
                .find(|node| node.node_id == node_id)
                .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
            Self::optimization_node_field_value(live_node, field)?
        };
        if current_value != expected_before {
            return Err(format!(
                "live workflow drift detected for node `{node_id}` {}",
                Self::optimization_mutation_field_path(field)
            ));
        }
        let live_node = live
            .flow
            .nodes
            .iter_mut()
            .find(|node| node.node_id == node_id)
            .ok_or_else(|| format!("live workflow node `{node_id}` not found"))?;
        Self::set_optimization_node_field_value(live_node, field, &apply_value)?;
        let applied_at_ms = now_ms();
        // Audit record appended to the workflow's own metadata history.
        let apply_record = json!({
            "optimization_id": campaign.optimization_id,
            "experiment_id": experiment.experiment_id,
            "node_id": node_id,
            "field": field,
            "field_path": Self::optimization_mutation_field_path(field),
            "previous_value": expected_before,
            "new_value": apply_value,
            "applied_at_ms": applied_at_ms,
        });
        live.metadata =
            Self::append_optimization_apply_metadata(live.metadata.clone(), apply_record)?;
        let stored_live = self
            .put_automation_v2(live)
            .await
            .map_err(|error| error.to_string())?;
        // Mirror the apply on the experiment record under `applied_to_live`.
        let mut metadata = match experiment.metadata.take() {
            Some(Value::Object(map)) => map,
            Some(_) => return Err("experiment metadata must be a JSON object".to_string()),
            None => serde_json::Map::new(),
        };
        metadata.insert(
            "applied_to_live".to_string(),
            json!({
                "automation_id": stored_live.automation_id,
                "applied_at_ms": applied_at_ms,
                "field": field,
                "node_id": node_id,
            }),
        );
        experiment.metadata = Some(Value::Object(metadata));
        let stored_experiment = self
            .put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        Ok((campaign, stored_experiment, stored_live))
    }
3081
3082 fn optimization_objective_hint(text: &str) -> String {
3083 let cleaned = text
3084 .lines()
3085 .map(str::trim)
3086 .filter(|line| !line.is_empty() && !line.starts_with('#'))
3087 .collect::<Vec<_>>()
3088 .join(" ");
3089 let hint = if cleaned.is_empty() {
3090 "Prioritize validator-complete output with explicit evidence."
3091 } else {
3092 cleaned.as_str()
3093 };
3094 let trimmed = hint.trim();
3095 let clipped = if trimmed.len() > 140 {
3096 trimmed[..140].trim_end()
3097 } else {
3098 trimmed
3099 };
3100 let mut sentence = clipped.trim_end_matches('.').to_string();
3101 if sentence.is_empty() {
3102 sentence = "Prioritize validator-complete output with explicit evidence".to_string();
3103 }
3104 sentence.push('.');
3105 sentence
3106 }
3107
    /// Deterministically generate candidate mutations of `baseline` allowed
    /// by the phase 1 mutation policy.
    ///
    /// For every node, up to five candidates are produced (one per mutable
    /// field the policy allows): an objective-hint append, a
    /// summary-guidance append, a bounded timeout increase, and +1 bumps to
    /// the retry policy's `max_attempts` and `retries`. Each candidate is a
    /// full clone of the baseline with exactly one field changed, and is only
    /// kept when `validate_phase1_candidate_mutation` accepts it.
    fn build_phase1_candidate_options(
        baseline: &crate::AutomationV2Spec,
        phase1: &crate::OptimizationPhase1Config,
    ) -> Vec<(
        crate::AutomationV2Spec,
        crate::OptimizationValidatedMutation,
    )> {
        let mut options = Vec::new();
        let hint = Self::optimization_objective_hint(&phase1.objective_markdown);
        for (index, node) in baseline.flow.nodes.iter().enumerate() {
            // Candidate 1: append the objective hint (or an alternate phrase
            // if the hint is already present) to the node objective.
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::Objective)
            {
                let addition = if node.objective.contains(&hint) {
                    "Prioritize validator-complete output with concrete evidence."
                } else {
                    &hint
                };
                let mut candidate = baseline.clone();
                candidate.flow.nodes[index].objective =
                    format!("{} {}", node.objective.trim(), addition.trim())
                        .trim()
                        .to_string();
                if let Ok(validated) =
                    validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                {
                    options.push((candidate, validated));
                }
            }
            // Candidate 2: append an evidence instruction to the output
            // contract's summary guidance (only for nodes that have one).
            if phase1
                .mutation_policy
                .allowed_text_fields
                .contains(&OptimizationMutableField::OutputContractSummaryGuidance)
            {
                if let Some(summary_guidance) = node
                    .output_contract
                    .as_ref()
                    .and_then(|contract| contract.summary_guidance.as_ref())
                {
                    let addition = if summary_guidance.contains("Cite concrete evidence") {
                        "Keep evidence explicit."
                    } else {
                        "Cite concrete evidence in the summary."
                    };
                    let mut candidate = baseline.clone();
                    if let Some(contract) = candidate.flow.nodes[index].output_contract.as_mut() {
                        contract.summary_guidance = Some(
                            format!("{} {}", summary_guidance.trim(), addition)
                                .trim()
                                .to_string(),
                        );
                    }
                    if let Ok(validated) =
                        validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                    {
                        options.push((candidate, validated));
                    }
                }
            }
            // Candidate 3: increase timeout by the smaller of the
            // percentage-based and absolute deltas (at least 1 ms), capped at
            // the policy maximum; skipped when already at the cap.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::TimeoutMs)
            {
                if let Some(timeout_ms) = node.timeout_ms {
                    let delta_by_percent = ((timeout_ms as f64)
                        * phase1.mutation_policy.timeout_delta_percent)
                        .round() as u64;
                    let delta = delta_by_percent
                        .min(phase1.mutation_policy.timeout_delta_ms)
                        .max(1);
                    let next = timeout_ms
                        .saturating_add(delta)
                        .min(phase1.mutation_policy.timeout_max_ms);
                    if next != timeout_ms {
                        let mut candidate = baseline.clone();
                        candidate.flow.nodes[index].timeout_ms = Some(next);
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Candidate 4: bump retry_policy.max_attempts by one, capped at
            // retry_max; only for nodes that already carry the key.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyMaxAttempts)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("max_attempts"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("max_attempts".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
            // Candidate 5: same +1 bump for retry_policy.retries.
            if phase1
                .mutation_policy
                .allowed_knob_fields
                .contains(&OptimizationMutableField::RetryPolicyRetries)
            {
                let current = node
                    .retry_policy
                    .as_ref()
                    .and_then(Value::as_object)
                    .and_then(|row| row.get("retries"))
                    .and_then(Value::as_i64);
                if let Some(before) = current {
                    let next = (before + 1).min(phase1.mutation_policy.retry_max as i64);
                    if next != before {
                        let mut candidate = baseline.clone();
                        let policy = candidate.flow.nodes[index]
                            .retry_policy
                            .get_or_insert_with(|| json!({}));
                        if let Some(object) = policy.as_object_mut() {
                            object.insert("retries".to_string(), json!(next));
                        }
                        if let Ok(validated) =
                            validate_phase1_candidate_mutation(baseline, &candidate, phase1)
                        {
                            options.push((candidate, validated));
                        }
                    }
                }
            }
        }
        options
    }
3256
    /// Queue one new phase 1 candidate experiment for a running campaign, if
    /// the budget and campaign state allow it.
    ///
    /// Returns `Ok(true)` when the campaign record was mutated (experiment
    /// queued, or campaign completed because the budget or the deterministic
    /// mutator is exhausted) and `Ok(false)` when nothing changed.
    async fn maybe_queue_phase1_candidate_experiment(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        // Budget check: stop the campaign once max_experiments is reached.
        let experiment_count = self
            .count_optimization_experiments(&campaign.optimization_id)
            .await;
        if experiment_count >= phase1.budget.max_experiments as usize {
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some("phase 1 experiment budget exhausted".to_string());
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        }
        // Need an established baseline, and no winner awaiting approval.
        if campaign.baseline_metrics.is_none() || campaign.pending_promotion_experiment_id.is_some()
        {
            return Ok(false);
        }
        let existing = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        // Only one evaluation at a time: a Draft experiment with an
        // eval_run_id in its metadata is still being evaluated.
        let active_eval_exists = existing.iter().any(|experiment| {
            matches!(experiment.status, OptimizationExperimentStatus::Draft)
                && experiment
                    .metadata
                    .as_ref()
                    .and_then(|metadata| metadata.get("eval_run_id"))
                    .and_then(Value::as_str)
                    .is_some()
        });
        if active_eval_exists {
            return Ok(false);
        }
        // Pick the first deterministic candidate whose snapshot hash has not
        // been tried before; de-dup via the hash set of prior candidates.
        let existing_hashes = existing
            .iter()
            .map(|experiment| experiment.candidate_snapshot_hash.clone())
            .collect::<std::collections::HashSet<_>>();
        let options = Self::build_phase1_candidate_options(&campaign.baseline_snapshot, phase1);
        let Some((candidate_snapshot, mutation)) = options.into_iter().find(|(candidate, _)| {
            !existing_hashes.contains(&optimization_snapshot_hash(candidate))
        }) else {
            // All deterministic mutations have been tried: complete.
            campaign.status = OptimizationCampaignStatus::Completed;
            campaign.last_pause_reason = Some(
                "phase 1 deterministic candidate mutator exhausted available mutations".to_string(),
            );
            campaign.updated_at_ms = now_ms();
            return Ok(true);
        };
        // Kick off the evaluation run against the candidate snapshot, then
        // record the experiment in Draft state pointing at that run.
        let eval_run = self
            .create_automation_v2_run(&candidate_snapshot, "optimization_candidate_eval")
            .await
            .map_err(|error| error.to_string())?;
        let now = now_ms();
        let experiment = OptimizationExperimentRecord {
            experiment_id: format!("opt-exp-{}", uuid::Uuid::new_v4()),
            optimization_id: campaign.optimization_id.clone(),
            status: OptimizationExperimentStatus::Draft,
            candidate_snapshot: candidate_snapshot.clone(),
            candidate_snapshot_hash: optimization_snapshot_hash(&candidate_snapshot),
            baseline_snapshot_hash: campaign.baseline_snapshot_hash.clone(),
            mutation_summary: Some(mutation.summary.clone()),
            metrics: None,
            phase1_metrics: None,
            promotion_recommendation: None,
            promotion_decision: None,
            created_at_ms: now,
            updated_at_ms: now,
            metadata: Some(json!({
                "generator": "phase1_deterministic_v1",
                "eval_run_id": eval_run.run_id,
                "mutation": mutation,
            })),
        };
        self.put_optimization_experiment(experiment)
            .await
            .map_err(|error| error.to_string())?;
        campaign.last_pause_reason = Some("waiting for phase 1 candidate evaluation".to_string());
        campaign.updated_at_ms = now_ms();
        Ok(true)
    }
3339
    /// Fold finished candidate-evaluation runs back into their experiments.
    ///
    /// For each Draft experiment with an `eval_run_id`: a non-completed
    /// terminal run, a stale baseline hash, or a metrics-derivation error
    /// marks the experiment Failed (with an `eval_failure` metadata record);
    /// a successful run gets phase 1 metrics and a promotion decision. Also
    /// fails the whole campaign after too many consecutive failures.
    ///
    /// Returns `Ok(true)` when any experiment or the campaign was mutated.
    async fn reconcile_phase1_candidate_experiments(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() else {
            return Ok(false);
        };
        let experiments = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let mut changed = false;
        for mut experiment in experiments {
            // Only Draft experiments that point at a finished eval run are
            // actionable; everything else is skipped untouched.
            if experiment.status != OptimizationExperimentStatus::Draft {
                continue;
            }
            let Some(eval_run_id) = experiment
                .metadata
                .as_ref()
                .and_then(|metadata| metadata.get("eval_run_id"))
                .and_then(Value::as_str)
                .map(str::to_string)
            else {
                continue;
            };
            let Some(run) = self.get_automation_v2_run(&eval_run_id).await else {
                continue;
            };
            if !Self::automation_run_is_terminal(&run.status) {
                continue;
            }
            // Failure path 1: the eval run terminated without completing.
            if run.status != crate::AutomationRunStatus::Completed {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // Failure path 2: the campaign baseline moved since the
            // experiment was created, so its comparison is no longer valid.
            if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
                experiment.status = OptimizationExperimentStatus::Failed;
                let mut metadata = match experiment.metadata.take() {
                    Some(Value::Object(map)) => map,
                    Some(_) => serde_json::Map::new(),
                    None => serde_json::Map::new(),
                };
                metadata.insert(
                    "eval_failure".to_string(),
                    json!({
                        "run_id": run.run_id,
                        "status": run.status,
                        "reason": "experiment baseline_snapshot_hash does not match current campaign baseline",
                    }),
                );
                experiment.metadata = Some(Value::Object(metadata));
                self.put_optimization_experiment(experiment)
                    .await
                    .map_err(|error| error.to_string())?;
                changed = true;
                continue;
            }
            // Failure path 3: metrics could not be derived from the run.
            let metrics =
                match derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1) {
                    Ok(metrics) => metrics,
                    Err(error) => {
                        experiment.status = OptimizationExperimentStatus::Failed;
                        let mut metadata = match experiment.metadata.take() {
                            Some(Value::Object(map)) => map,
                            Some(_) => serde_json::Map::new(),
                            None => serde_json::Map::new(),
                        };
                        metadata.insert(
                            "eval_failure".to_string(),
                            json!({
                                "run_id": run.run_id,
                                "status": run.status,
                                "reason": error,
                            }),
                        );
                        experiment.metadata = Some(Value::Object(metadata));
                        self.put_optimization_experiment(experiment)
                            .await
                            .map_err(|error| error.to_string())?;
                        changed = true;
                        continue;
                    }
                };
            // Success path: attach metrics and a promotion decision.
            let decision = evaluate_phase1_promotion(baseline_metrics, &metrics);
            experiment.phase1_metrics = Some(metrics.clone());
            experiment.metrics = Some(json!({
                "artifact_validator_pass_rate": metrics.artifact_validator_pass_rate,
                "unmet_requirement_count": metrics.unmet_requirement_count,
                "blocked_node_rate": metrics.blocked_node_rate,
                "budget_within_limits": metrics.budget_within_limits,
            }));
            experiment.promotion_recommendation = Some(
                match decision.decision {
                    OptimizationPromotionDecisionKind::Promote => "promote",
                    OptimizationPromotionDecisionKind::Discard => "discard",
                    OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                        "needs_operator_review"
                    }
                }
                .to_string(),
            );
            experiment.promotion_decision = Some(decision.clone());
            match decision.decision {
                // Both Promote and NeedsOperatorReview park the campaign
                // until an operator approves or rejects the winner.
                OptimizationPromotionDecisionKind::Promote
                | OptimizationPromotionDecisionKind::NeedsOperatorReview => {
                    experiment.status = OptimizationExperimentStatus::PromotionRecommended;
                    campaign.pending_promotion_experiment_id =
                        Some(experiment.experiment_id.clone());
                    campaign.status = OptimizationCampaignStatus::AwaitingPromotionApproval;
                    campaign.last_pause_reason = Some(decision.reason.clone());
                }
                OptimizationPromotionDecisionKind::Discard => {
                    experiment.status = OptimizationExperimentStatus::Discarded;
                    if campaign.status == OptimizationCampaignStatus::Running {
                        campaign.last_pause_reason = Some(decision.reason.clone());
                    }
                }
            }
            self.put_optimization_experiment(experiment)
                .await
                .map_err(|error| error.to_string())?;
            changed = true;
        }
        // Circuit breaker: fail the campaign after N consecutive evaluation
        // failures (disabled when the configured limit is zero).
        let refreshed = self
            .list_optimization_experiments(&campaign.optimization_id)
            .await;
        let consecutive_failures = Self::optimization_consecutive_failure_count(&refreshed);
        if consecutive_failures >= phase1.budget.max_consecutive_failures as usize
            && phase1.budget.max_consecutive_failures > 0
        {
            campaign.status = OptimizationCampaignStatus::Failed;
            campaign.last_pause_reason = Some(format!(
                "phase 1 candidate evaluations reached {} consecutive failures",
                consecutive_failures
            ));
            changed = true;
        }
        Ok(changed)
    }
3498
    /// Fold finished baseline-replay runs into the campaign's replay history.
    ///
    /// Still-running replay ids are kept in `pending_baseline_run_ids`; any
    /// missing, failed, or mismatched run pauses the campaign as
    /// evaluator-unstable. Completed, matching runs are recorded as
    /// `OptimizationBaselineReplayRecord`s.
    ///
    /// Returns `Ok(true)` when the campaign record was mutated.
    async fn reconcile_pending_baseline_replays(
        &self,
        campaign: &mut OptimizationCampaignRecord,
    ) -> Result<bool, String> {
        let Some(phase1) = campaign.phase1.as_ref() else {
            return Ok(false);
        };
        let mut changed = false;
        let mut remaining = Vec::new();
        for run_id in campaign.pending_baseline_run_ids.clone() {
            // NOTE(review): a missing run pauses the campaign and drops the
            // id from the pending list (it is not pushed to `remaining`) —
            // presumably intentional since the run can never complete.
            let Some(run) = self.get_automation_v2_run(&run_id).await else {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{run_id}` was not found during optimization reconciliation"
                ));
                changed = true;
                continue;
            };
            // Non-terminal runs stay pending for the next reconcile pass.
            if !Self::automation_run_is_terminal(&run.status) {
                remaining.push(run_id);
                continue;
            }
            if run.status != crate::AutomationRunStatus::Completed {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(format!(
                    "baseline replay run `{}` finished with status `{:?}`",
                    run.run_id, run.status
                ));
                changed = true;
                continue;
            }
            // Guard: the run must belong to this campaign's workflow and
            // have been executed against the exact baseline snapshot.
            if run.automation_id != campaign.source_workflow_id {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run must belong to the optimization source workflow"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
                "baseline replay run must include an automation snapshot".to_string()
            })?;
            if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
                campaign.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                campaign.last_pause_reason = Some(
                    "baseline replay run does not match the current campaign baseline snapshot"
                        .to_string(),
                );
                changed = true;
                continue;
            }
            // Record the replay's derived metrics and validator outcomes.
            let metrics =
                derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
            let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
            campaign
                .baseline_replays
                .push(OptimizationBaselineReplayRecord {
                    replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
                    automation_run_id: Some(run.run_id.clone()),
                    phase1_metrics: metrics,
                    validator_case_outcomes,
                    experiment_count_at_recording: self
                        .count_optimization_experiments(&campaign.optimization_id)
                        .await as u64,
                    recorded_at_ms: now_ms(),
                });
            changed = true;
        }
        if remaining != campaign.pending_baseline_run_ids {
            campaign.pending_baseline_run_ids = remaining;
            changed = true;
        }
        Ok(changed)
    }
3574
    /// Drive every phase 1 optimization campaign one reconciliation step.
    ///
    /// Per campaign (re-fetched to avoid acting on a stale record): fold in
    /// finished baseline replays and candidate evaluations, queue a new
    /// baseline replay when one is due, (re)establish the baseline metrics
    /// once enough replays exist, and queue the next candidate experiment
    /// while the campaign is running. Only mutated campaigns are persisted.
    ///
    /// Returns the number of campaigns that were updated.
    pub async fn reconcile_optimization_campaigns(&self) -> Result<usize, String> {
        let campaigns = self.list_optimization_campaigns().await;
        let mut updated = 0usize;
        for campaign in campaigns {
            // Re-fetch so reconciliation operates on the latest record.
            let Some(mut latest) = self
                .get_optimization_campaign(&campaign.optimization_id)
                .await
            else {
                continue;
            };
            // Campaigns without a phase 1 config are not reconciled here.
            let Some(phase1) = latest.phase1.clone() else {
                continue;
            };
            let mut changed = self.reconcile_pending_baseline_replays(&mut latest).await?;
            changed |= self
                .reconcile_phase1_candidate_experiments(&mut latest)
                .await?;
            let experiment_count = self
                .count_optimization_experiments(&latest.optimization_id)
                .await;
            if latest.pending_baseline_run_ids.is_empty() {
                if phase1_baseline_replay_due(
                    &latest.baseline_replays,
                    latest.pending_baseline_run_ids.len(),
                    &phase1,
                    experiment_count,
                    now_ms(),
                ) {
                    // A replay is due: queue it and park the campaign in
                    // Draft until the replay completes.
                    if self.maybe_queue_phase1_baseline_replay(&mut latest).await? {
                        latest.status = OptimizationCampaignStatus::Draft;
                        changed = true;
                    }
                } else if latest.baseline_replays.len()
                    >= phase1.eval.campaign_start_baseline_runs.max(1) as usize
                {
                    // Enough replays recorded: (re)establish the baseline
                    // metrics and resume, or pause as evaluator-unstable.
                    match establish_phase1_baseline(&latest.baseline_replays, &phase1) {
                        Ok(metrics) => {
                            if latest.baseline_metrics.as_ref() != Some(&metrics) {
                                latest.baseline_metrics = Some(metrics);
                                changed = true;
                            }
                            // Move to Running from Draft/paused, and clear a
                            // leftover pause reason on an already-running
                            // campaign.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) || (latest.status == OptimizationCampaignStatus::Running
                                && latest.last_pause_reason.is_some())
                            {
                                latest.status = OptimizationCampaignStatus::Running;
                                latest.last_pause_reason = None;
                                changed = true;
                            }
                        }
                        Err(error) => {
                            // Pause, but only write when the status or the
                            // reason actually changes to avoid churn.
                            if matches!(
                                latest.status,
                                OptimizationCampaignStatus::Draft
                                    | OptimizationCampaignStatus::Running
                                    | OptimizationCampaignStatus::PausedEvaluatorUnstable
                            ) && (latest.status
                                != OptimizationCampaignStatus::PausedEvaluatorUnstable
                                || latest.last_pause_reason.as_deref() != Some(error.as_str()))
                            {
                                latest.status = OptimizationCampaignStatus::PausedEvaluatorUnstable;
                                latest.last_pause_reason = Some(error);
                                changed = true;
                            }
                        }
                    }
                }
            } else if latest.last_pause_reason.as_deref()
                != Some("waiting for phase 1 baseline replay completion")
            {
                latest.last_pause_reason =
                    Some("waiting for phase 1 baseline replay completion".to_string());
                changed = true;
            }
            // Only a running campaign with no replay in flight may queue the
            // next candidate experiment.
            if latest.status == OptimizationCampaignStatus::Running
                && latest.pending_baseline_run_ids.is_empty()
            {
                changed |= self
                    .maybe_queue_phase1_candidate_experiment(&mut latest)
                    .await?;
            }
            if changed {
                self.put_optimization_campaign(latest)
                    .await
                    .map_err(|error| error.to_string())?;
                updated = updated.saturating_add(1);
            }
        }
        Ok(updated)
    }
3668
3669 async fn maybe_queue_phase1_baseline_replay(
3670 &self,
3671 campaign: &mut OptimizationCampaignRecord,
3672 ) -> Result<bool, String> {
3673 let Some(phase1) = campaign.phase1.as_ref() else {
3674 return Ok(false);
3675 };
3676 if !campaign.pending_baseline_run_ids.is_empty() {
3677 campaign.last_pause_reason =
3678 Some("waiting for phase 1 baseline replay completion".into());
3679 campaign.updated_at_ms = now_ms();
3680 return Ok(true);
3681 }
3682 let experiment_count = self
3683 .count_optimization_experiments(&campaign.optimization_id)
3684 .await;
3685 if !phase1_baseline_replay_due(
3686 &campaign.baseline_replays,
3687 campaign.pending_baseline_run_ids.len(),
3688 phase1,
3689 experiment_count,
3690 now_ms(),
3691 ) {
3692 return Ok(false);
3693 }
3694 let replay_run = self
3695 .create_automation_v2_run(&campaign.baseline_snapshot, "optimization_baseline_replay")
3696 .await
3697 .map_err(|error| error.to_string())?;
3698 if !campaign
3699 .pending_baseline_run_ids
3700 .iter()
3701 .any(|value| value == &replay_run.run_id)
3702 {
3703 campaign
3704 .pending_baseline_run_ids
3705 .push(replay_run.run_id.clone());
3706 }
3707 campaign.last_pause_reason = Some("waiting for phase 1 baseline replay completion".into());
3708 campaign.updated_at_ms = now_ms();
3709 Ok(true)
3710 }
3711
3712 async fn maybe_queue_initial_phase1_baseline_replay(
3713 &self,
3714 campaign: &mut OptimizationCampaignRecord,
3715 ) -> Result<bool, String> {
3716 let Some(phase1) = campaign.phase1.as_ref() else {
3717 return Ok(false);
3718 };
3719 let required_runs = phase1.eval.campaign_start_baseline_runs.max(1) as usize;
3720 if campaign.baseline_replays.len() >= required_runs {
3721 return Ok(false);
3722 }
3723 self.maybe_queue_phase1_baseline_replay(campaign).await
3724 }
3725
3726 pub async fn apply_optimization_action(
3727 &self,
3728 optimization_id: &str,
3729 action: &str,
3730 experiment_id: Option<&str>,
3731 run_id: Option<&str>,
3732 reason: Option<&str>,
3733 ) -> Result<OptimizationCampaignRecord, String> {
3734 let normalized = action.trim().to_ascii_lowercase();
3735 let mut campaign = self
3736 .get_optimization_campaign(optimization_id)
3737 .await
3738 .ok_or_else(|| "optimization not found".to_string())?;
3739 match normalized.as_str() {
3740 "start" => {
3741 if campaign.phase1.is_some() {
3742 if self
3743 .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
3744 .await?
3745 {
3746 campaign.status = OptimizationCampaignStatus::Draft;
3747 } else {
3748 let phase1 = campaign
3749 .phase1
3750 .as_ref()
3751 .ok_or_else(|| "phase 1 config is required".to_string())?;
3752 match establish_phase1_baseline(&campaign.baseline_replays, phase1) {
3753 Ok(metrics) => {
3754 campaign.baseline_metrics = Some(metrics);
3755 campaign.status = OptimizationCampaignStatus::Running;
3756 campaign.last_pause_reason = None;
3757 }
3758 Err(error) => {
3759 campaign.status =
3760 OptimizationCampaignStatus::PausedEvaluatorUnstable;
3761 campaign.last_pause_reason = Some(error);
3762 }
3763 }
3764 }
3765 } else {
3766 campaign.status = OptimizationCampaignStatus::Running;
3767 campaign.last_pause_reason = None;
3768 }
3769 }
3770 "pause" => {
3771 campaign.status = OptimizationCampaignStatus::PausedManual;
3772 campaign.last_pause_reason = reason
3773 .map(str::trim)
3774 .filter(|value| !value.is_empty())
3775 .map(str::to_string);
3776 }
3777 "resume" => {
3778 if self
3779 .maybe_queue_initial_phase1_baseline_replay(&mut campaign)
3780 .await?
3781 {
3782 campaign.status = OptimizationCampaignStatus::Draft;
3783 } else {
3784 campaign.status = OptimizationCampaignStatus::Running;
3785 campaign.last_pause_reason = None;
3786 }
3787 }
3788 "queue_baseline_replay" => {
3789 let replay_run = self
3790 .create_automation_v2_run(
3791 &campaign.baseline_snapshot,
3792 "optimization_baseline_replay",
3793 )
3794 .await
3795 .map_err(|error| error.to_string())?;
3796 if !campaign
3797 .pending_baseline_run_ids
3798 .iter()
3799 .any(|value| value == &replay_run.run_id)
3800 {
3801 campaign
3802 .pending_baseline_run_ids
3803 .push(replay_run.run_id.clone());
3804 }
3805 campaign.updated_at_ms = now_ms();
3806 }
3807 "record_baseline_replay" => {
3808 let run_id = run_id
3809 .map(str::trim)
3810 .filter(|value| !value.is_empty())
3811 .ok_or_else(|| "run_id is required for record_baseline_replay".to_string())?;
3812 let phase1 = campaign
3813 .phase1
3814 .as_ref()
3815 .ok_or_else(|| "phase 1 config is required for baseline replay".to_string())?;
3816 let run = self
3817 .get_automation_v2_run(run_id)
3818 .await
3819 .ok_or_else(|| "automation run not found".to_string())?;
3820 if run.automation_id != campaign.source_workflow_id {
3821 return Err(
3822 "baseline replay run must belong to the optimization source workflow"
3823 .to_string(),
3824 );
3825 }
3826 let snapshot = run.automation_snapshot.as_ref().ok_or_else(|| {
3827 "baseline replay run must include an automation snapshot".to_string()
3828 })?;
3829 if optimization_snapshot_hash(snapshot) != campaign.baseline_snapshot_hash {
3830 return Err(
3831 "baseline replay run does not match the current campaign baseline snapshot"
3832 .to_string(),
3833 );
3834 }
3835 let metrics =
3836 derive_phase1_metrics_from_run(&run, &campaign.baseline_snapshot, phase1)?;
3837 let validator_case_outcomes = derive_phase1_validator_case_outcomes_from_run(&run);
3838 campaign
3839 .baseline_replays
3840 .push(OptimizationBaselineReplayRecord {
3841 replay_id: format!("baseline-replay-{}", uuid::Uuid::new_v4()),
3842 automation_run_id: Some(run.run_id.clone()),
3843 phase1_metrics: metrics,
3844 validator_case_outcomes,
3845 experiment_count_at_recording: self
3846 .count_optimization_experiments(&campaign.optimization_id)
3847 .await as u64,
3848 recorded_at_ms: now_ms(),
3849 });
3850 campaign
3851 .pending_baseline_run_ids
3852 .retain(|value| value != run_id);
3853 campaign.updated_at_ms = now_ms();
3854 }
3855 "approve_winner" => {
3856 let experiment_id = experiment_id
3857 .map(str::trim)
3858 .filter(|value| !value.is_empty())
3859 .ok_or_else(|| "experiment_id is required for approve_winner".to_string())?;
3860 let mut experiment = self
3861 .get_optimization_experiment(optimization_id, experiment_id)
3862 .await
3863 .ok_or_else(|| "experiment not found".to_string())?;
3864 if experiment.baseline_snapshot_hash != campaign.baseline_snapshot_hash {
3865 return Err(
3866 "experiment baseline_snapshot_hash does not match current campaign baseline"
3867 .to_string(),
3868 );
3869 }
3870 if let Some(phase1) = campaign.phase1.as_ref() {
3871 let validated = validate_phase1_candidate_mutation(
3872 &campaign.baseline_snapshot,
3873 &experiment.candidate_snapshot,
3874 phase1,
3875 )?;
3876 if experiment.mutation_summary.is_none() {
3877 experiment.mutation_summary = Some(validated.summary.clone());
3878 }
3879 let approved_at_ms = now_ms();
3880 let apply_patch = Self::build_optimization_apply_patch(
3881 &campaign.baseline_snapshot,
3882 &experiment.candidate_snapshot,
3883 &validated,
3884 approved_at_ms,
3885 )?;
3886 let mut metadata = match experiment.metadata.take() {
3887 Some(Value::Object(map)) => map,
3888 Some(_) => {
3889 return Err("experiment metadata must be a JSON object".to_string());
3890 }
3891 None => serde_json::Map::new(),
3892 };
3893 metadata.insert("apply_patch".to_string(), apply_patch);
3894 experiment.metadata = Some(Value::Object(metadata));
3895 if let Some(baseline_metrics) = campaign.baseline_metrics.as_ref() {
3896 let candidate_metrics = experiment
3897 .phase1_metrics
3898 .clone()
3899 .or_else(|| {
3900 experiment
3901 .metrics
3902 .as_ref()
3903 .and_then(|metrics| parse_phase1_metrics(metrics).ok())
3904 })
3905 .ok_or_else(|| {
3906 "phase 1 candidate is missing promotion metrics".to_string()
3907 })?;
3908 let decision =
3909 evaluate_phase1_promotion(baseline_metrics, &candidate_metrics);
3910 experiment.promotion_recommendation = Some(
3911 match decision.decision {
3912 OptimizationPromotionDecisionKind::Promote => "promote",
3913 OptimizationPromotionDecisionKind::Discard => "discard",
3914 OptimizationPromotionDecisionKind::NeedsOperatorReview => {
3915 "needs_operator_review"
3916 }
3917 }
3918 .to_string(),
3919 );
3920 experiment.promotion_decision = Some(decision.clone());
3921 if decision.decision != OptimizationPromotionDecisionKind::Promote {
3922 let _ = self
3923 .put_optimization_experiment(experiment)
3924 .await
3925 .map_err(|e| e.to_string())?;
3926 return Err(decision.reason);
3927 }
3928 campaign.baseline_metrics = Some(candidate_metrics);
3929 }
3930 }
3931 campaign.baseline_snapshot = experiment.candidate_snapshot.clone();
3932 campaign.baseline_snapshot_hash = experiment.candidate_snapshot_hash.clone();
3933 campaign.baseline_replays.clear();
3934 campaign.pending_baseline_run_ids.clear();
3935 campaign.pending_promotion_experiment_id = None;
3936 campaign.status = OptimizationCampaignStatus::Draft;
3937 campaign.last_pause_reason = None;
3938 experiment.status = OptimizationExperimentStatus::PromotionApproved;
3939 let _ = self
3940 .put_optimization_experiment(experiment)
3941 .await
3942 .map_err(|e| e.to_string())?;
3943 }
3944 "reject_winner" => {
3945 let experiment_id = experiment_id
3946 .map(str::trim)
3947 .filter(|value| !value.is_empty())
3948 .ok_or_else(|| "experiment_id is required for reject_winner".to_string())?;
3949 let mut experiment = self
3950 .get_optimization_experiment(optimization_id, experiment_id)
3951 .await
3952 .ok_or_else(|| "experiment not found".to_string())?;
3953 campaign.pending_promotion_experiment_id = None;
3954 campaign.status = OptimizationCampaignStatus::Draft;
3955 campaign.last_pause_reason = reason
3956 .map(str::trim)
3957 .filter(|value| !value.is_empty())
3958 .map(str::to_string);
3959 experiment.status = OptimizationExperimentStatus::PromotionRejected;
3960 let _ = self
3961 .put_optimization_experiment(experiment)
3962 .await
3963 .map_err(|e| e.to_string())?;
3964 }
3965 _ => return Err("unsupported optimization action".to_string()),
3966 }
3967 self.put_optimization_campaign(campaign)
3968 .await
3969 .map_err(|e| e.to_string())
3970 }
3971
3972 pub async fn list_automations_v2(&self) -> Vec<AutomationV2Spec> {
3973 let mut rows = self
3974 .automations_v2
3975 .read()
3976 .await
3977 .values()
3978 .cloned()
3979 .collect::<Vec<_>>();
3980 rows.sort_by(|a, b| a.automation_id.cmp(&b.automation_id));
3981 rows
3982 }
3983
3984 pub async fn delete_automation_v2(
3985 &self,
3986 automation_id: &str,
3987 ) -> anyhow::Result<Option<AutomationV2Spec>> {
3988 let removed = self.automations_v2.write().await.remove(automation_id);
3989 let removed_run_count = {
3990 let mut runs = self.automation_v2_runs.write().await;
3991 let before = runs.len();
3992 runs.retain(|_, run| run.automation_id != automation_id);
3993 before.saturating_sub(runs.len())
3994 };
3995 self.persist_automations_v2().await?;
3996 if removed_run_count > 0 {
3997 self.persist_automation_v2_runs().await?;
3998 }
3999 self.verify_automation_v2_persisted(automation_id, false)
4000 .await?;
4001 Ok(removed)
4002 }
4003
    /// Creates, persists, and returns a new `Queued` run for `automation`.
    ///
    /// Every flow node starts in the checkpoint's `pending_nodes`. The
    /// runtime context comes from the automation's materialization, falling
    /// back to the approved plan's. The run is also mirrored onto the
    /// context-run blackboard; a failure there fails the whole call.
    pub async fn create_automation_v2_run(
        &self,
        automation: &AutomationV2Spec,
        trigger_type: &str,
    ) -> anyhow::Result<AutomationV2RunRecord> {
        let now = now_ms();
        // Seed the checkpoint with every flow node pending.
        let pending_nodes = automation
            .flow
            .nodes
            .iter()
            .map(|n| n.node_id.clone())
            .collect::<Vec<_>>();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            trigger_type: trigger_type.to_string(),
            status: AutomationRunStatus::Queued,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: None,
            finished_at_ms: None,
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes,
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context: automation
                .runtime_context_materialization()
                .or_else(|| automation.approved_plan_runtime_context_materialization()),
            // Snapshot the spec; it is read later for workspace-root and
            // runtime-context resolution even if the automation is edited.
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: None,
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
4064
    /// Creates and persists a run record that is already `Completed` and
    /// marked as a dry run: trigger type suffixed with `_dry_run`, detail set
    /// to `"dry_run"`, and an entirely empty checkpoint (no nodes executed).
    /// The record is still mirrored to the context-run blackboard.
    pub async fn create_automation_v2_dry_run(
        &self,
        automation: &AutomationV2Spec,
        trigger_type: &str,
    ) -> anyhow::Result<AutomationV2RunRecord> {
        let now = now_ms();
        let run = AutomationV2RunRecord {
            run_id: format!("automation-v2-run-{}", uuid::Uuid::new_v4()),
            automation_id: automation.automation_id.clone(),
            trigger_type: format!("{trigger_type}_dry_run"),
            // Dry runs are born terminal: started and finished at creation.
            status: AutomationRunStatus::Completed,
            created_at_ms: now,
            updated_at_ms: now,
            started_at_ms: Some(now),
            finished_at_ms: Some(now),
            active_session_ids: Vec::new(),
            latest_session_id: None,
            active_instance_ids: Vec::new(),
            checkpoint: AutomationRunCheckpoint {
                completed_nodes: Vec::new(),
                pending_nodes: Vec::new(),
                node_outputs: std::collections::HashMap::new(),
                node_attempts: std::collections::HashMap::new(),
                blocked_nodes: Vec::new(),
                awaiting_gate: None,
                gate_history: Vec::new(),
                lifecycle_history: Vec::new(),
                last_failure: None,
            },
            runtime_context: automation
                .runtime_context_materialization()
                .or_else(|| automation.approved_plan_runtime_context_materialization()),
            automation_snapshot: Some(automation.clone()),
            pause_reason: None,
            resume_reason: None,
            detail: Some("dry_run".to_string()),
            stop_kind: None,
            stop_reason: None,
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            estimated_cost_usd: 0.0,
            scheduler: None,
        };
        self.automation_v2_runs
            .write()
            .await
            .insert(run.run_id.clone(), run.clone());
        self.persist_automation_v2_runs().await?;
        crate::http::context_runs::sync_automation_v2_run_blackboard(self, automation, &run)
            .await
            .map_err(|status| anyhow::anyhow!("failed to sync automation context run: {status}"))?;
        Ok(run)
    }
4119
4120 pub async fn get_automation_v2_run(&self, run_id: &str) -> Option<AutomationV2RunRecord> {
4121 self.automation_v2_runs.read().await.get(run_id).cloned()
4122 }
4123
4124 pub async fn list_automation_v2_runs(
4125 &self,
4126 automation_id: Option<&str>,
4127 limit: usize,
4128 ) -> Vec<AutomationV2RunRecord> {
4129 let mut rows = self
4130 .automation_v2_runs
4131 .read()
4132 .await
4133 .values()
4134 .filter(|row| {
4135 if let Some(id) = automation_id {
4136 row.automation_id == id
4137 } else {
4138 true
4139 }
4140 })
4141 .cloned()
4142 .collect::<Vec<_>>();
4143 rows.sort_by(|a, b| b.created_at_ms.cmp(&a.created_at_ms));
4144 rows.truncate(limit.clamp(1, 500));
4145 rows
4146 }
4147
4148 async fn automation_v2_run_workspace_root(
4149 &self,
4150 run: &AutomationV2RunRecord,
4151 ) -> Option<String> {
4152 if let Some(root) = run
4153 .automation_snapshot
4154 .as_ref()
4155 .and_then(|automation| automation.workspace_root.as_ref())
4156 .map(|value| value.trim())
4157 .filter(|value| !value.is_empty())
4158 {
4159 return Some(root.to_string());
4160 }
4161 self.get_automation_v2(&run.automation_id)
4162 .await
4163 .and_then(|automation| automation.workspace_root)
4164 .map(|value| value.trim().to_string())
4165 .filter(|value| !value.is_empty())
4166 }
4167
    /// Reconciles scheduler capacity and workspace-lock bookkeeping after a
    /// run transitioned from `previous_status` to `run.status`.
    ///
    /// Membership in "capacity" and "workspace lock" is derived from the
    /// status via the two `automation_status_*` predicates; the branches
    /// below release or (re)acquire exactly the resources whose membership
    /// flipped across the transition.
    async fn sync_automation_scheduler_for_run_transition(
        &self,
        previous_status: AutomationRunStatus,
        run: &AutomationV2RunRecord,
    ) {
        let had_capacity = automation_status_uses_scheduler_capacity(&previous_status);
        let has_capacity = automation_status_uses_scheduler_capacity(&run.status);
        let had_lock = automation_status_holds_workspace_lock(&previous_status);
        let has_lock = automation_status_holds_workspace_lock(&run.status);
        // Resolved before taking the scheduler write lock (the lookup itself
        // awaits other locks).
        let workspace_root = self.automation_v2_run_workspace_root(run).await;
        let mut scheduler = self.automation_scheduler.write().await;

        // Held something before, holds nothing now: release everything.
        if (had_capacity || had_lock) && !has_capacity && !has_lock {
            scheduler.release_run(&run.run_id);
            return;
        }
        // Partial release for whichever single resource was dropped.
        if had_capacity && !has_capacity {
            scheduler.release_capacity(&run.run_id);
        }
        if had_lock && !has_lock {
            scheduler.release_workspace(&run.run_id);
        }
        // Newly acquired the workspace lock: admit when capacity is also
        // held, otherwise only reserve the workspace.
        if !had_lock && has_lock {
            if has_capacity {
                scheduler.admit_run(&run.run_id, workspace_root.as_deref());
            } else {
                scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
            }
            return;
        }
        // Newly acquired capacity only.
        if !had_capacity && has_capacity {
            scheduler.admit_run(&run.run_id, workspace_root.as_deref());
        }
    }
4202
    /// Reconciles persisted automation runs after a server restart.
    ///
    /// Runs that were `Running` are marked `Failed` with a `ServerRestart`
    /// stop kind (their in-memory execution state is gone). Runs that were
    /// `Paused`, `Pausing`, or `AwaitingApproval` re-reserve their workspace
    /// and re-register validated node artifacts with the scheduler —
    /// presumably so a later resume can skip re-validating them (confirm).
    /// Returns the number of runs failed over.
    pub async fn recover_in_flight_runs(&self) -> usize {
        let runs = self
            .automation_v2_runs
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        let mut recovered = 0usize;
        for run in runs {
            match run.status {
                AutomationRunStatus::Running => {
                    let detail = "automation run interrupted by server restart".to_string();
                    if self
                        .update_automation_v2_run(&run.run_id, |row| {
                            row.status = AutomationRunStatus::Failed;
                            row.detail = Some(detail.clone());
                            row.stop_kind = Some(AutomationStopKind::ServerRestart);
                            row.stop_reason = Some(detail.clone());
                            automation::record_automation_lifecycle_event(
                                row,
                                "run_failed_server_restart",
                                Some(detail.clone()),
                                Some(AutomationStopKind::ServerRestart),
                            );
                        })
                        .await
                        .is_some()
                    {
                        recovered += 1;
                    }
                }
                AutomationRunStatus::Paused
                | AutomationRunStatus::Pausing
                | AutomationRunStatus::AwaitingApproval => {
                    let workspace_root = self.automation_v2_run_workspace_root(&run).await;
                    let mut scheduler = self.automation_scheduler.write().await;
                    scheduler.reserve_workspace(&run.run_id, workspace_root.as_deref());
                    // Re-register artifacts already validated by checkpointed
                    // node outputs.
                    for (node_id, output) in &run.checkpoint.node_outputs {
                        if let Some((path, content_digest)) =
                            automation::node_output::automation_output_validated_artifact(output)
                        {
                            scheduler.preexisting_registry.register_validated(
                                &run.run_id,
                                node_id,
                                automation::scheduler::ValidatedArtifact {
                                    path,
                                    content_digest,
                                },
                            );
                        }
                    }
                }
                _ => {}
            }
        }
        recovered
    }
4261
    /// Returns true when scheduler shutdown has been requested. Relaxed
    /// ordering: this flag is a simple on/off signal with no data it guards.
    pub fn is_automation_scheduler_stopping(&self) -> bool {
        self.automation_scheduler_stopping.load(Ordering::Relaxed)
    }
4265
    /// Sets or clears the scheduler-shutdown flag (relaxed store; see
    /// `is_automation_scheduler_stopping`).
    pub fn set_automation_scheduler_stopping(&self, stopping: bool) {
        self.automation_scheduler_stopping
            .store(stopping, Ordering::Relaxed);
    }
4270
4271 pub async fn fail_running_automation_runs_for_shutdown(&self) -> usize {
4272 let run_ids = self
4273 .automation_v2_runs
4274 .read()
4275 .await
4276 .values()
4277 .filter(|run| matches!(run.status, AutomationRunStatus::Running))
4278 .map(|run| run.run_id.clone())
4279 .collect::<Vec<_>>();
4280 let mut failed = 0usize;
4281 for run_id in run_ids {
4282 let detail = "automation run stopped during server shutdown".to_string();
4283 if self
4284 .update_automation_v2_run(&run_id, |row| {
4285 row.status = AutomationRunStatus::Failed;
4286 row.detail = Some(detail.clone());
4287 row.stop_kind = Some(AutomationStopKind::Shutdown);
4288 row.stop_reason = Some(detail.clone());
4289 automation::record_automation_lifecycle_event(
4290 row,
4291 "run_failed_shutdown",
4292 Some(detail.clone()),
4293 Some(AutomationStopKind::Shutdown),
4294 );
4295 })
4296 .await
4297 .is_some()
4298 {
4299 failed += 1;
4300 }
4301 }
4302 failed
4303 }
4304
4305 pub async fn claim_next_queued_automation_v2_run(&self) -> Option<AutomationV2RunRecord> {
4306 let run_id = self
4307 .automation_v2_runs
4308 .read()
4309 .await
4310 .values()
4311 .filter(|row| row.status == AutomationRunStatus::Queued)
4312 .min_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms))
4313 .map(|row| row.run_id.clone())?;
4314 self.claim_specific_automation_v2_run(&run_id).await
4315 }
4316 pub async fn claim_specific_automation_v2_run(
4317 &self,
4318 run_id: &str,
4319 ) -> Option<AutomationV2RunRecord> {
4320 let mut guard = self.automation_v2_runs.write().await;
4321 let run = guard.get_mut(run_id)?;
4322 if run.status != AutomationRunStatus::Queued {
4323 return None;
4324 }
4325 let previous_status = run.status.clone();
4326 let now = now_ms();
4327 if run.runtime_context.is_none() {
4328 run.runtime_context = run.automation_snapshot.as_ref().and_then(|automation| {
4329 automation
4330 .runtime_context_materialization()
4331 .or_else(|| automation.approved_plan_runtime_context_materialization())
4332 });
4333 }
4334 if run.runtime_context.is_none() {
4335 let previous_status = run.status.clone();
4336 let now = now_ms();
4337 run.status = AutomationRunStatus::Failed;
4338 run.updated_at_ms = now;
4339 run.finished_at_ms.get_or_insert(now);
4340 run.scheduler = None;
4341 run.detail = Some("runtime context partition missing for automation run".to_string());
4342 let claimed = run.clone();
4343 drop(guard);
4344 self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
4345 .await;
4346 let _ = self.persist_automation_v2_runs().await;
4347 return None;
4348 }
4349 run.status = AutomationRunStatus::Running;
4350 run.updated_at_ms = now;
4351 run.started_at_ms.get_or_insert(now);
4352 run.scheduler = None;
4353 let claimed = run.clone();
4354 drop(guard);
4355 self.sync_automation_scheduler_for_run_transition(previous_status, &claimed)
4356 .await;
4357 let _ = self.persist_automation_v2_runs().await;
4358 Some(claimed)
4359 }
    /// Applies `update` to the run identified by `run_id`, then performs the
    /// shared post-update bookkeeping: scheduler placement metadata is
    /// cleared for any non-queued status, `updated_at_ms` is refreshed,
    /// terminal statuses get a `finished_at_ms` (first transition wins),
    /// scheduler resources are reconciled against the status transition, and
    /// the store is persisted best-effort.
    ///
    /// Returns the updated record, or `None` when the run does not exist.
    pub async fn update_automation_v2_run(
        &self,
        run_id: &str,
        update: impl FnOnce(&mut AutomationV2RunRecord),
    ) -> Option<AutomationV2RunRecord> {
        let mut guard = self.automation_v2_runs.write().await;
        let run = guard.get_mut(run_id)?;
        let previous_status = run.status.clone();
        update(run);
        // Placement metadata is only retained while the run is still queued.
        if run.status != AutomationRunStatus::Queued {
            run.scheduler = None;
        }
        run.updated_at_ms = now_ms();
        if matches!(
            run.status,
            AutomationRunStatus::Completed
                | AutomationRunStatus::Blocked
                | AutomationRunStatus::Failed
                | AutomationRunStatus::Cancelled
        ) {
            run.finished_at_ms.get_or_insert_with(now_ms);
        }
        let out = run.clone();
        // Release the map lock before scheduler sync, which takes other locks.
        drop(guard);
        self.sync_automation_scheduler_for_run_transition(previous_status, &out)
            .await;
        let _ = self.persist_automation_v2_runs().await;
        Some(out)
    }
4389
4390 pub async fn set_automation_v2_run_scheduler_metadata(
4391 &self,
4392 run_id: &str,
4393 meta: automation::SchedulerMetadata,
4394 ) -> Option<AutomationV2RunRecord> {
4395 self.update_automation_v2_run(run_id, |row| {
4396 row.scheduler = Some(meta);
4397 })
4398 .await
4399 }
4400
4401 pub async fn clear_automation_v2_run_scheduler_metadata(
4402 &self,
4403 run_id: &str,
4404 ) -> Option<AutomationV2RunRecord> {
4405 self.update_automation_v2_run(run_id, |row| {
4406 row.scheduler = None;
4407 })
4408 .await
4409 }
4410
4411 pub async fn add_automation_v2_session(
4412 &self,
4413 run_id: &str,
4414 session_id: &str,
4415 ) -> Option<AutomationV2RunRecord> {
4416 let updated = self
4417 .update_automation_v2_run(run_id, |row| {
4418 if !row.active_session_ids.iter().any(|id| id == session_id) {
4419 row.active_session_ids.push(session_id.to_string());
4420 }
4421 row.latest_session_id = Some(session_id.to_string());
4422 })
4423 .await;
4424 self.automation_v2_session_runs
4425 .write()
4426 .await
4427 .insert(session_id.to_string(), run_id.to_string());
4428 updated
4429 }
4430
4431 pub async fn clear_automation_v2_session(
4432 &self,
4433 run_id: &str,
4434 session_id: &str,
4435 ) -> Option<AutomationV2RunRecord> {
4436 self.automation_v2_session_runs
4437 .write()
4438 .await
4439 .remove(session_id);
4440 self.update_automation_v2_run(run_id, |row| {
4441 row.active_session_ids.retain(|id| id != session_id);
4442 })
4443 .await
4444 }
4445
4446 pub async fn forget_automation_v2_sessions(&self, session_ids: &[String]) {
4447 let mut guard = self.automation_v2_session_runs.write().await;
4448 for session_id in session_ids {
4449 guard.remove(session_id);
4450 }
4451 }
4452
4453 pub async fn add_automation_v2_instance(
4454 &self,
4455 run_id: &str,
4456 instance_id: &str,
4457 ) -> Option<AutomationV2RunRecord> {
4458 self.update_automation_v2_run(run_id, |row| {
4459 if !row.active_instance_ids.iter().any(|id| id == instance_id) {
4460 row.active_instance_ids.push(instance_id.to_string());
4461 }
4462 })
4463 .await
4464 }
4465
4466 pub async fn clear_automation_v2_instance(
4467 &self,
4468 run_id: &str,
4469 instance_id: &str,
4470 ) -> Option<AutomationV2RunRecord> {
4471 self.update_automation_v2_run(run_id, |row| {
4472 row.active_instance_ids.retain(|id| id != instance_id);
4473 })
4474 .await
4475 }
4476
4477 pub async fn apply_provider_usage_to_runs(
4478 &self,
4479 session_id: &str,
4480 prompt_tokens: u64,
4481 completion_tokens: u64,
4482 total_tokens: u64,
4483 ) {
4484 if let Some(policy) = self.routine_session_policy(session_id).await {
4485 let rate = self.token_cost_per_1k_usd.max(0.0);
4486 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
4487 let mut guard = self.routine_runs.write().await;
4488 if let Some(run) = guard.get_mut(&policy.run_id) {
4489 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
4490 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
4491 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
4492 run.estimated_cost_usd += delta_cost;
4493 run.updated_at_ms = now_ms();
4494 }
4495 drop(guard);
4496 let _ = self.persist_routine_runs().await;
4497 }
4498
4499 let maybe_v2_run_id = self
4500 .automation_v2_session_runs
4501 .read()
4502 .await
4503 .get(session_id)
4504 .cloned();
4505 if let Some(run_id) = maybe_v2_run_id {
4506 let rate = self.token_cost_per_1k_usd.max(0.0);
4507 let delta_cost = (total_tokens as f64 / 1000.0) * rate;
4508 let mut guard = self.automation_v2_runs.write().await;
4509 if let Some(run) = guard.get_mut(&run_id) {
4510 run.prompt_tokens = run.prompt_tokens.saturating_add(prompt_tokens);
4511 run.completion_tokens = run.completion_tokens.saturating_add(completion_tokens);
4512 run.total_tokens = run.total_tokens.saturating_add(total_tokens);
4513 run.estimated_cost_usd += delta_cost;
4514 run.updated_at_ms = now_ms();
4515 }
4516 drop(guard);
4517 let _ = self.persist_automation_v2_runs().await;
4518 }
4519 }
4520
    /// Scans active automations and returns the ids of those due at `now_ms`
    /// — one entry per run that should fire (catch-up misfire policies can
    /// yield several for one automation). Advances `next_fire_at_ms` and
    /// `last_fired_at_ms` on each fired automation and persists the store.
    ///
    /// NOTE: the `now_ms` parameter shadows the `util::time::now_ms` helper
    /// inside this function body.
    pub async fn evaluate_automation_v2_misfires(&self, now_ms: u64) -> Vec<String> {
        let mut fired = Vec::new();
        let mut guard = self.automations_v2.write().await;
        for automation in guard.values_mut() {
            if automation.status != AutomationV2Status::Active {
                continue;
            }
            // No fire time yet: seed it from the schedule and re-check later.
            let Some(next_fire_at_ms) = automation.next_fire_at_ms else {
                automation.next_fire_at_ms =
                    automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
                continue;
            };
            if now_ms < next_fire_at_ms {
                continue;
            }
            let run_count =
                automation_schedule_due_count(&automation.schedule, now_ms, next_fire_at_ms);
            let next = automation_schedule_next_fire_at_ms(&automation.schedule, now_ms);
            automation.next_fire_at_ms = next;
            automation.last_fired_at_ms = Some(now_ms);
            for _ in 0..run_count {
                fired.push(automation.automation_id.clone());
            }
        }
        drop(guard);
        let _ = self.persist_automations_v2().await;
        fired
    }
4549}
4550
/// Builds the runtime `ChannelsConfig` from the on-disk channel settings.
///
/// Returns `None` when no channel (telegram, discord, slack) is configured.
/// Each channel's allowed-user list is normalized (wildcard-aware), and the
/// server base URL plus API token are attached; a missing token becomes an
/// empty string.
async fn build_channels_config(
    state: &AppState,
    channels: &ChannelsConfigFile,
) -> Option<ChannelsConfig> {
    if channels.telegram.is_none() && channels.discord.is_none() && channels.slack.is_none() {
        return None;
    }
    Some(ChannelsConfig {
        telegram: channels.telegram.clone().map(|cfg| TelegramConfig {
            bot_token: cfg.bot_token,
            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
            mention_only: cfg.mention_only,
            style_profile: cfg.style_profile,
            security_profile: cfg.security_profile,
        }),
        discord: channels.discord.clone().map(|cfg| DiscordConfig {
            bot_token: cfg.bot_token,
            guild_id: cfg.guild_id,
            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
            mention_only: cfg.mention_only,
            security_profile: cfg.security_profile,
        }),
        slack: channels.slack.clone().map(|cfg| SlackConfig {
            bot_token: cfg.bot_token,
            channel_id: cfg.channel_id,
            allowed_users: config::channels::normalize_allowed_users_or_wildcard(cfg.allowed_users),
            mention_only: cfg.mention_only,
            security_profile: cfg.security_profile,
        }),
        server_base_url: state.server_base_url(),
        api_token: state.api_token().await.unwrap_or_default(),
        tool_policy: channels.tool_policy.clone(),
    })
}
4585
4586fn is_valid_owner_repo_slug(value: &str) -> bool {
4589 let trimmed = value.trim();
4590 if trimmed.is_empty() || trimmed.starts_with('/') || trimmed.ends_with('/') {
4591 return false;
4592 }
4593 let mut parts = trimmed.split('/');
4594 let Some(owner) = parts.next() else {
4595 return false;
4596 };
4597 let Some(repo) = parts.next() else {
4598 return false;
4599 };
4600 parts.next().is_none() && !owner.trim().is_empty() && !repo.trim().is_empty()
4601}
4602
4603fn legacy_automations_v2_path() -> Option<PathBuf> {
4604 config::paths::resolve_legacy_root_file_path("automations_v2.json")
4605 .filter(|path| path != &config::paths::resolve_automations_v2_path())
4606}
4607
4608fn candidate_automations_v2_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4609 let mut candidates = vec![active_path.clone()];
4610 if let Some(legacy_path) = legacy_automations_v2_path() {
4611 if !candidates.contains(&legacy_path) {
4612 candidates.push(legacy_path);
4613 }
4614 }
4615 let default_path = config::paths::default_state_dir().join("automations_v2.json");
4616 if !candidates.contains(&default_path) {
4617 candidates.push(default_path);
4618 }
4619 candidates
4620}
4621
4622async fn cleanup_stale_legacy_automations_v2_file(active_path: &PathBuf) -> anyhow::Result<()> {
4623 let Some(legacy_path) = legacy_automations_v2_path() else {
4624 return Ok(());
4625 };
4626 if legacy_path == *active_path || !legacy_path.exists() {
4627 return Ok(());
4628 }
4629 fs::remove_file(&legacy_path).await?;
4630 tracing::info!(
4631 active_path = active_path.display().to_string(),
4632 removed_path = legacy_path.display().to_string(),
4633 "removed stale legacy automation v2 file after canonical persistence"
4634 );
4635 Ok(())
4636}
4637
4638fn legacy_automation_v2_runs_path() -> Option<PathBuf> {
4639 config::paths::resolve_legacy_root_file_path("automation_v2_runs.json")
4640 .filter(|path| path != &config::paths::resolve_automation_v2_runs_path())
4641}
4642
4643fn candidate_automation_v2_runs_paths(active_path: &PathBuf) -> Vec<PathBuf> {
4644 let mut candidates = vec![active_path.clone()];
4645 if let Some(legacy_path) = legacy_automation_v2_runs_path() {
4646 if !candidates.contains(&legacy_path) {
4647 candidates.push(legacy_path);
4648 }
4649 }
4650 let default_path = config::paths::default_state_dir().join("automation_v2_runs.json");
4651 if !candidates.contains(&default_path) {
4652 candidates.push(default_path);
4653 }
4654 candidates
4655}
4656
4657fn parse_automation_v2_file(raw: &str) -> std::collections::HashMap<String, AutomationV2Spec> {
4658 serde_json::from_str::<std::collections::HashMap<String, AutomationV2Spec>>(raw)
4659 .unwrap_or_default()
4660}
4661
4662fn parse_automation_v2_runs_file(
4663 raw: &str,
4664) -> std::collections::HashMap<String, AutomationV2RunRecord> {
4665 serde_json::from_str::<std::collections::HashMap<String, AutomationV2RunRecord>>(raw)
4666 .unwrap_or_default()
4667}
4668
4669fn parse_optimization_campaigns_file(
4670 raw: &str,
4671) -> std::collections::HashMap<String, OptimizationCampaignRecord> {
4672 serde_json::from_str::<std::collections::HashMap<String, OptimizationCampaignRecord>>(raw)
4673 .unwrap_or_default()
4674}
4675
4676fn parse_optimization_experiments_file(
4677 raw: &str,
4678) -> std::collections::HashMap<String, OptimizationExperimentRecord> {
4679 serde_json::from_str::<std::collections::HashMap<String, OptimizationExperimentRecord>>(raw)
4680 .unwrap_or_default()
4681}
4682
4683fn routine_interval_ms(schedule: &RoutineSchedule) -> Option<u64> {
4684 match schedule {
4685 RoutineSchedule::IntervalSeconds { seconds } => Some(seconds.saturating_mul(1000)),
4686 RoutineSchedule::Cron { .. } => None,
4687 }
4688}
4689
/// Parses a timezone name (e.g. "America/New_York") into a `chrono_tz::Tz`,
/// trimming surrounding whitespace first. Returns `None` for unknown names.
fn parse_timezone(timezone: &str) -> Option<Tz> {
    timezone.trim().parse::<Tz>().ok()
}
4693
/// Computes the next cron fire time (unix millis, UTC) strictly after
/// `from_ms`, evaluating the expression in `timezone`.
///
/// Returns `None` when the timezone or cron expression fails to parse,
/// `from_ms` is not a representable timestamp, or the schedule yields no
/// future occurrence.
fn next_cron_fire_at_ms(expression: &str, timezone: &str, from_ms: u64) -> Option<u64> {
    let tz = parse_timezone(timezone)?;
    let schedule = Schedule::from_str(expression).ok()?;
    let from_dt = Utc.timestamp_millis_opt(from_ms as i64).single()?;
    // Convert the anchor into the target timezone before asking the cron
    // schedule for the next occurrence.
    let local_from = from_dt.with_timezone(&tz);
    let next = schedule.after(&local_from).next()?;
    // Clamp pre-epoch results to 0 before the unsigned cast.
    Some(next.with_timezone(&Utc).timestamp_millis().max(0) as u64)
}
4702
4703fn compute_next_schedule_fire_at_ms(
4704 schedule: &RoutineSchedule,
4705 timezone: &str,
4706 from_ms: u64,
4707) -> Option<u64> {
4708 let _ = parse_timezone(timezone)?;
4709 match schedule {
4710 RoutineSchedule::IntervalSeconds { seconds } => {
4711 Some(from_ms.saturating_add(seconds.saturating_mul(1000)))
4712 }
4713 RoutineSchedule::Cron { expression } => next_cron_fire_at_ms(expression, timezone, from_ms),
4714 }
4715}
4716
/// Computes a misfire plan — `(runs_to_fire_now, aligned_next_fire_at_ms)` —
/// for a schedule whose `next_fire_at_ms` may already be in the past.
///
/// Interval schedules delegate to the arithmetic `compute_misfire_plan`.
/// Cron schedules realign from the expression: the next fire is the first
/// occurrence after `now_ms` (falling back to now + 60s when the expression
/// or timezone cannot be evaluated), and `CatchUp` walks missed occurrences
/// from `next_fire_at_ms` up to `now_ms`, capped at `max_runs`.
fn compute_misfire_plan_for_schedule(
    now_ms: u64,
    next_fire_at_ms: u64,
    schedule: &RoutineSchedule,
    timezone: &str,
    policy: &RoutineMisfirePolicy,
) -> (u32, u64) {
    match schedule {
        RoutineSchedule::IntervalSeconds { .. } => {
            let Some(interval_ms) = routine_interval_ms(schedule) else {
                // Not reachable for interval schedules, but fail safe.
                return (0, next_fire_at_ms);
            };
            compute_misfire_plan(now_ms, next_fire_at_ms, interval_ms, policy)
        }
        RoutineSchedule::Cron { expression } => {
            let aligned_next = next_cron_fire_at_ms(expression, timezone, now_ms)
                .unwrap_or_else(|| now_ms.saturating_add(60_000));
            match policy {
                RoutineMisfirePolicy::Skip => (0, aligned_next),
                RoutineMisfirePolicy::RunOnce => (1, aligned_next),
                RoutineMisfirePolicy::CatchUp { max_runs } => {
                    let mut count = 0u32;
                    let mut cursor = next_fire_at_ms;
                    // Count each missed occurrence up to now, bounded by
                    // max_runs; bail if the schedule stops advancing.
                    while cursor <= now_ms && count < *max_runs {
                        count = count.saturating_add(1);
                        let Some(next) = next_cron_fire_at_ms(expression, timezone, cursor) else {
                            break;
                        };
                        if next <= cursor {
                            break;
                        }
                        cursor = next;
                    }
                    (count, aligned_next)
                }
            }
        }
    }
}
4756
4757fn compute_misfire_plan(
4758 now_ms: u64,
4759 next_fire_at_ms: u64,
4760 interval_ms: u64,
4761 policy: &RoutineMisfirePolicy,
4762) -> (u32, u64) {
4763 if now_ms < next_fire_at_ms || interval_ms == 0 {
4764 return (0, next_fire_at_ms);
4765 }
4766 let missed = ((now_ms.saturating_sub(next_fire_at_ms)) / interval_ms) + 1;
4767 let aligned_next = next_fire_at_ms.saturating_add(missed.saturating_mul(interval_ms));
4768 match policy {
4769 RoutineMisfirePolicy::Skip => (0, aligned_next),
4770 RoutineMisfirePolicy::RunOnce => (1, aligned_next),
4771 RoutineMisfirePolicy::CatchUp { max_runs } => {
4772 let count = missed.min(u64::from(*max_runs)) as u32;
4773 (count, aligned_next)
4774 }
4775 }
4776}
4777
4778fn auto_generated_agent_name(agent_id: &str) -> String {
4779 let names = [
4780 "Maple", "Cinder", "Rivet", "Comet", "Atlas", "Juniper", "Quartz", "Beacon",
4781 ];
4782 let digest = Sha256::digest(agent_id.as_bytes());
4783 let idx = usize::from(digest[0]) % names.len();
4784 format!("{}-{:02x}", names[idx], digest[1])
4785}
4786
4787fn schedule_from_automation_v2(schedule: &AutomationV2Schedule) -> Option<RoutineSchedule> {
4788 match schedule.schedule_type {
4789 AutomationV2ScheduleType::Manual => None,
4790 AutomationV2ScheduleType::Interval => Some(RoutineSchedule::IntervalSeconds {
4791 seconds: schedule.interval_seconds.unwrap_or(60),
4792 }),
4793 AutomationV2ScheduleType::Cron => Some(RoutineSchedule::Cron {
4794 expression: schedule.cron_expression.clone().unwrap_or_default(),
4795 }),
4796 }
4797}
4798
4799fn automation_schedule_next_fire_at_ms(
4800 schedule: &AutomationV2Schedule,
4801 from_ms: u64,
4802) -> Option<u64> {
4803 let routine_schedule = schedule_from_automation_v2(schedule)?;
4804 compute_next_schedule_fire_at_ms(&routine_schedule, &schedule.timezone, from_ms)
4805}
4806
/// Returns how many runs a due automation schedule should fire at `now_ms`,
/// given its previously stored `next_fire_at_ms`. Manual schedules never
/// fire (0); otherwise the misfire plan decides the count.
///
/// NOTE(review): the trailing `count.max(1)` forces at least one run even
/// when the misfire plan returns 0 (e.g. a `Skip` policy) — confirm that a
/// due schedule is always intended to fire at least once.
fn automation_schedule_due_count(
    schedule: &AutomationV2Schedule,
    now_ms: u64,
    next_fire_at_ms: u64,
) -> u32 {
    let Some(routine_schedule) = schedule_from_automation_v2(schedule) else {
        return 0;
    };
    let (count, _) = compute_misfire_plan_for_schedule(
        now_ms,
        next_fire_at_ms,
        &routine_schedule,
        &schedule.timezone,
        &schedule.misfire_policy,
    );
    count.max(1)
}
4824
/// Outcome of the routine execution policy check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutineExecutionDecision {
    /// The routine may run immediately.
    Allowed,
    /// The routine may run only after manual approval; `reason` says why.
    RequiresApproval { reason: String },
    /// The routine must not run; `reason` describes the policy violation.
    Blocked { reason: String },
}
4831
4832pub fn routine_uses_external_integrations(routine: &RoutineSpec) -> bool {
4833 let entrypoint = routine.entrypoint.to_ascii_lowercase();
4834 if entrypoint.starts_with("connector.")
4835 || entrypoint.starts_with("integration.")
4836 || entrypoint.contains("external")
4837 {
4838 return true;
4839 }
4840 routine
4841 .args
4842 .get("uses_external_integrations")
4843 .and_then(|v| v.as_bool())
4844 .unwrap_or(false)
4845 || routine
4846 .args
4847 .get("connector_id")
4848 .and_then(|v| v.as_str())
4849 .is_some()
4850}
4851
4852pub fn evaluate_routine_execution_policy(
4853 routine: &RoutineSpec,
4854 trigger_type: &str,
4855) -> RoutineExecutionDecision {
4856 if !routine_uses_external_integrations(routine) {
4857 return RoutineExecutionDecision::Allowed;
4858 }
4859 if !routine.external_integrations_allowed {
4860 return RoutineExecutionDecision::Blocked {
4861 reason: "external integrations are disabled by policy".to_string(),
4862 };
4863 }
4864 if routine.requires_approval {
4865 return RoutineExecutionDecision::RequiresApproval {
4866 reason: format!(
4867 "manual approval required before external side effects ({})",
4868 trigger_type
4869 ),
4870 };
4871 }
4872 RoutineExecutionDecision::Allowed
4873}
4874
/// Validates a shared-resource key.
///
/// After trimming, a key is valid when it is the literal `swarm.active_tasks`,
/// or starts with one of the allowed namespace prefixes and contains no empty
/// path segment (`//`). Empty/whitespace-only keys are rejected.
fn is_valid_resource_key(key: &str) -> bool {
    const ALLOWED_PREFIXES: [&str; 4] = ["run/", "mission/", "project/", "team/"];
    let key = key.trim();
    if key.is_empty() {
        return false;
    }
    // Special-case literal key accepted outside the prefix namespaces.
    if key == "swarm.active_tasks" {
        return true;
    }
    let namespaced = ALLOWED_PREFIXES
        .iter()
        .any(|prefix| key.starts_with(prefix));
    namespaced && !key.contains("//")
}
4892
/// Lets `AppState` transparently expose the methods and fields of the
/// underlying `RuntimeState`.
///
/// # Panics
/// Panics if dereferenced before startup has populated `runtime`
/// (a once-initialized cell — presumably a `OnceLock`; set during startup).
impl Deref for AppState {
    type Target = RuntimeState;

    fn deref(&self) -> &Self::Target {
        // Accessing the runtime before initialization is a programming error,
        // hence the expect with an explanatory message.
        self.runtime
            .get()
            .expect("runtime accessed before startup completion")
    }
}
4902
/// Prompt-context hook backed by the server's `AppState`; used to inject
/// identity, memory-scope, embedded-docs, and memory-search context into
/// provider messages.
#[derive(Clone)]
struct ServerPromptContextHook {
    // Shared application state (config, storage, memory DB, event bus, runs).
    state: AppState,
}
4907
impl ServerPromptContextHook {
    /// Wraps the shared application state.
    fn new(state: AppState) -> Self {
        Self { state }
    }

    /// Opens the shared memory database; returns `None` on any failure
    /// (path resolution or open). Memory injection is best-effort.
    async fn open_memory_db(&self) -> Option<MemoryDatabase> {
        let paths = resolve_shared_paths().ok()?;
        MemoryDatabase::new(&paths.memory_db_path).await.ok()
    }

    /// Opens the higher-level memory manager over the same database path,
    /// also best-effort.
    async fn open_memory_manager(&self) -> Option<tandem_memory::MemoryManager> {
        let paths = resolve_shared_paths().ok()?;
        tandem_memory::MemoryManager::new(&paths.memory_db_path)
            .await
            .ok()
    }

    /// Hex-encoded SHA-256 of the query; emitted in telemetry (`queryHash`)
    /// instead of the raw user text.
    fn hash_query(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    /// Renders global-memory search hits into a `<memory_context>` block.
    /// Each hit is a one-line snippet (first 60 whitespace-separated words)
    /// with score/source/run metadata; output is capped at roughly 2200
    /// characters of hit lines (the line that crosses the budget is dropped).
    fn build_memory_block(hits: &[tandem_memory::types::GlobalMemorySearchHit]) -> String {
        let mut out = vec!["<memory_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let text = hit
                .record
                .content
                .split_whitespace()
                .take(60)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (source={}, run={})",
                hit.score, text, hit.record.source_type, hit.record.run_id
            );
            used = used.saturating_add(line.len());
            if used > 2200 {
                break;
            }
            out.push(line);
        }
        out.push("</memory_context>".to_string());
        out.join("\n")
    }

    /// Extracts a non-empty `source_url` string from a chunk's metadata,
    /// if present.
    fn extract_docs_source_url(chunk: &tandem_memory::types::MemoryChunk) -> Option<String> {
        chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("source_url"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
    }

    /// Resolves a display path for a docs chunk: prefer the `relative_path`
    /// metadata field, else fall back to the chunk source with any
    /// `guide_docs:` prefix stripped.
    fn extract_docs_relative_path(chunk: &tandem_memory::types::MemoryChunk) -> String {
        if let Some(path) = chunk
            .metadata
            .as_ref()
            .and_then(|meta| meta.get("relative_path"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
        {
            return path.to_string();
        }
        chunk
            .source
            .strip_prefix("guide_docs:")
            .unwrap_or(chunk.source.as_str())
            .to_string()
    }

    /// Renders embedded-docs search hits into a `<docs_context>` block.
    /// Mirrors `build_memory_block` but with a 70-word snippet per hit and
    /// a larger (~2800 char) budget, annotated with doc path and source URL.
    fn build_docs_memory_block(hits: &[tandem_memory::types::MemorySearchResult]) -> String {
        let mut out = vec!["<docs_context>".to_string()];
        let mut used = 0usize;
        for hit in hits {
            let url = Self::extract_docs_source_url(&hit.chunk).unwrap_or_default();
            let path = Self::extract_docs_relative_path(&hit.chunk);
            let text = hit
                .chunk
                .content
                .split_whitespace()
                .take(70)
                .collect::<Vec<_>>()
                .join(" ");
            let line = format!(
                "- [{:.3}] {} (doc_path={}, source_url={})",
                hit.similarity, text, path, url
            );
            used = used.saturating_add(line.len());
            if used > 2800 {
                break;
            }
            out.push(line);
        }
        out.push("</docs_context>".to_string());
        out.join("\n")
    }

    /// Searches the global memory tier for embedded guide-doc chunks
    /// (sources prefixed `guide_docs:`). Oversamples by 3x (clamped to
    /// [6, 36]) before filtering, so non-doc hits do not starve the result,
    /// then takes the first `limit` matches.
    async fn search_embedded_docs(
        &self,
        query: &str,
        limit: usize,
    ) -> Vec<tandem_memory::types::MemorySearchResult> {
        let Some(manager) = self.open_memory_manager().await else {
            return Vec::new();
        };
        let search_limit = (limit.saturating_mul(3)).clamp(6, 36) as i64;
        manager
            .search(
                query,
                Some(MemoryTier::Global),
                None,
                None,
                Some(search_limit),
            )
            .await
            .unwrap_or_default()
            .into_iter()
            .filter(|hit| hit.chunk.source.starts_with("guide_docs:"))
            .take(limit)
            .collect()
    }

    /// True when the query is empty or trivial social smalltalk (exact,
    /// case-insensitive match against a short greeting list), in which case
    /// memory retrieval would add noise for no benefit. The length guard is
    /// a cheap short-circuit before the list lookup.
    fn should_skip_memory_injection(query: &str) -> bool {
        let trimmed = query.trim();
        if trimmed.is_empty() {
            return true;
        }
        let lower = trimmed.to_ascii_lowercase();
        let social = [
            "hi",
            "hello",
            "hey",
            "thanks",
            "thank you",
            "ok",
            "okay",
            "cool",
            "nice",
            "yo",
            "good morning",
            "good afternoon",
            "good evening",
        ];
        lower.len() <= 32 && social.contains(&lower.as_str())
    }

    /// Maps a personality preset id to its instruction text; unknown presets
    /// fall back to the balanced default.
    fn personality_preset_text(preset: &str) -> &'static str {
        match preset {
            "concise" => {
                "Default style: concise and high-signal. Prefer short direct responses unless detail is requested."
            }
            "friendly" => {
                "Default style: friendly and supportive while staying technically rigorous and concrete."
            }
            "mentor" => {
                "Default style: mentor-like. Explain decisions and tradeoffs clearly when complexity is non-trivial."
            }
            "critical" => {
                "Default style: critical and risk-first. Surface failure modes and assumptions early."
            }
            _ => {
                "Default style: balanced, pragmatic, and factual. Focus on concrete outcomes and actionable guidance."
            }
        }
    }

    /// Builds the identity/personality system block from effective config.
    ///
    /// Resolution order:
    /// - bot name: `identity.bot.canonical_name` → legacy `bot_name` → "Tandem";
    /// - preset: per-agent override (only for agents other than the internal
    ///   "compaction"/"title"/"summary" profiles) → `identity.personality.default.preset`
    ///   → "balanced";
    /// - custom instructions: per-agent → default → legacy `persona`.
    ///
    /// Always returns `Some` as written; the `Option` return leaves room for
    /// callers that treat "no identity" as a valid state.
    fn resolve_identity_block(config: &Value, agent_name: Option<&str>) -> Option<String> {
        // Internal utility agents never take per-agent personality overrides.
        let allow_agent_override = agent_name
            .map(|name| !matches!(name, "compaction" | "title" | "summary"))
            .unwrap_or(false);
        let legacy_bot_name = config
            .get("bot_name")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty());
        let bot_name = config
            .get("identity")
            .and_then(|identity| identity.get("bot"))
            .and_then(|bot| bot.get("canonical_name"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .or(legacy_bot_name)
            .unwrap_or("Tandem");

        let default_profile = config
            .get("identity")
            .and_then(|identity| identity.get("personality"))
            .and_then(|personality| personality.get("default"));
        let default_preset = default_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or("balanced");
        let default_custom = default_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);
        let legacy_persona = config
            .get("persona")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string);

        let per_agent_profile = if allow_agent_override {
            agent_name.and_then(|name| {
                config
                    .get("identity")
                    .and_then(|identity| identity.get("personality"))
                    .and_then(|personality| personality.get("per_agent"))
                    .and_then(|per_agent| per_agent.get(name))
            })
        } else {
            None
        };
        let preset = per_agent_profile
            .and_then(|profile| profile.get("preset"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .unwrap_or(default_preset);
        let custom = per_agent_profile
            .and_then(|profile| profile.get("custom_instructions"))
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|v| !v.is_empty())
            .map(ToString::to_string)
            .or(default_custom)
            .or(legacy_persona);

        let mut lines = vec![
            format!("You are {bot_name}, an AI assistant."),
            Self::personality_preset_text(preset).to_string(),
        ];
        if let Some(custom) = custom {
            lines.push(format!("Additional personality instructions: {custom}"));
        }
        Some(lines.join("\n"))
    }

    /// Builds the `<memory_scope>` system block describing the session's
    /// identifiers and the default memory-search behavior. Project id and
    /// workspace root lines are included only when non-empty.
    fn build_memory_scope_block(
        session_id: &str,
        project_id: Option<&str>,
        workspace_root: Option<&str>,
    ) -> String {
        let mut lines = vec![
            "<memory_scope>".to_string(),
            format!("- current_session_id: {}", session_id),
        ];
        if let Some(project_id) = project_id.map(str::trim).filter(|value| !value.is_empty()) {
            lines.push(format!("- current_project_id: {}", project_id));
        }
        if let Some(workspace_root) = workspace_root
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            lines.push(format!("- workspace_root: {}", workspace_root));
        }
        lines.push(
            "- default_memory_search_behavior: search current session, then current project/workspace, then global memory"
                .to_string(),
        );
        lines.push(
            "- use memory_search without IDs for normal recall; only pass tier/session_id/project_id when narrowing scope"
                .to_string(),
        );
        lines.push(
            "- when memory is sparse or stale, inspect the workspace with glob, grep, and read"
                .to_string(),
        );
        lines.push("</memory_scope>".to_string());
        lines.join("\n")
    }
}
5193
impl PromptContextHook for ServerPromptContextHook {
    /// Augments outgoing provider messages with server-side context, in order:
    /// 1. identity/personality system block (from effective config);
    /// 2. `<memory_scope>` block (when the session is known to storage);
    /// 3. embedded guide-docs context — if any docs match, returns early and
    ///    skips the global-memory search;
    /// 4. otherwise, global memory search results as `<memory_context>`.
    ///
    /// All steps are best-effort: on missing run, empty query, smalltalk, or
    /// unavailable memory DB the messages are returned unchanged. Telemetry
    /// events are published for each injection and for every memory search.
    /// The returned future is `'static`, hence the `self.clone()` capture.
    fn augment_provider_messages(
        &self,
        ctx: PromptContextHookContext,
        mut messages: Vec<ChatMessage>,
    ) -> BoxFuture<'static, anyhow::Result<Vec<ChatMessage>>> {
        let this = self.clone();
        Box::pin(async move {
            // Before startup completes the runtime state is unavailable;
            // pass messages through untouched.
            if !this.state.is_ready() {
                return Ok(messages);
            }
            let run = this.state.run_registry.get(&ctx.session_id).await;
            let Some(run) = run else {
                return Ok(messages);
            };
            let config = this.state.config.get_effective_value().await;
            if let Some(identity_block) =
                Self::resolve_identity_block(&config, run.agent_profile.as_deref())
            {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: identity_block,
                    attachments: Vec::new(),
                });
            }
            if let Some(session) = this.state.storage.get_session(&ctx.session_id).await {
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: Self::build_memory_scope_block(
                        &ctx.session_id,
                        session.project_id.as_deref(),
                        session.workspace_root.as_deref(),
                    ),
                    attachments: Vec::new(),
                });
            }
            let run_id = run.run_id;
            let user_id = run.client_id.unwrap_or_else(|| "default".to_string());
            // The retrieval query is the most recent user message.
            let query = messages
                .iter()
                .rev()
                .find(|m| m.role == "user")
                .map(|m| m.content.clone())
                .unwrap_or_default();
            if query.trim().is_empty() {
                return Ok(messages);
            }
            if Self::should_skip_memory_injection(&query) {
                return Ok(messages);
            }

            // Docs take precedence: when guide docs match, inject them and
            // skip the global-memory path entirely.
            let docs_hits = this.search_embedded_docs(&query, 6).await;
            if !docs_hits.is_empty() {
                let docs_block = Self::build_docs_memory_block(&docs_hits);
                messages.push(ChatMessage {
                    role: "system".to_string(),
                    content: docs_block.clone(),
                    attachments: Vec::new(),
                });
                this.state.event_bus.publish(EngineEvent::new(
                    "memory.docs.context.injected",
                    json!({
                        "runID": run_id,
                        "sessionID": ctx.session_id,
                        "messageID": ctx.message_id,
                        "iteration": ctx.iteration,
                        "count": docs_hits.len(),
                        "tokenSizeApprox": docs_block.split_whitespace().count(),
                        "sourcePrefix": "guide_docs:"
                    }),
                ));
                return Ok(messages);
            }

            let Some(db) = this.open_memory_db().await else {
                return Ok(messages);
            };
            let started = now_ms();
            let hits = db
                .search_global_memory(&user_id, &query, 8, None, None, None)
                .await
                .unwrap_or_default();
            let latency_ms = now_ms().saturating_sub(started);
            let scores = hits.iter().map(|h| h.score).collect::<Vec<_>>();
            // Search telemetry is published even for zero-hit searches; only
            // the query hash is emitted, never the raw query text.
            this.state.event_bus.publish(EngineEvent::new(
                "memory.search.performed",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "providerID": ctx.provider_id,
                    "modelID": ctx.model_id,
                    "iteration": ctx.iteration,
                    "queryHash": Self::hash_query(&query),
                    "resultCount": hits.len(),
                    "scoreMin": scores.iter().copied().reduce(f64::min),
                    "scoreMax": scores.iter().copied().reduce(f64::max),
                    "scores": scores,
                    "latencyMs": latency_ms,
                    "sources": hits.iter().map(|h| h.record.source_type.clone()).collect::<Vec<_>>(),
                }),
            ));

            if hits.is_empty() {
                return Ok(messages);
            }

            let memory_block = Self::build_memory_block(&hits);
            messages.push(ChatMessage {
                role: "system".to_string(),
                content: memory_block.clone(),
                attachments: Vec::new(),
            });
            this.state.event_bus.publish(EngineEvent::new(
                "memory.context.injected",
                json!({
                    "runID": run_id,
                    "sessionID": ctx.session_id,
                    "messageID": ctx.message_id,
                    "iteration": ctx.iteration,
                    "count": hits.len(),
                    "tokenSizeApprox": memory_block.split_whitespace().count(),
                }),
            ));
            Ok(messages)
        })
    }
}
5324
5325fn extract_event_session_id(properties: &Value) -> Option<String> {
5326 properties
5327 .get("sessionID")
5328 .or_else(|| properties.get("sessionId"))
5329 .or_else(|| properties.get("id"))
5330 .or_else(|| {
5331 properties
5332 .get("part")
5333 .and_then(|part| part.get("sessionID"))
5334 })
5335 .or_else(|| {
5336 properties
5337 .get("part")
5338 .and_then(|part| part.get("sessionId"))
5339 })
5340 .and_then(|v| v.as_str())
5341 .map(|s| s.to_string())
5342}
5343
5344fn extract_event_run_id(properties: &Value) -> Option<String> {
5345 properties
5346 .get("runID")
5347 .or_else(|| properties.get("run_id"))
5348 .or_else(|| properties.get("part").and_then(|part| part.get("runID")))
5349 .or_else(|| properties.get("part").and_then(|part| part.get("run_id")))
5350 .and_then(|v| v.as_str())
5351 .map(|s| s.to_string())
5352}
5353
/// Extracts a persistable `MessagePart::ToolInvocation` (plus its message id)
/// from a `message.part.updated`-style event payload, or `None` when the part
/// is not a tool part or is still mid-flight.
///
/// Acceptance rules:
/// - the part `type` must be one of the tool variants (case-insensitive);
/// - parts in `running` state with neither a result nor a non-blank error are
///   skipped (they will be persisted on a later update);
/// - `tool` and `messageID`/`message_id` must be present, else `None`.
///
/// When the part's own `args` are null/empty, falls back to the streaming
/// deltas in `toolCallDelta`: first `parsedArgsPreview`, then
/// `rawArgsPreview` (stored as a raw string).
pub fn extract_persistable_tool_part(properties: &Value) -> Option<(String, MessagePart)> {
    let part = properties.get("part")?;
    let part_type = part
        .get("type")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    // Accept both kebab-case and snake_case spellings of the tool part types.
    if part_type != "tool"
        && part_type != "tool-invocation"
        && part_type != "tool-result"
        && part_type != "tool_invocation"
        && part_type != "tool_result"
    {
        return None;
    }
    let part_state = part
        .get("state")
        .and_then(|v| v.as_str())
        .unwrap_or_default()
        .to_ascii_lowercase();
    let has_result = part.get("result").is_some_and(|value| !value.is_null());
    let has_error = part
        .get("error")
        .and_then(|v| v.as_str())
        .is_some_and(|value| !value.trim().is_empty());
    // A still-running part with no outcome yet is not worth persisting.
    if part_state == "running" && !has_result && !has_error {
        return None;
    }
    let tool = part.get("tool").and_then(|v| v.as_str())?.to_string();
    let message_id = part
        .get("messageID")
        .or_else(|| part.get("message_id"))
        .and_then(|v| v.as_str())?
        .to_string();
    let mut args = part.get("args").cloned().unwrap_or_else(|| json!({}));
    if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
        // Fallback 1: parsed-args preview from the streaming tool-call delta.
        if let Some(preview) = properties
            .get("toolCallDelta")
            .and_then(|delta| delta.get("parsedArgsPreview"))
            .cloned()
        {
            let preview_nonempty = !preview.is_null()
                && !preview.as_object().is_some_and(|value| value.is_empty())
                && !preview
                    .as_str()
                    .map(|value| value.trim().is_empty())
                    .unwrap_or(false);
            if preview_nonempty {
                args = preview;
            }
        }
        // Fallback 2: the raw (unparsed) args preview, kept as a string.
        if args.is_null() || args.as_object().is_some_and(|value| value.is_empty()) {
            if let Some(raw_preview) = properties
                .get("toolCallDelta")
                .and_then(|delta| delta.get("rawArgsPreview"))
                .and_then(|value| value.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                args = Value::String(raw_preview.to_string());
            }
        }
    }
    // Diagnostic: a `write` tool with empty args after all fallbacks suggests
    // an upstream streaming/serialization issue worth logging.
    if tool == "write" && (args.is_null() || args.as_object().is_some_and(|value| value.is_empty()))
    {
        tracing::info!(
            message_id = %message_id,
            has_tool_call_delta = properties.get("toolCallDelta").is_some(),
            part_state = %part.get("state").and_then(|v| v.as_str()).unwrap_or(""),
            has_result = part.get("result").is_some(),
            has_error = part.get("error").is_some(),
            "persistable write tool part still has empty args"
        );
    }
    let result = part.get("result").cloned().filter(|value| !value.is_null());
    let error = part
        .get("error")
        .and_then(|v| v.as_str())
        .map(|value| value.to_string());
    Some((
        message_id,
        MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        },
    ))
}
5445
/// Derives a status-index entry (`run/{session_id}/status`) from an engine
/// event, or `None` for events that do not affect run status.
///
/// Handled event types:
/// - `session.run.started` → state `running`, phase `run`;
/// - `session.run.finished` → state `finished`, plus `result` if the event
///   carries a `status` property;
/// - `message.part.updated` → state `running` with phase/`toolActive` derived
///   from the tool part's type and state, plus optional tool metadata
///   (name, state, error, call id, truncated args preview).
///
/// Every produced value includes `sessionID`, `eventType`, and — when present
/// in the event — `runID`.
pub fn derive_status_index_update(event: &EngineEvent) -> Option<StatusIndexUpdate> {
    let session_id = extract_event_session_id(&event.properties)?;
    let run_id = extract_event_run_id(&event.properties);
    let key = format!("run/{session_id}/status");

    // Base fields shared by all produced updates.
    let mut base = serde_json::Map::new();
    base.insert("sessionID".to_string(), Value::String(session_id));
    if let Some(run_id) = run_id {
        base.insert("runID".to_string(), Value::String(run_id));
    }

    match event.event_type.as_str() {
        "session.run.started" => {
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.started".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "session.run.finished" => {
            base.insert("state".to_string(), Value::String("finished".to_string()));
            base.insert("phase".to_string(), Value::String("run".to_string()));
            if let Some(status) = event.properties.get("status").and_then(|v| v.as_str()) {
                base.insert("result".to_string(), Value::String(status.to_string()));
            }
            base.insert(
                "eventType".to_string(),
                Value::String("session.run.finished".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        "message.part.updated" => {
            let part = event.properties.get("part")?;
            let part_type = part.get("type").and_then(|v| v.as_str())?;
            let part_state = part.get("state").and_then(|v| v.as_str()).unwrap_or("");
            // Map (type, state) to a phase: an in-flight invocation means an
            // active tool; a result or completed/failed state returns the run
            // to the plain "run" phase. Anything else is not status-relevant.
            let (phase, tool_active) = match (part_type, part_state) {
                ("tool-invocation", _) | ("tool", "running") | ("tool", "") => ("tool", true),
                ("tool-result", _) | ("tool", "completed") | ("tool", "failed") => ("run", false),
                _ => return None,
            };
            base.insert("state".to_string(), Value::String("running".to_string()));
            base.insert("phase".to_string(), Value::String(phase.to_string()));
            base.insert("toolActive".to_string(), Value::Bool(tool_active));
            if let Some(tool_name) = part.get("tool").and_then(|v| v.as_str()) {
                base.insert("tool".to_string(), Value::String(tool_name.to_string()));
            }
            if let Some(tool_state) = part.get("state").and_then(|v| v.as_str()) {
                base.insert(
                    "toolState".to_string(),
                    Value::String(tool_state.to_string()),
                );
            }
            if let Some(tool_error) = part
                .get("error")
                .and_then(|v| v.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                base.insert(
                    "toolError".to_string(),
                    Value::String(tool_error.to_string()),
                );
            }
            if let Some(tool_call_id) = part
                .get("id")
                .and_then(|v| v.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
            {
                base.insert(
                    "toolCallID".to_string(),
                    Value::String(tool_call_id.to_string()),
                );
            }
            // Only include an args preview when args are non-null, non-empty
            // objects / non-blank strings; preview is capped at 500 chars.
            if let Some(args_preview) = part
                .get("args")
                .filter(|value| {
                    !value.is_null()
                        && !value.as_object().is_some_and(|map| map.is_empty())
                        && !value
                            .as_str()
                            .map(|text| text.trim().is_empty())
                            .unwrap_or(false)
                })
                .map(|value| truncate_text(&value.to_string(), 500))
            {
                base.insert(
                    "toolArgsPreview".to_string(),
                    Value::String(args_preview.to_string()),
                );
            }
            base.insert(
                "eventType".to_string(),
                Value::String("message.part.updated".to_string()),
            );
            Some(StatusIndexUpdate {
                key,
                value: Value::Object(base),
            })
        }
        _ => None,
    }
}
5557
/// Thin wrapper delegating to `crate::app::tasks::run_session_part_persister`.
pub async fn run_session_part_persister(state: AppState) {
    crate::app::tasks::run_session_part_persister(state).await
}
5561
/// Thin wrapper delegating to `crate::app::tasks::run_status_indexer`.
pub async fn run_status_indexer(state: AppState) {
    crate::app::tasks::run_status_indexer(state).await
}
5565
/// Thin wrapper delegating to `crate::app::tasks::run_agent_team_supervisor`.
pub async fn run_agent_team_supervisor(state: AppState) {
    crate::app::tasks::run_agent_team_supervisor(state).await
}
5569
/// Thin wrapper delegating to `crate::app::tasks::run_bug_monitor`.
pub async fn run_bug_monitor(state: AppState) {
    crate::app::tasks::run_bug_monitor(state).await
}
5573
/// Thin wrapper delegating to `crate::app::tasks::run_usage_aggregator`.
pub async fn run_usage_aggregator(state: AppState) {
    crate::app::tasks::run_usage_aggregator(state).await
}
5577
/// Thin wrapper delegating to `crate::app::tasks::run_optimization_scheduler`.
pub async fn run_optimization_scheduler(state: AppState) {
    crate::app::tasks::run_optimization_scheduler(state).await
}
5581
/// Processes one failure event through the bug-monitor pipeline and returns
/// the resulting incident record.
///
/// Pipeline stages (each state transition is persisted via
/// `put_bug_monitor_incident` and mirrored onto the event bus):
/// 1. build a submission from the event; require a fingerprint;
/// 2. upsert an incident keyed by fingerprint (bump occurrence count on
///    repeats, otherwise create a fresh `queued` record);
/// 3. if known failure patterns match, mark `duplicate_suppressed` and stop;
/// 4. submit a draft (`draft_created`, or `draft_failed` and stop);
/// 5. attempt to queue a triage run (`triage_queued` on success; failure is
///    recorded in `last_error` but is non-fatal);
/// 6. attempt auto-publish of the draft to GitHub; on failure, mark the draft
///    `github_post_failed` and record the failure (best-effort).
///
/// # Errors
/// Returns an error when the submission cannot be built, the fingerprint is
/// missing, or persisting the incident fails.
pub async fn process_bug_monitor_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    let submission =
        crate::bug_monitor::service::build_bug_monitor_submission_from_event(state, config, event)
            .await?;
    // Up to 3 previously-seen failure patterns matching this submission.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = now_ms();

    // Look for an existing incident with the same fingerprint (dedup key).
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump counters and backfill a missing excerpt.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        row
    } else {
        // First occurrence: fresh incident in the initial "queued" state.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: suppress instead of drafting a new report.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    // Create the bug draft; failure here ends the pipeline for this event.
    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Queue a triage run; failure is recorded but does not abort.
    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    // Best-effort auto-publish of the draft to GitHub.
    if let Some(draft_id) = incident.draft_id.clone() {
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                // Record the post failure on both the incident and the draft;
                // results of the persistence calls are deliberately ignored.
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
                let _ = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await;
            }
        }
    }

    incident.updated_at_ms = now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
5783
5784pub fn sha256_hex(parts: &[&str]) -> String {
5785 let mut hasher = Sha256::new();
5786 for part in parts {
5787 hasher.update(part.as_bytes());
5788 hasher.update([0u8]);
5789 }
5790 format!("{:x}", hasher.finalize())
5791}
5792
5793fn automation_status_uses_scheduler_capacity(status: &AutomationRunStatus) -> bool {
5794 matches!(status, AutomationRunStatus::Running)
5795}
5796
5797fn automation_status_holds_workspace_lock(status: &AutomationRunStatus) -> bool {
5798 matches!(
5799 status,
5800 AutomationRunStatus::Running
5801 | AutomationRunStatus::Pausing
5802 | AutomationRunStatus::Paused
5803 | AutomationRunStatus::AwaitingApproval
5804 )
5805}
5806
/// Thin wrapper delegating to `crate::app::tasks::run_routine_scheduler`.
pub async fn run_routine_scheduler(state: AppState) {
    crate::app::tasks::run_routine_scheduler(state).await
}
5810
/// Thin wrapper delegating to `crate::app::tasks::run_routine_executor`.
pub async fn run_routine_executor(state: AppState) {
    crate::app::tasks::run_routine_executor(state).await
}
5814
/// Thin wrapper delegating to `crate::app::routines::build_routine_prompt`.
pub async fn build_routine_prompt(state: &AppState, run: &RoutineRunRecord) -> String {
    crate::app::routines::build_routine_prompt(state, run).await
}
5818
/// Truncates `input` to at most `max_len` bytes of content and appends
/// `...<truncated>` when anything was cut (so the returned string may exceed
/// `max_len` by the marker length, matching the original contract).
///
/// Unlike a plain `&input[..max_len]` slice, the cut point is moved back to
/// the nearest UTF-8 character boundary, so multi-byte input (error messages,
/// excerpts, JSON previews) can never cause a byte-slice panic.
pub fn truncate_text(input: &str, max_len: usize) -> String {
    if input.len() <= max_len {
        return input.to_string();
    }
    // Walk back from max_len to a valid char boundary; index 0 is always a
    // boundary, so this loop cannot underflow.
    let mut cut = max_len;
    while !input.is_char_boundary(cut) {
        cut -= 1;
    }
    let mut out = input[..cut].to_string();
    out.push_str("...<truncated>");
    out
}
5827
/// Thin wrapper delegating to
/// `crate::app::routines::append_configured_output_artifacts`.
pub async fn append_configured_output_artifacts(state: &AppState, run: &RoutineRunRecord) {
    crate::app::routines::append_configured_output_artifacts(state, run).await
}
5831
5832pub fn default_model_spec_from_effective_config(config: &Value) -> Option<ModelSpec> {
5833 let provider_id = config
5834 .get("default_provider")
5835 .and_then(|v| v.as_str())
5836 .map(str::trim)
5837 .filter(|v| !v.is_empty())?;
5838 let model_id = config
5839 .get("providers")
5840 .and_then(|v| v.get(provider_id))
5841 .and_then(|v| v.get("default_model"))
5842 .and_then(|v| v.as_str())
5843 .map(str::trim)
5844 .filter(|v| !v.is_empty())?;
5845 Some(ModelSpec {
5846 provider_id: provider_id.to_string(),
5847 model_id: model_id.to_string(),
5848 })
5849}
5850
/// Thin wrapper delegating to
/// `crate::app::routines::resolve_routine_model_spec_for_run`.
pub async fn resolve_routine_model_spec_for_run(
    state: &AppState,
    run: &RoutineRunRecord,
) -> (Option<ModelSpec>, String) {
    crate::app::routines::resolve_routine_model_spec_for_run(state, run).await
}
5857
/// Normalizes a list of strings: trims each entry, drops empties, and removes
/// duplicates while preserving first-occurrence order.
fn normalize_non_empty_list(raw: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    raw.into_iter()
        .map(|item| item.trim().to_string())
        .filter(|item| !item.is_empty())
        // HashSet::insert returns false for repeats, so only the first
        // occurrence of each value survives.
        .filter(|item| seen.insert(item.clone()))
        .collect()
}
5872
/// Stub browser API compiled when the `browser` feature is disabled:
/// session cleanup is a no-op and probes report the feature as unavailable,
/// so callers can use one code path regardless of feature flags.
#[cfg(not(feature = "browser"))]
impl AppState {
    /// No-op: reports zero closed sessions.
    pub async fn close_browser_sessions_for_owner(&self, _owner_session_id: &str) -> usize {
        0
    }

    /// No-op: reports zero closed sessions.
    pub async fn close_all_browser_sessions(&self) -> usize {
        0
    }

    /// Static status payload marking the browser stack as disabled/not found.
    pub async fn browser_status(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false, "sidecar": { "found": false }, "browser": { "found": false } })
    }

    /// Always fails: the browser feature is compiled out.
    pub async fn browser_smoke_test(
        &self,
        _url: Option<String>,
    ) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Always fails: the browser feature is compiled out.
    pub async fn install_browser_sidecar(&self) -> anyhow::Result<serde_json::Value> {
        anyhow::bail!("browser feature disabled")
    }

    /// Minimal health payload marking the feature as disabled.
    pub async fn browser_health_summary(&self) -> serde_json::Value {
        serde_json::json!({ "enabled": false })
    }
}
5902
5903pub mod automation;
5904pub use automation::*;
5905
5906#[cfg(test)]
5907mod tests;